package inc.yukawa.chain.modules.console.repo;

import com.fasterxml.jackson.databind.ObjectMapper;
import inc.yukawa.chain.kafka.dao.mono.KafkaStoreLoadDao;
import inc.yukawa.chain.kafka.util.KafkaUtil;
import inc.yukawa.chain.modules.console.core.domain.LogData;
import inc.yukawa.chain.modules.console.core.domain.LogEntry;
import inc.yukawa.chain.modules.console.core.domain.LogError;
import inc.yukawa.chain.modules.console.core.domain.UseCase;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Predicate;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Profile({"usecase-aspect", "all-aspects", "default"})
@Component
/* loaded from: input_file:inc/yukawa/chain/modules/console/repo/UseCaseStore.class */
public class UseCaseStore extends KafkaStoreLoadDao<String, UseCase> {

    @Value("${chain.console.usecase.data.field.maxLen:500}")
    private int maxFieldLen;

    @Value("${chain.console.usecase.event.topic}")
    private String eventTopic;

    @Value("${chain.console.usecase.data.topic}")
    private String dataTopic;

    @Value("${chain.console.usecase.payload.topic}")
    private String payloadTopic;

    @Value("${chain.console.usecase.error.topic}")
    private String errorTopic;
    private Serde<LogEntry> logEntrySerde;
    private Serde<UseCase> useCaseSerde;
    private Serde<LogData> payloadSerde;
    private Serde<LogError> errorSerde;
    private UseCaseAggregator aggregator;

    @Autowired
    public UseCaseStore(@Qualifier("console.StreamsProps") Properties properties, ObjectMapper objectMapper, UseCaseAggregator useCaseAggregator) {
        super("usecases", properties);
        this.logEntrySerde = KafkaUtil.getSerDes(LogEntry.class, false, objectMapper);
        this.useCaseSerde = KafkaUtil.getSerDes(UseCase.class, false, objectMapper);
        this.payloadSerde = KafkaUtil.getSerDes(LogData.class, false, objectMapper);
        this.errorSerde = KafkaUtil.getSerDes(LogError.class, false, objectMapper);
        this.aggregator = useCaseAggregator;
    }

    public Flux<UseCase> find(Predicate<UseCase> predicate) {
        KeyValueIterator all = getKeyValueStore().all();
        Flux fromIterable = Flux.fromIterable(() -> {
            return all;
        });
        all.getClass();
        return fromIterable.doOnTerminate(all::close).map(keyValue -> {
            return (UseCase) keyValue.value;
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).filter(predicate);
    }

    protected StreamsBuilder buildStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, KeyValue<String, Serializable>>[] splitEventsFeed = splitEventsFeed(streamsBuilder);
        handleSteps(splitEventsFeed[0].mapValues(keyValue -> {
            return (LogEntry) keyValue.value;
        }));
        handlePayloads(splitEventsFeed[1].map((str, keyValue2) -> {
            return keyValue2;
        }).mapValues(serializable -> {
            return (LogData) serializable;
        }));
        handleErrors(splitEventsFeed[2].map((str2, keyValue3) -> {
            return keyValue3;
        }).mapValues(serializable2 -> {
            return (LogError) serializable2;
        }));
        return streamsBuilder;
    }

    private void handleSteps(KStream<String, LogEntry> kStream) {
        kStream.groupByKey(Grouped.with(Serdes.String(), this.logEntrySerde)).aggregate(UseCase::new, this.aggregator, Materialized.as(Stores.persistentKeyValueStore(this.storeName)).withKeySerde(Serdes.String()).withValueSerde(this.useCaseSerde)).toStream().to(this.dataTopic, Produced.with(Serdes.String(), this.useCaseSerde));
    }

    private void handleErrors(KStream<String, LogError> kStream) {
        kStream.to(this.errorTopic, Produced.with(Serdes.String(), this.errorSerde));
    }

    private void handlePayloads(KStream<String, LogData> kStream) {
        kStream.to(this.payloadTopic, Produced.with(Serdes.String(), this.payloadSerde));
    }

    private KStream<String, KeyValue<String, Serializable>>[] splitEventsFeed(StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream(this.eventTopic, Consumed.with(Serdes.String(), this.logEntrySerde)).filter((str, logEntry) -> {
            return logEntry != null;
        }).mapValues(this::generateStepId).flatMapValues(this::splitEvent).branch(new org.apache.kafka.streams.kstream.Predicate[]{(str2, keyValue) -> {
            return keyValue.value instanceof LogEntry;
        }, (str3, keyValue2) -> {
            return keyValue2.value instanceof LogData;
        }, (str4, keyValue3) -> {
            return keyValue3.value instanceof LogError;
        }});
    }

    private Iterable<KeyValue<String, Serializable>> splitEvent(String str, LogEntry logEntry) {
        LogData data = logEntry.getData();
        LogError error = logEntry.getError();
        logEntry.setData(truncate(data));
        logEntry.setError(truncate(error));
        LinkedList linkedList = new LinkedList();
        linkedList.add(KeyValue.pair(str, logEntry));
        linkedList.add(KeyValue.pair(logEntry.getId(), data));
        linkedList.add(KeyValue.pair(logEntry.getId(), error));
        return linkedList;
    }

    private LogData truncate(LogData logData) {
        if (logData == null) {
            return null;
        }
        String text = logData.getText();
        if (text == null && "text/plain".equalsIgnoreCase(logData.getContentType()) && logData.getBytes() != null) {
            text = new String(logData.getBytes());
        }
        if (text != null) {
            text = text.substring(0, Math.min(text.length(), this.maxFieldLen));
        }
        return new LogData(logData.getContentType(), text);
    }

    private LogError truncate(LogError logError) {
        if (logError == null) {
            return null;
        }
        return new LogError(logError.getClazz(), logError.getMessage(), logError.getStack() != null ? logError.getStack().substring(0, Math.min(logError.getStack().length(), this.maxFieldLen)) : null);
    }

    private LogEntry generateStepId(String str, LogEntry logEntry) {
        if (logEntry.getId() == null) {
            logEntry.setId(UUID.randomUUID().toString());
        }
        return logEntry;
    }
}
