package inc.yukawa.chain.kafka.dao.mono;

import inc.yukawa.chain.base.core.domain.info.HostStoreInfo;
import inc.yukawa.chain.kafka.util.MetadataService;
import inc.yukawa.chain.kafka.util.StreamUtil;
import jakarta.annotation.PostConstruct;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

/* loaded from: input_file:inc/yukawa/chain/kafka/dao/mono/DistributedKafkaStoreLoadDao.class */
public abstract class DistributedKafkaStoreLoadDao<K, V> extends KafkaStoreLoadDao<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedKafkaStoreLoadDao.class);
    protected Serializer<K> keySerializer;
    protected WebClient webClient;
    protected MetadataService metaService;
    protected final ParameterizedTypeReference<V> type;

    @Value("${server.servlet.scheme:http}")
    protected String scheme;

    @Value("${server.port}")
    protected int port;

    @Value("${server.host:auto}")
    protected String host;

    public DistributedKafkaStoreLoadDao(WebClient webClient, String str, Properties properties, Serializer<K> serializer) {
        super(str, properties);
        this.type = new ParameterizedTypeReference<V>() { // from class: inc.yukawa.chain.kafka.dao.mono.DistributedKafkaStoreLoadDao.1
        };
        this.scheme = "http";
        this.webClient = webClient;
        this.keySerializer = serializer;
    }

    @PostConstruct
    public void initHost() {
        this.host = StreamUtil.initHostAddress(this.host);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // inc.yukawa.chain.kafka.dao.mono.KafkaStreamsDao
    public KafkaStreams initStreams(Topology topology, Properties properties) {
        KafkaStreams initStreams = super.initStreams(topology, properties);
        this.metaService = new MetadataService(initStreams);
        return initStreams;
    }

    @Override // inc.yukawa.chain.kafka.dao.mono.KafkaStoreLoadDao
    public Mono<V> load(K k) {
        HostStoreInfo streamsMetadataForStoreAndKey = this.metaService.streamsMetadataForStoreAndKey(this.storeName, k, this.keySerializer);
        if (isUnavailable(streamsMetadataForStoreAndKey)) {
            LOG.warn("[{}]: unavailable key: {}", this.storeName, k);
            return Mono.error(new WebClientResponseException(HttpStatus.SERVICE_UNAVAILABLE.value(), "No host assigned to partition with key: " + String.valueOf(k), (HttpHeaders) null, (byte[]) null, (Charset) null));
        }
        if (isLocal(streamsMetadataForStoreAndKey)) {
            LOG.debug("[{}]: local call for key: {}", this.storeName, k);
            return super.load((DistributedKafkaStoreLoadDao<K, V>) k);
        }
        String uriString = loadFromRemoteUrl(streamsMetadataForStoreAndKey, k).build().toUriString();
        LOG.debug("[{}]: remote call for key: {} using url: {}", new Object[]{this.storeName, k, uriString});
        return this.webClient.get().uri(uriString, new Object[0]).retrieve().bodyToMono(this.type);
    }

    protected UriComponentsBuilder loadFromRemoteUrl(HostStoreInfo hostStoreInfo, K k) {
        return UriComponentsBuilder.newInstance().scheme(this.scheme).host(hostStoreInfo.getHost()).port(hostStoreInfo.getPort()).path(hostStoreInfo.getCtxp()).path(loadPath()).queryParam(keyParam(), new Object[]{k});
    }

    protected abstract String loadPath();

    protected String keyParam() {
        return "id";
    }

    protected boolean isUnavailable(HostStoreInfo hostStoreInfo) {
        return HostInfo.unavailable().host().equals(hostStoreInfo.getHost()) && HostInfo.unavailable().port() == hostStoreInfo.getPort();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isLocal(HostStoreInfo hostStoreInfo) {
        return hostStoreInfo.getHost().equals(this.host) && hostStoreInfo.getPort() == this.port;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int i) {
        this.port = i;
    }

    public String getHost() {
        return this.host;
    }

    public void setHost(String str) {
        this.host = str;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // inc.yukawa.chain.kafka.dao.mono.KafkaStoreLoadDao
    /* renamed from: load, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object mo3load(Object obj) {
        return load((DistributedKafkaStoreLoadDao<K, V>) obj);
    }
}
