package inc.chaos.ally.kafka.dao;

import inc.chaos.ally.kafka.KafkaStreamsCycle;
import inc.chaos.ally.kafka.streams.StreamUtil;
import inc.chaos.ally.kafka.streams.meta.HostStoreInfo;
import inc.chaos.ally.kafka.streams.meta.MetadataService;
import java.util.Optional;
import javax.ws.rs.client.Client;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Base class for readers that look up entries in a Kafka Streams key-value state store and
 * determine which application instance hosts a given key.
 *
 * <p>The class wires itself to a {@link KafkaStreams} instance through
 * {@link #initStreams(KafkaStreams)} and exposes reactive ({@link Mono}) helpers to subclasses.
 *
 * @param <K> key type of the state store
 * @param <V> value type of the state store
 * @param <F> filter type that subclasses convert to and from keys via {@link #toKey(Object)} and
 *     {@link #toFilter(Object)}
 * @deprecated flagged as deprecated in the original source; the intended replacement is not
 *     visible from this file.
 */
@Deprecated
/* loaded from: input_file:inc/chaos/ally/kafka/dao/KafkaStoreReaderBase.class */
public class KafkaStoreReaderBase<K, V, F> implements KafkaStreamsCycle {
    protected static final Logger log = LoggerFactory.getLogger(KafkaStoreReaderBase.class);

    /** Name of the state store; not read by this base class itself. */
    protected String storeName;

    /** Streams instance supplied through {@link #initStreams(KafkaStreams)}. */
    protected KafkaStreams streams;

    /** Metadata lookup service, created in {@link #initStreams(KafkaStreams)}. */
    protected MetadataService metadataService;

    /** Host/port descriptor of the local application instance. */
    protected HostStoreInfo thisHost;

    /** Key serializer handed to the metadata service when locating the host of a key. */
    protected Serializer<K> keySerial;

    /** JAX-RS client; not used by this base class itself. */
    protected Client apiClient;

    /**
     * Timeout passed to {@code StreamUtil.waitUntilStoreIsQueryable}.
     * NOTE(review): presumably milliseconds - confirm against StreamUtil.
     */
    protected long timeout = 30000;

    /** Format used by {@link #formatHostUrl}: host, port, context path, path, key. */
    protected String urlPattern = "http://%s:%d%s%s/%s";

    /** API path; not used by this base class itself. */
    protected String apiPath = "";

    /**
     * Converts a filter into a store key. This base implementation is a placeholder and always
     * throws; subclasses are expected to override it.
     *
     * <p>Decompiler output indicates this method was originally declared {@code protected}; it is
     * kept {@code public} so existing callers continue to compile.
     *
     * @param f the filter to convert
     * @return never returns normally in this base implementation
     * @throws UnsupportedOperationException always
     */
    public K toKey(F f) {
        throw new UnsupportedOperationException("KafkaStoreReaderBase.toKey: @toDo");
    }

    /**
     * Converts a store key into a filter. This base implementation is a placeholder and always
     * throws; subclasses are expected to override it.
     *
     * @param k the key to convert
     * @return never returns normally in this base implementation
     * @throws UnsupportedOperationException always
     */
    protected F toFilter(K k) {
        throw new UnsupportedOperationException("KafkaStoreReaderBase.toFilter: @toDo");
    }

    /**
     * Obtains the read-only key-value store with the given name by delegating to
     * {@code StreamUtil.waitUntilStoreIsQueryable} with {@link #streams} and {@link #timeout}.
     *
     * @param str name of the state store
     * @return the store as returned by the helper
     */
    protected ReadOnlyKeyValueStore<K, V> getKeyValueStore(String str) {
        // Raw cast kept from the original: the helper's generic signature is not visible here.
        return (ReadOnlyKeyValueStore) StreamUtil.waitUntilStoreIsQueryable(str, QueryableStoreTypes.keyValueStore(), this.streams, this.timeout);
    }

    /**
     * Reactive wrapper around {@link #getKeyValueStore(String)}. The lookup is performed when this
     * method is called, not on subscription.
     *
     * <p>Decompiler output indicates this method was originally declared {@code protected}.
     *
     * @param str name of the state store
     * @return a {@link Mono} emitting the store, or an empty {@link Mono} if the lookup yielded
     *     {@code null}
     */
    public Mono<ReadOnlyKeyValueStore<K, V>> findKeyValueStore(String str) {
        return Mono.justOrEmpty(getKeyValueStore(str));
    }

    /**
     * Looks up the host that owns the given key in the given store via {@link #metadataService}.
     * The lookup is performed when this method is called, not on subscription.
     *
     * @param k the key to locate
     * @param str name of the state store
     * @return a {@link Mono} emitting the owning host, or an empty {@link Mono} when the metadata
     *     service returns {@code null}
     */
    protected Mono<HostStoreInfo> monoHostForKey(K k, String str) {
        // Mono.just rejects null with a NullPointerException; justOrEmpty maps null to "empty".
        return Mono.justOrEmpty(this.metadataService.streamsMetadataForStoreAndKey(str, k, this.keySerial));
    }

    /**
     * Resolves the host owning the given key and reports it only when it is a different instance
     * than {@link #thisHost} (compared by host name and port).
     *
     * <p>Decompiler output indicates this method was originally declared {@code protected}.
     *
     * @param k the key to locate
     * @param str name of the state store
     * @return a {@link Mono} emitting {@link Optional#empty()} when the key is hosted locally, or
     *     the remote host otherwise; the {@link Mono} fails with
     *     {@link InvalidStateStoreException} when no host could be determined
     */
    public Mono<Optional<HostStoreInfo>> otherHostForKey(K k, String str) {
        return monoHostForKey(k, str)
                // A Reactor mapper never sees null, so the "not found" case is handled on the
                // empty signal instead; defer avoids building the exception when not needed.
                .switchIfEmpty(Mono.defer(() -> Mono.<HostStoreInfo>error(
                        new InvalidStateStoreException("store " + str + " not found"))))
                .map(hostStoreInfo -> {
                    log.debug("store[{}] {} {}", str, k, hostStoreInfo);
                    boolean hostedLocally = this.thisHost.getHost().equals(hostStoreInfo.getHost())
                            && this.thisHost.getPort() == hostStoreInfo.getPort();
                    if (hostedLocally) {
                        return Optional.<HostStoreInfo>empty();
                    }
                    return Optional.of(hostStoreInfo);
                });
    }

    /**
     * Stores the streams instance and creates the {@link MetadataService} bound to it.
     *
     * @param kafkaStreams the streams instance to query
     */
    @Override // inc.chaos.ally.kafka.StreamsCycle
    public void initStreams(KafkaStreams kafkaStreams) {
        this.streams = kafkaStreams;
        this.metadataService = new MetadataService(this.streams);
    }

    /**
     * Contributes nothing to the topology; the builder is returned unchanged.
     *
     * @param streamsBuilder the builder being assembled
     * @return the same {@code streamsBuilder}
     */
    @Override // inc.chaos.ally.kafka.StreamsCycle
    public StreamsBuilder buildStreams(StreamsBuilder streamsBuilder) {
        return streamsBuilder;
    }

    /**
     * Builds the URL for the given key on the given host using {@link #urlPattern}.
     *
     * <p>The context path is taken from {@link #thisHost}, not from {@code hostStoreInfo}.
     * NOTE(review): this presumably assumes all instances share one context path - confirm.
     *
     * @param k the key appended as the last URL segment
     * @param hostStoreInfo the host supplying host name and port
     * @param str path fragment inserted after the context path
     * @return the formatted URL
     */
    protected String formatHostUrl(K k, HostStoreInfo hostStoreInfo, String str) {
        return String.format(this.urlPattern, hostStoreInfo.getHost(), hostStoreInfo.getPort(), this.thisHost.getCtxp(), str, k);
    }

    /**
     * @return the descriptor of the local application instance
     */
    public HostStoreInfo getThisHost() {
        return this.thisHost;
    }

    /**
     * @param hostStoreInfo the descriptor of the local application instance
     */
    public void setThisHost(HostStoreInfo hostStoreInfo) {
        this.thisHost = hostStoreInfo;
    }
}
