package inc.yukawa.chain.kafka.dao.mono;

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:chain-kafka-core-2.0.5.jar:inc/yukawa/chain/kafka/dao/mono/KafkaStreamsDao.class */
public abstract class KafkaStreamsDao implements SmartLifecycle, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaStreamsDao.class);
    protected KafkaStreams streams;
    protected final Properties streamProps;
    protected ApplicationContext ctx;
    protected long timeout = 5000;

    public KafkaStreamsDao(Properties properties) {
        this.streamProps = properties;
    }

    public String toString() {
        return getClass().getSimpleName() + "{}";
    }

    protected abstract StreamsBuilder buildStreams();

    @Override // org.springframework.context.Lifecycle
    public void start() {
        StreamsBuilder buildStreams = buildStreams();
        if (buildStreams != null) {
            Topology build = buildStreams.build();
            log.debug("Staring topology {}", build.describe());
            Properties properties = this.streamProps;
            log.debug("Staring streamProps {}", properties);
            this.streams = initStreams(build, properties);
            this.streams.start();
        }
        log.info("Started {}", this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaStreams initStreams(Topology topology, Properties properties) {
        KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
        kafkaStreams.setUncaughtExceptionHandler(this::onStreamError);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close(Duration.ofSeconds(5L));
        }));
        return kafkaStreams;
    }

    protected void onStreamError(Thread thread, Throwable th) {
        log.error("EXIT APPLICATION: Stream error occurred in thread " + thread.getName(), th);
        System.exit(100);
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        if (this.streams != null) {
            log.debug("Closing streams {}", this.streams.state());
            this.streams.close(Duration.ofSeconds(5L));
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.streams != null && this.streams.state().isRunning();
    }

    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.ctx = applicationContext;
    }

    public long getTimeout() {
        return this.timeout;
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }
}
