/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.util;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

public class StreamUtil {
    private static final Logger log = LoggerFactory.getLogger(StreamUtil.class);

    public static String initHostName(String configValue) {
        return StringUtils.hasText((String)configValue) && !"auto".equals(configValue) ? configValue : StreamUtil.findHostName();
    }

    public static String findHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException var1) {
            throw new RuntimeException(var1);
        }
    }

    public static String initHostAddress(String configValue) {
        return StringUtils.hasText((String)configValue) && !"auto".equals(configValue) ? configValue : StreamUtil.findHostAddress();
    }

    public static String findHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        }
        catch (UnknownHostException var1) {
            throw new RuntimeException(var1);
        }
    }

    public static String initAppServerConfig(String host, int port) {
        return StreamUtil.initHostAddress(host) + ":" + port;
    }

    public static Properties baseStreamsProps(String bootstrapServers, String stateDir, String appId, String processingGuarantee) {
        Properties config = new Properties();
        config.putAll(StreamUtil.baseStreamsConfig(bootstrapServers, stateDir, appId, processingGuarantee));
        return config;
    }

    public static Map<String, Object> baseStreamsConfig(String bootstrapServers, String stateDir, String appId, String processingGuarantee) {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("rocksdb.config.setter", CustomRocksDBConfig.class);
        if (appId != null) {
            config.put("application.id", appId);
        }
        config.put("bootstrap.servers", bootstrapServers);
        config.put("state.dir", stateDir);
        config.put("auto.offset.reset", "earliest");
        config.put("processing.guarantee", processingGuarantee);
        config.put("commit.interval.ms", 200);
        config.put(StreamsConfig.consumerPrefix((String)"session.timeout.ms"), 30000);
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());
        return config;
    }

    @Deprecated
    public static <T> T waitUntilStoreIsQueryable(String storeName, QueryableStoreType<T> queryableStoreType, KafkaStreams streams) {
        int count = 0;
        while (true) {
            try {
                return (T)streams.store(StoreQueryParameters.fromNameAndType((String)storeName, queryableStoreType));
            }
            catch (InvalidStateStoreException var7) {
                if (++count % 10 == 0) {
                    log.debug("Waiting for store {} {} : {}", new Object[]{storeName, count, var7.getMessage()});
                }
                if (count > 100) {
                    throw var7;
                }
                try {
                    Thread.sleep(200L);
                }
                catch (InterruptedException var6) {
                    throw new RuntimeException(var6);
                }
            }
        }
    }

    public static <T> T waitUntilStoreIsQueryable(String storeName, QueryableStoreType<T> queryableStoreType, KafkaStreams streams, long timeout) {
        return StreamUtil.waitUntilStoreIsQueryable(storeName, queryableStoreType, streams, timeout, 200L);
    }

    public static <T> T waitUntilStoreIsQueryable(String storeName, QueryableStoreType<T> queryableStoreType, KafkaStreams streams, long timeout, long sleep) {
        long timeoutMillis = System.currentTimeMillis() + timeout;
        while (true) {
            try {
                return (T)streams.store(StoreQueryParameters.fromNameAndType((String)storeName, queryableStoreType));
            }
            catch (InvalidStateStoreException var12) {
                if (System.currentTimeMillis() >= timeoutMillis) {
                    throw var12;
                }
                try {
                    Thread.sleep(sleep);
                }
                catch (InterruptedException var11) {
                    throw new RuntimeException(var11);
                }
            }
        }
    }

    public static class CustomRocksDBConfig
    implements RocksDBConfigSetter {
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
            options.setIncreaseParallelism(compactionParallelism);
        }

        public void close(String storeName, Options options) {
        }
    }
}

