/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.dao.mono;

import inc.yukawa.chain.base.core.domain.info.HostStoreInfo;
import inc.yukawa.chain.kafka.dao.mono.DistributedKafkaStoreLoadDao;
import inc.yukawa.chain.kafka.dao.mono.KafkaMonoReadDao;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;

/**
 * Read-side DAO that searches a Kafka Streams key-value store which may be spread
 * over several hosts.
 *
 * <p>A search is fanned out to every host returned by
 * {@code metaService.streamsMetadataForStore(storeName)}: the local host is answered
 * by iterating the local store, every other host is queried with an HTTP POST, and
 * the per-host results are merged into one {@link Flux}.
 *
 * @param <K> key type of the store
 * @param <V> value type of the store
 * @param <F> filter type used to select entries
 */
public abstract class DistributedKafkaStoreReadDao<K, V, F>
        extends DistributedKafkaStoreLoadDao<K, V>
        implements KafkaMonoReadDao<K, V, F> {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedKafkaStoreReadDao.class);

    /**
     * Type token handed to the {@link WebClient} when decoding remote responses.
     *
     * <p>NOTE(review): this anonymous subclass captures the type variables {@code K} and
     * {@code V}, not concrete classes, so the decoder may not be able to resolve the real
     * key/value types from it - confirm against the configured codecs.
     */
    protected final ParameterizedTypeReference<KeyValue<K, V>> keyValueType = new ParameterizedTypeReference<KeyValue<K, V>>(){};

    /**
     * Creates the DAO; all arguments are passed unchanged to the superclass constructor.
     *
     * @param webClient     client used for calls to remote hosts
     * @param storeName     name of the state store to query
     * @param streamProps   stream properties forwarded to the superclass
     * @param keySerializer key serializer forwarded to the superclass
     */
    public DistributedKafkaStoreReadDao(WebClient webClient, String storeName, Properties streamProps, Serializer<K> keySerializer) {
        super(webClient, storeName, streamProps, keySerializer);
    }

    /**
     * Streams the entries of the local store that pass {@code asPredicate(filter)}.
     *
     * <p>The store iterator is opened once per subscription and is always closed again,
     * whether the sequence completes, fails or is cancelled by the subscriber. Because the
     * store is only touched on subscription, a failure to open it is delivered as an
     * error signal of the returned {@link Flux}.
     *
     * @param filter filter describing the entries to keep
     * @return the matching local key/value pairs
     */
    public Flux<KeyValue<K, V>> findLocalKeyValues(F filter) {
        LOG.debug("[{}]: local call find: {}", this.storeName, filter);
        return Flux.<KeyValue<K, V>, KeyValueIterator<K, V>>using(
                // Acquire lazily so that an unsubscribed Flux never holds an open iterator.
                () -> this.getKeyValueStore().all(),
                iterator -> {
                    Iterable<KeyValue<K, V>> entries = () -> iterator;
                    return Flux.fromIterable(entries)
                            .filter(Objects::nonNull)
                            .filter(this.asPredicate(filter));
                },
                // Runs on complete, error and cancel alike.
                KeyValueIterator::close);
    }

    /**
     * Searches every host that serves this store and merges the results.
     *
     * @param filter filter describing the entries to keep
     * @return the matching key/value pairs of all hosts, in no particular order
     */
    @Override
    public Flux<KeyValue<K, V>> findKeyValues(F filter) {
        List<HostStoreInfo> hostStoreInfos = this.metaService.streamsMetadataForStore(this.storeName);
        List<Flux<KeyValue<K, V>>> perHostResults = hostStoreInfos.stream()
                .map(hostInfo -> this.findKeyValues(hostInfo, filter))
                .collect(Collectors.toList());
        return Flux.merge(perHostResults);
    }

    /**
     * Searches a single host, locally when {@code isLocal(hostInfo)} is true and
     * remotely otherwise.
     *
     * @param hostInfo host to query
     * @param filter   filter describing the entries to keep
     * @return the matching key/value pairs of that host
     */
    protected Flux<KeyValue<K, V>> findKeyValues(HostStoreInfo hostInfo, F filter) {
        if (this.isLocal(hostInfo)) {
            return this.findLocalKeyValues(filter);
        }
        return this.findRemoteKeyValues(hostInfo, filter);
    }

    /**
     * Searches a remote host by POSTing the filter to the URL built by
     * {@link #findFromRemoteUrl(HostStoreInfo, Object)}.
     *
     * @param hostInfo remote host to query
     * @param filter   filter sent as the request body
     * @return the key/value pairs decoded from the response body
     */
    protected Flux<KeyValue<K, V>> findRemoteKeyValues(HostStoreInfo hostInfo, F filter) {
        String url = this.findFromRemoteUrl(hostInfo, filter).build().toUriString();
        LOG.debug("[{}]: remote call for find: {} using url: {}", this.storeName, filter, url);
        return this.webClient.post()
                .uri(url)
                .bodyValue(filter)
                .retrieve()
                .bodyToFlux(this.keyValueType);
    }

    /**
     * Builds the URL of the find endpoint on the given host from the configured scheme,
     * the host, port and context path of {@code hostInfo}, and {@link #findPath()}.
     *
     * @param hostInfo host the URL should point to
     * @param filter   filter of the search; unused here, available to overriding classes
     * @return a builder holding the assembled URL, open for further customisation
     */
    protected UriComponentsBuilder findFromRemoteUrl(HostStoreInfo hostInfo, F filter) {
        return UriComponentsBuilder.newInstance()
                .scheme(this.scheme)
                .host(hostInfo.getHost())
                .port(hostInfo.getPort())
                .path(hostInfo.getCtxp())
                .path(this.findPath());
    }

    /**
     * Supplies the path segment appended after the host's context path when building
     * the remote find URL.
     *
     * @return the path of the find endpoint
     */
    protected abstract String findPath();
}

