package inc.yukawa.chain.kafka.util;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:inc/yukawa/chain/kafka/util/StreamUtil.class */
/**
 * Static helpers for setting up Kafka Streams applications: resolving the local host
 * name/address, assembling a baseline streams configuration, and waiting for a state
 * store to become queryable.
 */
public class StreamUtil {
    private static final Logger log = LoggerFactory.getLogger(StreamUtil.class);

    /** Default pause, in milliseconds, between two attempts to obtain a state store. */
    private static final long STORE_SLEEP_TIME = 200;

    /** Placeholder value that requests automatic detection of the local host. */
    private static final String AUTO = "auto";

    /** Number of failed attempts the deprecated, count-based wait tolerates before giving up. */
    private static final int MAX_STORE_ATTEMPTS = 100;

    /** The deprecated, count-based wait emits a debug line on every n-th failed attempt. */
    private static final int LOG_EVERY_N_ATTEMPTS = 10;

    /**
     * RocksDB config setter registered by {@link #baseStreamsConfig} under
     * {@code rocksdb.config.setter}. It passes the number of available processors
     * (but never less than 2) to {@link Options#setIncreaseParallelism(int)}.
     */
    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            int parallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
            options.setIncreaseParallelism(parallelism);
        }

        public void close(String storeName, Options options) {
            // Nothing to release: setConfig only calls a setter and creates no objects of its own.
        }
    }

    /**
     * Resolves the host name to use.
     *
     * @param hostName configured host name; may be {@code null}, empty or {@code "auto"}
     * @return {@code hostName} unchanged, or the result of {@link #findHostName()} when
     *         {@code hostName} is {@code null}, empty or {@code "auto"}
     */
    public static String initHostName(String hostName) {
        return isUnsetOrAuto(hostName) ? findHostName() : hostName;
    }

    /**
     * Looks up the host name of the local host.
     *
     * @return the local host name as reported by {@link InetAddress#getLocalHost()}
     * @throws RuntimeException wrapping the {@link UnknownHostException} if the local host
     *         cannot be resolved
     */
    public static String findHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Unable to determine the local host name", e);
        }
    }

    /**
     * Resolves the host address to use.
     *
     * @param hostAddress configured host address; may be {@code null}, empty or {@code "auto"}
     * @return {@code hostAddress} unchanged, or the result of {@link #findHostAddress()} when
     *         {@code hostAddress} is {@code null}, empty or {@code "auto"}
     */
    public static String initHostAddress(String hostAddress) {
        return isUnsetOrAuto(hostAddress) ? findHostAddress() : hostAddress;
    }

    /**
     * Looks up the textual IP address of the local host.
     *
     * @return the local host address as reported by {@link InetAddress#getLocalHost()}
     * @throws RuntimeException wrapping the {@link UnknownHostException} if the local host
     *         cannot be resolved
     */
    public static String findHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Unable to determine the local host address", e);
        }
    }

    /**
     * Builds a {@code host:port} string from a host address and a port.
     *
     * @param hostAddress configured host address; resolved through {@link #initHostAddress(String)}
     * @param port        port number appended after the colon
     * @return the resolved address and the port joined by {@code ':'}
     */
    public static String initAppServerConfig(String hostAddress, int port) {
        return initHostAddress(hostAddress) + ":" + port;
    }

    /**
     * Same settings as {@link #baseStreamsConfig(String, String, String, String)}, copied into a
     * {@link Properties} instance.
     *
     * @param bootstrapServers    value for {@code bootstrap.servers}
     * @param stateDir            value for {@code state.dir}
     * @param applicationId       value for {@code application.id}; omitted when {@code null}
     * @param processingGuarantee value for {@code processing.guarantee}
     * @return a new {@link Properties} holding the baseline configuration
     * @throws NullPointerException if {@code bootstrapServers}, {@code stateDir} or
     *         {@code processingGuarantee} is {@code null}, because {@link Properties} rejects
     *         {@code null} values
     */
    public static Properties baseStreamsProps(String bootstrapServers, String stateDir, String applicationId, String processingGuarantee) {
        Properties properties = new Properties();
        properties.putAll(baseStreamsConfig(bootstrapServers, stateDir, applicationId, processingGuarantee));
        return properties;
    }

    /**
     * Assembles the baseline Kafka Streams configuration as a mutable map.
     *
     * <p>Besides the four caller-supplied values, the map fixes: the RocksDB config setter
     * ({@link CustomRocksDBConfig}), {@code auto.offset.reset=earliest},
     * {@code commit.interval.ms=200}, a consumer {@code session.timeout.ms} of 30000, the
     * String serde as default key serde, {@link LogAndSkipOnInvalidTimestamp} as default
     * timestamp extractor and {@link LogAndContinueExceptionHandler} as default
     * deserialization exception handler.
     *
     * @param bootstrapServers    value for {@code bootstrap.servers}
     * @param stateDir            value for {@code state.dir}
     * @param applicationId       value for {@code application.id}; the key is omitted when {@code null}
     * @param processingGuarantee value for {@code processing.guarantee}
     * @return a new, mutable map with the baseline configuration
     */
    public static Map<String, Object> baseStreamsConfig(String bootstrapServers, String stateDir, String applicationId, String processingGuarantee) {
        Map<String, Object> config = new HashMap<>();
        config.put("rocksdb.config.setter", CustomRocksDBConfig.class);
        if (applicationId != null) {
            config.put("application.id", applicationId);
        }
        config.put("bootstrap.servers", bootstrapServers);
        config.put("state.dir", stateDir);
        config.put("auto.offset.reset", "earliest");
        config.put("processing.guarantee", processingGuarantee);
        config.put("commit.interval.ms", 200);
        config.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 30000);
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());
        return config;
    }

    /**
     * Repeatedly asks {@code kafkaStreams} for the named store, pausing
     * {@value #STORE_SLEEP_TIME} ms after each {@link InvalidStateStoreException}, and gives up
     * once more than {@value #MAX_STORE_ATTEMPTS} attempts have failed.
     *
     * @param storeName          name of the state store
     * @param queryableStoreType type of the store to obtain
     * @param kafkaStreams       streams instance that owns the store
     * @param <T>                store type
     * @return the store as soon as it can be obtained
     * @throws InvalidStateStoreException the last failure, once the attempt limit is exceeded
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
     *         restored before throwing
     * @deprecated use {@link #waitUntilStoreIsQueryable(String, QueryableStoreType, KafkaStreams, long)}
     *             or {@link #waitUntilStoreIsQueryable(String, QueryableStoreType, KafkaStreams, long, long)},
     *             which take an explicit timeout.
     */
    @Deprecated
    public static <T> T waitUntilStoreIsQueryable(String storeName, QueryableStoreType<T> queryableStoreType, KafkaStreams kafkaStreams) {
        int failedAttempts = 0;
        while (true) {
            try {
                return kafkaStreams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException e) {
                failedAttempts++;
                if (failedAttempts % LOG_EVERY_N_ATTEMPTS == 0) {
                    log.debug("Waiting for store {} {} : {}", storeName, failedAttempts, e.getMessage());
                }
                if (failedAttempts > MAX_STORE_ATTEMPTS) {
                    throw e;
                }
                pause(STORE_SLEEP_TIME);
            }
        }
    }

    /**
     * Waits for the named store with the default pause of {@value #STORE_SLEEP_TIME} ms
     * between attempts.
     *
     * @param storeName          name of the state store
     * @param queryableStoreType type of the store to obtain
     * @param kafkaStreams       streams instance that owns the store
     * @param timeoutMs          maximum time to keep retrying, in milliseconds
     * @param <T>                store type
     * @return the store as soon as it can be obtained
     * @throws InvalidStateStoreException the last failure, once the timeout has elapsed
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
     *         restored before throwing
     */
    public static <T> T waitUntilStoreIsQueryable(String storeName, QueryableStoreType<T> queryableStoreType, KafkaStreams kafkaStreams, long timeoutMs) {
        return waitUntilStoreIsQueryable(storeName, queryableStoreType, kafkaStreams, timeoutMs, STORE_SLEEP_TIME);
    }

    /**
     * Repeatedly asks {@code kafkaStreams} for the named store until it succeeds or the
     * timeout has elapsed. At least one attempt is always made; a zero or negative timeout
     * therefore means "try exactly once".
     *
     * @param storeName          name of the state store
     * @param queryableStoreType type of the store to obtain
     * @param kafkaStreams       streams instance that owns the store
     * @param timeoutMs          maximum time to keep retrying, in milliseconds
     * @param sleepMs            pause between two attempts, in milliseconds
     * @param <T>                store type
     * @return the store as soon as it can be obtained
     * @throws InvalidStateStoreException the last failure, once the timeout has elapsed
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
     *         restored before throwing
     */
    public static <T> T waitUntilStoreIsQueryable(String storeName, QueryableStoreType<T> queryableStoreType, KafkaStreams kafkaStreams, long timeoutMs, long sleepMs) {
        // Elapsed-time comparison on the monotonic clock: immune to wall-clock adjustments and,
        // unlike "now + timeout", cannot overflow for very large timeouts (toNanos saturates).
        final long startNanos = System.nanoTime();
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            try {
                return kafkaStreams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException e) {
                if (System.nanoTime() - startNanos >= timeoutNanos) {
                    throw e;
                }
                pause(sleepMs);
            }
        }
    }

    /** Returns true when the value is null, empty or the "auto" placeholder. */
    private static boolean isUnsetOrAuto(String value) {
        return !StringUtils.hasLength(value) || AUTO.equals(value);
    }

    /** Sleeps for the given time; on interruption restores the interrupt flag and aborts with a RuntimeException. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
