/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.event;

import inc.yukawa.chain.base.core.event.ChainEventBean;
import inc.yukawa.chain.kafka.dao.mono.KafkaStreamsDao;
import java.lang.reflect.Type;
import java.util.Properties;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.util.StringUtils;

/**
 * Kafka Streams topology that reads events from an event topic, folds them per key with a
 * caller-supplied {@link Aggregator}, and writes the payload of each aggregated event to a
 * data topic.
 *
 * <p>Pipeline built by {@link #buildStreams()}:
 * <ol>
 *   <li>consume {@code eventTopic} with the key and event serdes;</li>
 *   <li>{@link #eventPreProcess(KStream)} (trace-log, then {@link #preFilter(Object, ChainEventBean)});</li>
 *   <li>group by key and aggregate, starting from {@link #blankEvent()};</li>
 *   <li>{@link #eventPostProcess(KStream)} ({@link #postFilter(Object, ChainEventBean)});</li>
 *   <li>map each event to its payload and produce it to {@code dataTopic}.</li>
 * </ol>
 *
 * @param <K> record key type
 * @param <V> payload type carried by the event and written to the data topic
 * @param <E> event type; {@link #blankEvent()} instantiates it reflectively through a
 *            no-argument constructor
 */
public class KafkaEventHandlerStream<K, V, E extends ChainEventBean<V>>
        extends KafkaStreamsDao {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaEventHandlerStream.class);

    /** Event name (compared case-insensitively) that makes {@link #postFilter} drop an aggregated event. */
    public static final String SKIP = "SKIP";

    protected final Serde<K> keySerde;
    protected final Serde<V> dataSerde;
    protected final Serde<E> eventSerde;
    protected final NewTopic dataTopic;
    protected final NewTopic eventTopic;

    /** Aggregator applied to every accepted event together with the current aggregate for its key. */
    protected Aggregator<Object, E, E> eventHandler;

    /** Reflective description of {@code E}, used by {@link #blankEvent()} to create the initial aggregate. */
    protected ParameterizedTypeReference<E> eventType;

    /**
     * Stores the collaborators and derives {@link #eventType} from the third generic parameter
     * ({@code E}) of this instance's type as seen through {@code KafkaEventHandlerStream}.
     *
     * <p>NOTE(review): this presumably yields a concrete class only when a subclass binds
     * {@code E} to a concrete type; for a directly instantiated, unbound instance the type is
     * expected to be just the type variable - confirm against callers.
     *
     * @param streamProps  properties passed to the {@code KafkaStreamsDao} super constructor
     * @param dataTopic    topic receiving the aggregated payloads
     * @param eventTopic   topic the events are consumed from
     * @param keySerde     serde for record keys
     * @param dataSerde    serde for payloads written to {@code dataTopic}
     * @param eventSerde   serde for events read from {@code eventTopic} and kept in the aggregate store
     * @param eventHandler aggregator folding each incoming event into the per-key aggregate
     */
    public KafkaEventHandlerStream(Properties streamProps, NewTopic dataTopic, NewTopic eventTopic, Serde<K> keySerde, Serde<V> dataSerde, Serde<E> eventSerde, Aggregator<Object, E, E> eventHandler) {
        super(streamProps);
        this.dataTopic = dataTopic;
        this.eventTopic = eventTopic;
        this.keySerde = keySerde;
        this.dataSerde = dataSerde;
        this.eventSerde = eventSerde;
        this.eventHandler = eventHandler;
        Type resolvedEventType = ResolvableType.forInstance(this)
                .as(KafkaEventHandlerStream.class)
                .getGeneric(2)
                .getType();
        this.eventType = ParameterizedTypeReference.forType(resolvedEventType);
    }

    /**
     * Builds the event-to-data topology described in the class documentation.
     *
     * @return the builder holding the assembled topology
     */
    @Override
    protected StreamsBuilder buildStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        KStream<K, E> incoming = streamsBuilder.stream(this.eventTopic.name(), Consumed.with(this.keySerde, this.eventSerde));
        KStream<K, E> accepted = this.eventPreProcess(incoming);

        KStream<K, E> aggregated = accepted
                .groupByKey(Grouped.with(this.keySerde, this.eventSerde))
                .aggregate(this::blankEvent, this.eventHandler, Materialized.with(this.keySerde, this.eventSerde))
                .toStream();
        KStream<K, E> outgoing = this.eventPostProcess(aggregated);

        // A null aggregate is forwarded as a null value rather than dereferenced.
        outgoing.mapValues(v -> v == null ? null : v.getPayload())
                .peek((k, v) -> LOG.debug("[{}@{}] put: {}", this.dataTopic.name(), k, v))
                .to(this.dataTopic.name(), Produced.with(this.keySerde, this.dataSerde));
        return streamsBuilder;
    }

    /**
     * Hook applied to the raw event stream before aggregation: trace-logs every record and then
     * keeps only those accepted by {@link #preFilter(Object, ChainEventBean)}.
     *
     * @param stream events as consumed from the event topic
     * @return the filtered stream handed to the aggregation step
     */
    protected KStream<K, E> eventPreProcess(KStream<K, E> stream) {
        return stream
                .peek((k, v) -> LOG.trace("[{}@{}] process: {}", this.eventTopic.name(), k, v))
                .filter(this::preFilter);
    }

    /**
     * Decides whether an incoming event may enter the aggregation.
     *
     * <p>An event is accepted when the key is usable (non-null, and not blank if it is a
     * {@code String}), the event and its name are non-null, and it either carries a payload or
     * is named {@code "DELETE"} (case-insensitive). Rejected events are logged at WARN level.
     *
     * @param key   record key
     * @param event record value
     * @return {@code true} to keep the record, {@code false} to drop it
     */
    protected boolean preFilter(K key, E event) {
        boolean accept = hasUsableKey(key)
                && event != null
                && event.getName() != null
                && (event.getPayload() != null || "DELETE".equalsIgnoreCase(event.getName()));
        if (!accept) {
            LOG.warn("Skipping invalid event: {} {}", key, event);
        }
        return accept;
    }

    /**
     * Hook applied to the aggregated stream before payloads are produced; keeps only records
     * accepted by {@link #postFilter(Object, ChainEventBean)}.
     *
     * @param stream aggregated events, one update per processed input event
     * @return the filtered stream whose payloads are written to the data topic
     */
    protected KStream<K, E> eventPostProcess(KStream<K, E> stream) {
        return stream.filter(this::postFilter);
    }

    /**
     * Decides whether an aggregated event is forwarded to the data topic.
     *
     * <p>A record is accepted when the key is usable (non-null, and not blank if it is a
     * {@code String}) and the event is either {@code null} or not named {@link #SKIP}
     * (case-insensitive). Rejected records are logged at WARN level.
     *
     * @param key   record key
     * @param event aggregated event; may be {@code null}
     * @return {@code true} to keep the record, {@code false} to drop it
     */
    protected boolean postFilter(K key, E event) {
        boolean accept = hasUsableKey(key)
                && (event == null || !SKIP.equalsIgnoreCase(event.getName()));
        if (!accept) {
            LOG.warn("Skipping invalid post event: {} {}", key, event);
        }
        return accept;
    }

    /**
     * Creates the initial aggregate for a key by invoking the no-argument constructor of the
     * class behind {@link #eventType}.
     *
     * <p>The raw class is resolved from the {@link Type} itself instead of from its type name,
     * because the name of a parameterized type includes its type arguments and cannot be loaded
     * by name.
     *
     * @return a freshly constructed event instance
     * @throws IllegalStateException if the event class cannot be resolved or instantiated; the
     *                               underlying failure is attached as the cause
     */
    protected E blankEvent() {
        Type type = this.eventType.getType();
        try {
            Class<?> rawClass = ResolvableType.forType(type).resolve();
            if (rawClass == null) {
                throw new ClassNotFoundException("Cannot resolve event class for type " + type.getTypeName());
            }
            // Safe as long as eventType really describes E, which is how the constructor derives it.
            @SuppressWarnings("unchecked")
            E blank = (E) rawClass.getDeclaredConstructor().newInstance();
            return blank;
        }
        catch (Exception e) {
            LOG.error("Unable to create blank event", e);
            throw new IllegalStateException(e);
        }
    }

    /** Returns whether the key is non-null and, when it is a {@code String}, contains text. */
    private static boolean hasUsableKey(Object key) {
        if (key == null) {
            return false;
        }
        return !(key instanceof String) || StringUtils.hasText((String) key);
    }
}

