/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.security.kafka.dao;

import com.fasterxml.jackson.databind.ObjectMapper;
import inc.yukawa.chain.base.core.domain.info.HostStoreInfo;
import inc.yukawa.chain.base.mono.dao.MonoLoadDao;
import inc.yukawa.chain.base.mono.dao.MonoReadDao;
import inc.yukawa.chain.kafka.dao.mono.KafkaStreamsDao;
import inc.yukawa.chain.kafka.util.KafkaUtil;
import inc.yukawa.chain.kafka.util.MetadataService;
import inc.yukawa.chain.kafka.util.StreamUtil;
import inc.yukawa.chain.security.domain.Account;
import inc.yukawa.chain.security.domain.AccountStatus;
import inc.yukawa.chain.security.domain.Credentials;
import inc.yukawa.chain.security.filter.AccountFilter;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AccountLoadDao2
extends KafkaStreamsDao
implements MonoLoadDao<String, Account>,
MonoReadDao<String, Account, AccountFilter> {
    private static final Logger log = LoggerFactory.getLogger(AccountLoadDao2.class);
    private final ObjectMapper mapper;
    @Value(value="${chain.security.account.topic}")
    private String accountTopic;
    @Value(value="${chain.security.account.store:accounts}")
    private String accountsStoreName;
    private MetadataService metadataService;

    public AccountLoadDao2(Properties streamProps, ObjectMapper mapper) {
        super(streamProps);
        this.mapper = mapper;
    }

    public void start() {
        super.start();
        this.metadataService = new MetadataService(this.streams);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("AccountLoadDao{");
        sb.append("accountTopic='").append(this.accountTopic).append('\'');
        sb.append('}');
        return sb.toString();
    }

    protected StreamsBuilder buildStreams() {
        StreamsBuilder builder = new StreamsBuilder();
        Serde keySerDes = Serdes.String();
        Serde accountSer = KafkaUtil.getSerDes(Account.class, (boolean)false, (ObjectMapper)this.mapper);
        HashMap<String, String> cfg = new HashMap<String, String>();
        cfg.put("spring.json.type.mapping", "inc.accentro.integration.auth.core.domain.Credentials:inc.yukawa.chain.security.domain.Credentials,inc.accentro.integration.auth.core.domain.Account:inc.yukawa.chain.security.domain.Account");
        accountSer.configure(cfg, false);
        Materialized accountStore = Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)this.accountsStoreName)).withKeySerde(keySerDes).withValueSerde(accountSer);
        builder.stream(this.accountTopic, Consumed.with((Serde)Serdes.String(), (Serde)accountSer)).mapValues((readOnlyKey, value) -> {
            if (value != null) {
                return value;
            }
            return new Account(new Credentials(readOnlyKey, null), null, new AccountStatus(Boolean.valueOf(false), Boolean.valueOf(true), Boolean.valueOf(true), Boolean.valueOf(true)));
        }).groupByKey().reduce((ov, nv) -> {
            log.debug("buildStreams: reduce\n\tov: {}\n\tnv: {}", ov, nv);
            if (ov == null) {
                return nv;
            }
            if (nv.getCredentials() != null && nv.getCredentials().getPassword() != null) {
                ov.setCredentials(nv.getCredentials());
            }
            if (nv.getRoles() != null) {
                ov.setRoles(nv.getRoles());
            }
            if (nv.getRoleContexts() != null) {
                ov.setRoleContexts(nv.getRoleContexts());
            }
            if (nv.getStatus() != null) {
                ov.setStatus(nv.getStatus());
            }
            if (nv.getDetails() != null) {
                ov.setDetails(nv.getDetails());
            }
            return ov;
        }, accountStore);
        return builder;
    }

    public Mono<Account> load(String key) {
        log.debug("Load: {} from {} of {}", new Object[]{key, this.accountsStoreName, this.accountTopic});
        return Mono.fromSupplier(this::waitForAccountsStore).filter(s -> s.get((Object)key) != null).map(s -> (Account)s.get((Object)key));
    }

    public Flux<Account> find(AccountFilter filter) {
        return Flux.create(fluxSink -> {
            this.waitForAccountsStore().all().forEachRemaining(kv -> fluxSink.next((Object)((Account)kv.value)));
            fluxSink.complete();
        });
    }

    protected ReadOnlyKeyValueStore<String, Account> waitForAccountsStore() {
        QueryableStoreType st = QueryableStoreTypes.keyValueStore();
        return (ReadOnlyKeyValueStore)StreamUtil.waitUntilStoreIsQueryable((String)this.accountsStoreName, (QueryableStoreType)st, (KafkaStreams)this.streams, (long)this.timeout);
    }

    public Flux<HostStoreInfo> meta() {
        return Flux.fromStream(this.metadataService.streamsMetadata().stream());
    }
}

