package inc.yukawa.chain.kafka.dao.mono;

import inc.yukawa.chain.base.core.domain.info.HostStoreInfo;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;

/* loaded from: input_file:inc/yukawa/chain/kafka/dao/mono/DistributedKafkaStoreReadDao.class */
public abstract class DistributedKafkaStoreReadDao<K, V, F> extends DistributedKafkaStoreLoadDao<K, V> implements KafkaMonoReadDao<K, V, F> {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedKafkaStoreReadDao.class);
    protected final ParameterizedTypeReference<KeyValue<K, V>> keyValueType;

    public DistributedKafkaStoreReadDao(WebClient webClient, String str, Properties properties, Serializer<K> serializer) {
        super(webClient, str, properties, serializer);
        this.keyValueType = new ParameterizedTypeReference<KeyValue<K, V>>() { // from class: inc.yukawa.chain.kafka.dao.mono.DistributedKafkaStoreReadDao.1
        };
    }

    public Flux<KeyValue<K, V>> findLocalKeyValues(F f) {
        LOG.debug("[{}]: local call find: {}", this.storeName, f);
        KeyValueIterator all = getKeyValueStore().all();
        Flux fromIterable = Flux.fromIterable(() -> {
            return all;
        });
        Objects.requireNonNull(all);
        return fromIterable.doOnTerminate(all::close).filter((v0) -> {
            return Objects.nonNull(v0);
        }).filter(asPredicate(f));
    }

    @Override // inc.yukawa.chain.kafka.dao.mono.KafkaMonoReadDao
    public Flux<KeyValue<K, V>> findKeyValues(F f) {
        return Flux.merge((Iterable) this.metaService.streamsMetadataForStore(this.storeName).stream().map(hostStoreInfo -> {
            return findKeyValues(hostStoreInfo, f);
        }).collect(Collectors.toList()));
    }

    protected Flux<KeyValue<K, V>> findKeyValues(HostStoreInfo hostStoreInfo, F f) {
        return isLocal(hostStoreInfo) ? findLocalKeyValues(f) : findRemoteKeyValues(hostStoreInfo, f);
    }

    protected Flux<KeyValue<K, V>> findRemoteKeyValues(HostStoreInfo hostStoreInfo, F f) {
        String uriString = findFromRemoteUrl(hostStoreInfo, f).build().toUriString();
        LOG.debug("[{}]: remote call for find: {} using url: {}", new Object[]{this.storeName, f, uriString});
        return this.webClient.post().uri(uriString, new Object[0]).bodyValue(f).retrieve().bodyToFlux(this.keyValueType);
    }

    protected UriComponentsBuilder findFromRemoteUrl(HostStoreInfo hostStoreInfo, F f) {
        return UriComponentsBuilder.newInstance().scheme(this.scheme).host(hostStoreInfo.getHost()).port(hostStoreInfo.getPort()).path(hostStoreInfo.getCtxp()).path(findPath());
    }

    protected abstract String findPath();
}
