package inc.yukawa.chain.security.kafka.dao;

import com.fasterxml.jackson.databind.ObjectMapper;
import inc.yukawa.chain.base.core.domain.info.HostStoreInfo;
import inc.yukawa.chain.base.mono.dao.MonoLoadDao;
import inc.yukawa.chain.base.mono.dao.MonoReadDao;
import inc.yukawa.chain.kafka.dao.mono.KafkaStreamsDao;
import inc.yukawa.chain.kafka.util.KafkaUtil;
import inc.yukawa.chain.kafka.util.MetadataService;
import inc.yukawa.chain.kafka.util.StreamUtil;
import inc.yukawa.chain.security.domain.Account;
import inc.yukawa.chain.security.domain.Credentials;
import inc.yukawa.chain.security.filter.AccountFilter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.support.serializer.JsonSerde;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:inc/yukawa/chain/security/kafka/dao/AccountLoadDao2.class */
public class AccountLoadDao2 extends KafkaStreamsDao implements MonoLoadDao<String, Account>, MonoReadDao<String, Account, AccountFilter> {
    private static final Logger log = LoggerFactory.getLogger(AccountLoadDao2.class);
    private final ObjectMapper mapper;

    @Value("${chain.security.account.topic}")
    private String accountTopic;

    @Value("${chain.security.account.store:accounts}")
    private String accountsStoreName;
    private MetadataService metadataService;

    public AccountLoadDao2(Properties properties, ObjectMapper objectMapper) {
        super(properties);
        this.mapper = objectMapper;
    }

    public void start() {
        super.start();
        this.metadataService = new MetadataService(this.streams);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("AccountLoadDao{");
        sb.append("accountTopic='").append(this.accountTopic).append('\'');
        sb.append('}');
        return sb.toString();
    }

    protected StreamsBuilder buildStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Serde String = Serdes.String();
        Serde serDes = KafkaUtil.getSerDes(Account.class, false, this.mapper);
        HashMap hashMap = new HashMap();
        hashMap.put("spring.json.type.mapping", "inc.accentro.integration.auth.core.domain.Credentials:inc.yukawa.chain.security.domain.Credentials,inc.accentro.integration.auth.core.domain.Account:inc.yukawa.chain.security.domain.Account");
        serDes.configure(hashMap, false);
        KafkaUtil.getSerDes(Credentials.class, false, this.mapper).configure(hashMap, false);
        new JsonSerde(Set.class);
        streamsBuilder.stream(this.accountTopic, Consumed.with(Serdes.String(), serDes)).groupByKey().reduce((account, account2) -> {
            if (account == null) {
                return account2;
            }
            if (account2.getCredentials() != null && account2.getCredentials().getPassword() != null) {
                account.setCredentials(account2.getCredentials());
            }
            if (account2.getRoles() != null) {
                account.setRoles(account2.getRoles());
            }
            if (account2.getStatus() != null) {
                account.setStatus(account2.getStatus());
            }
            return account;
        }, Materialized.as(Stores.inMemoryKeyValueStore(this.accountsStoreName)).withKeySerde(String).withValueSerde(serDes));
        return streamsBuilder;
    }

    /* renamed from: load, reason: merged with bridge method [inline-methods] */
    public Mono<Account> m3load(String str) {
        log.debug("Load: {} from {} of {}", new Object[]{str, this.accountsStoreName, this.accountTopic});
        return Mono.fromSupplier(this::waitForAccountsStore).filter(readOnlyKeyValueStore -> {
            return readOnlyKeyValueStore.get(str) != null;
        }).map(readOnlyKeyValueStore2 -> {
            return (Account) readOnlyKeyValueStore2.get(str);
        });
    }

    public Flux<Account> find(AccountFilter accountFilter) {
        return Flux.create(fluxSink -> {
            waitForAccountsStore().all().forEachRemaining(keyValue -> {
                fluxSink.next(keyValue.value);
            });
            fluxSink.complete();
        });
    }

    protected ReadOnlyKeyValueStore<String, Account> waitForAccountsStore() {
        return (ReadOnlyKeyValueStore) StreamUtil.waitUntilStoreIsQueryable(this.accountsStoreName, QueryableStoreTypes.keyValueStore(), this.streams, this.timeout);
    }

    public Flux<HostStoreInfo> meta() {
        return Flux.fromStream(this.metadataService.streamsMetadata().stream());
    }
}
