/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.dao.mono;

import inc.yukawa.chain.base.core.domain.info.HostStoreInfo;
import inc.yukawa.chain.kafka.dao.mono.KafkaStoreLoadDao;
import inc.yukawa.chain.kafka.util.MetadataService;
import inc.yukawa.chain.kafka.util.StreamUtil;
import jakarta.annotation.PostConstruct;
import java.net.URI;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

/**
 * Load DAO that looks up, through {@link MetadataService}, which host is
 * responsible for a key and then either delegates to the superclass
 * {@code load} (when that host is this instance) or issues an HTTP GET through
 * the supplied {@link WebClient} against the responsible host.
 *
 * @param <K> key type, serialized with the supplied {@link Serializer} for the
 *            metadata lookup
 * @param <V> value type returned by {@link #load(Object)}
 */
public abstract class DistributedKafkaStoreLoadDao<K, V>
extends KafkaStoreLoadDao<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedKafkaStoreLoadDao.class);

    /** Serializer handed to the metadata lookup together with the key. */
    protected Serializer<K> keySerializer;

    /** Client used for the remote GET when the key lives on another host. */
    protected WebClient webClient;

    /** Created in {@link #initStreams(Topology, Properties)}; {@code null} before that call. */
    protected MetadataService metaService;

    /**
     * Body type passed to {@code bodyToMono} for remote responses.
     * NOTE(review): {@code V} is a type variable at this point, so the reference
     * captures {@code V} itself rather than a subclass's concrete type argument -
     * confirm the configured decoder produces the intended type.
     */
    protected final ParameterizedTypeReference<V> type = new ParameterizedTypeReference<V>(){};

    /** URI scheme used when building remote URLs. */
    @Value(value="${server.servlet.scheme:http}")
    protected String scheme = "http";

    /** Port compared against the metadata port in {@link #isLocal(HostStoreInfo)}. */
    @Value(value="${server.port}")
    protected int port;

    /** Host compared against the metadata host in {@link #isLocal(HostStoreInfo)}. */
    @Value(value="${server.host:auto}")
    protected String host;

    /**
     * @param webClient     client used for remote loads
     * @param storeName     forwarded to the superclass constructor
     * @param streamProps   forwarded to the superclass constructor
     * @param keySerializer serializer used when resolving the host for a key
     */
    public DistributedKafkaStoreLoadDao(WebClient webClient, String storeName, Properties streamProps, Serializer<K> keySerializer) {
        super(storeName, streamProps);
        this.webClient = webClient;
        this.keySerializer = keySerializer;
    }

    /**
     * Replaces the injected {@link #host} value with the result of
     * {@link StreamUtil#initHostAddress(String)}.
     * Presumably this turns the {@code auto} default into a concrete address -
     * TODO confirm against {@code StreamUtil}.
     */
    @PostConstruct
    public void initHost() {
        this.host = StreamUtil.initHostAddress(this.host);
    }

    /**
     * Lets the superclass create the streams instance and wraps it in a
     * {@link MetadataService} used later by {@link #load(Object)}.
     *
     * @return the streams instance created by the superclass
     */
    @Override
    protected KafkaStreams initStreams(Topology topology, Properties props) {
        KafkaStreams kafkaStreams = super.initStreams(topology, props);
        this.metaService = new MetadataService(kafkaStreams);
        return kafkaStreams;
    }

    /**
     * Loads the value for {@code key} from whichever host the metadata service
     * reports for it.
     *
     * @param key key to look up
     * @return an error {@code Mono} carrying a 503
     *         {@link WebClientResponseException} when no host is available, the
     *         superclass result when the key is local, otherwise the decoded
     *         body of a GET against the remote host
     */
    @Override
    public Mono<V> load(K key) {
        HostStoreInfo hostInfo = this.metaService.streamsMetadataForStoreAndKey(this.storeName, key, this.keySerializer);
        if (this.isUnavailable(hostInfo)) {
            LOG.warn("[{}]: unavailable key: {}", this.storeName, key);
            return Mono.error(new WebClientResponseException(HttpStatus.SERVICE_UNAVAILABLE.value(), "No host assigned to partition with key: " + key, null, null, null));
        }
        if (this.isLocal(hostInfo)) {
            LOG.debug("[{}]: local call for key: {}", this.storeName, key);
            return super.load(key);
        }
        // Encode the components and hand WebClient a finished URI. Passing the raw
        // string to uri(String, Object...) would have it parsed as a URI template,
        // so a key containing '&', '=' or '{...}' would alter the query string or
        // be mistaken for an unresolved template variable.
        URI remoteUri = this.loadFromRemoteUrl(hostInfo, key).build().encode().toUri();
        LOG.debug("[{}]: remote call for key: {} using url: {}", this.storeName, key, remoteUri);
        return this.webClient.get().uri(remoteUri).retrieve().bodyToMono(this.type);
    }

    /**
     * Builds the remote load URL from {@link #scheme}, the host, port and
     * context path reported in {@code hostInfo}, {@link #loadPath()}, and a
     * single query parameter named {@link #keyParam()} holding {@code key}.
     * The returned builder is not yet encoded.
     */
    protected UriComponentsBuilder loadFromRemoteUrl(HostStoreInfo hostInfo, K key) {
        return UriComponentsBuilder.newInstance()
                .scheme(this.scheme)
                .host(hostInfo.getHost())
                .port(hostInfo.getPort())
                .path(hostInfo.getCtxp())
                .path(this.loadPath())
                .queryParam(this.keyParam(), key);
    }

    /** @return the path appended after the remote host's context path */
    protected abstract String loadPath();

    /** @return name of the query parameter that carries the key; {@code "id"} unless overridden */
    protected String keyParam() {
        return "id";
    }

    /**
     * @return {@code true} when {@code hostWithKey} is {@code null} or matches
     *         the host and port of {@link HostInfo#unavailable()}
     */
    protected boolean isUnavailable(HostStoreInfo hostWithKey) {
        if (hostWithKey == null) {
            // No metadata at all is handled like the "unavailable" sentinel so the
            // caller reports 503 instead of failing with a NullPointerException.
            return true;
        }
        HostInfo sentinel = HostInfo.unavailable();
        return sentinel.host().equals(hostWithKey.getHost()) && sentinel.port() == hostWithKey.getPort();
    }

    /**
     * @return {@code true} when {@code hostInfo} names the same host and port as
     *         this instance; a {@code null} host in {@code hostInfo} is never local
     */
    protected boolean isLocal(HostStoreInfo hostInfo) {
        String candidate = hostInfo.getHost();
        return candidate != null && candidate.equals(this.host) && hostInfo.getPort() == this.port;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getHost() {
        return this.host;
    }

    public void setHost(String host) {
        this.host = host;
    }
}

