/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.dao.mono;

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;

public abstract class KafkaStreamsDao
implements SmartLifecycle,
ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsDao.class);
    protected KafkaStreams streams;
    protected final Properties streamProps;
    protected ApplicationContext ctx;
    protected long timeout = 5000L;

    public KafkaStreamsDao(Properties streamProps) {
        this.streamProps = streamProps;
    }

    public String toString() {
        return this.getClass().getSimpleName() + "{}";
    }

    protected abstract StreamsBuilder buildStreams();

    public void start() {
        StreamsBuilder builder = this.buildStreams();
        if (builder != null) {
            Topology topology = builder.build();
            log.debug("Staring topology {}", (Object)topology.describe());
            Properties props = this.streamProps;
            log.debug("Staring streamProps {}", (Object)props);
            this.streams = this.initStreams(topology, props);
            this.streams.start();
        }
        log.info("Started {}", (Object)this);
    }

    protected KafkaStreams initStreams(Topology topology, Properties props) {
        KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
        kafkaStreams.setUncaughtExceptionHandler(e -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close(Duration.ofSeconds(5L))));
        return kafkaStreams;
    }

    public void stop() {
        if (this.streams != null) {
            log.debug("Closing streams {}", (Object)this.streams.state());
            this.streams.close(Duration.ofSeconds(5L));
        }
    }

    public boolean isRunning() {
        return this.streams != null && this.streams.state().isRunningOrRebalancing();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.ctx = applicationContext;
    }

    public long getTimeout() {
        return this.timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }
}

