package inc.yukawa.chain.kafka.event;

import inc.yukawa.chain.base.core.event.ChainEventBean;
import inc.yukawa.chain.kafka.dao.mono.KafkaStreamsDao;
import java.util.Properties;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;

/* loaded from: input_file:chain-kafka-core-2.0.6.jar:inc/yukawa/chain/kafka/event/KafkaEventHandlerStream.class */
public class KafkaEventHandlerStream<K, V, E extends ChainEventBean<V>> extends KafkaStreamsDao {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaEventHandlerStream.class);
    public static final String SKIP = "SKIP";
    protected final Serde<K> keySerde;
    protected final Serde<V> dataSerde;
    protected final Serde<E> eventSerde;
    protected final NewTopic dataTopic;
    protected final NewTopic eventTopic;
    protected Aggregator<Object, E, E> eventHandler;
    protected ParameterizedTypeReference<E> eventType;

    public KafkaEventHandlerStream(Properties properties, NewTopic newTopic, NewTopic newTopic2, Serde<K> serde, Serde<V> serde2, Serde<E> serde3, Aggregator<Object, E, E> aggregator) {
        super(properties);
        this.dataTopic = newTopic;
        this.eventTopic = newTopic2;
        this.keySerde = serde;
        this.dataSerde = serde2;
        this.eventSerde = serde3;
        this.eventHandler = aggregator;
        this.eventType = ParameterizedTypeReference.forType(ResolvableType.forInstance(this).as(KafkaEventHandlerStream.class).getGeneric(2).getType());
    }

    @Override // inc.yukawa.chain.kafka.dao.mono.KafkaStreamsDao
    protected StreamsBuilder buildStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        eventPostProcess(eventPreProcess(streamsBuilder.stream(this.eventTopic.name(), Consumed.with(this.keySerde, this.eventSerde))).groupByKey(Grouped.with(this.keySerde, this.eventSerde)).aggregate(this::blankEvent, this.eventHandler, Materialized.with(this.keySerde, this.eventSerde)).toStream()).mapValues(chainEventBean -> {
            if (chainEventBean == null) {
                return null;
            }
            return chainEventBean.getPayload();
        }).peek((obj, obj2) -> {
            LOG.debug("[{}@{}] put: {}", this.dataTopic.name(), obj, obj2);
        }).to(this.dataTopic.name(), Produced.with(this.keySerde, this.dataSerde));
        return streamsBuilder;
    }

    protected KStream<K, E> eventPreProcess(KStream<K, E> kStream) {
        return kStream.peek((obj, chainEventBean) -> {
            LOG.trace("[{}@{}] process: {}", this.eventTopic.name(), obj, chainEventBean);
        }).filter(this::preFilter);
    }

    protected boolean preFilter(K k, E e) {
        boolean z = (k == null || e == null || e.getName() == null || (e.getPayload() == null && !"DELETE".equalsIgnoreCase(e.getName()))) ? false : true;
        if (!z) {
            LOG.warn("Skipping invalid event: {} {}", k, e);
        }
        return z;
    }

    protected KStream<K, E> eventPostProcess(KStream<K, E> kStream) {
        return kStream.filter(this::postFilter);
    }

    protected boolean postFilter(K k, E e) {
        return e == null || !SKIP.equalsIgnoreCase(e.getName());
    }

    protected E blankEvent() {
        try {
            return (E) Class.forName(this.eventType.getType().getTypeName()).newInstance();
        } catch (Exception e) {
            LOG.error("Unable to create blank event", (Throwable) e);
            throw new IllegalStateException(e);
        }
    }
}
