package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka-streams-2.3.1.jar:org/apache/kafka/streams/state/internals/RocksDBStore.class */
public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingStore {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocksDBStore.class);
    private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    private static final long WRITE_BUFFER_SIZE = 16777216;
    private static final long BLOCK_CACHE_SIZE = 52428800;
    private static final long BLOCK_SIZE = 4096;
    private static final int MAX_WRITE_BUFFERS = 3;
    private static final String DB_FILE_DIR = "rocksdb";
    final String name;
    private final String parentDir;
    final Set<KeyValueIterator<Bytes, byte[]>> openIterators;
    File dbDir;
    RocksDB db;
    RocksDBAccessor dbAccessor;
    private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
    WriteOptions wOptions;
    FlushOptions fOptions;
    private Cache cache;
    private BloomFilter filter;
    private RocksDBConfigSetter configSetter;
    private volatile boolean prepareForBulkload;
    ProcessorContext internalProcessorContext;
    volatile BatchingStateRestoreCallback batchingStateRestoreCallback;
    protected volatile boolean open;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka-streams-2.3.1.jar:org/apache/kafka/streams/state/internals/RocksDBStore$RocksDBAccessor.class */
    public interface RocksDBAccessor {
        void put(byte[] bArr, byte[] bArr2);

        void prepareBatch(List<KeyValue<Bytes, byte[]>> list, WriteBatch writeBatch) throws RocksDBException;

        byte[] get(byte[] bArr) throws RocksDBException;

        byte[] getOnly(byte[] bArr) throws RocksDBException;

        KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2);

        KeyValueIterator<Bytes, byte[]> all();

        long approximateNumEntries() throws RocksDBException;

        void flush() throws RocksDBException;

        void prepareBatchForRestore(Collection<KeyValue<byte[], byte[]>> collection, WriteBatch writeBatch) throws RocksDBException;

        void addToBatch(byte[] bArr, byte[] bArr2, WriteBatch writeBatch) throws RocksDBException;

        void close();

        void toggleDbForBulkLoading();
    }

    /* loaded from: input_file:kafka-streams-2.3.1.jar:org/apache/kafka/streams/state/internals/RocksDBStore$RocksDBBatchingRestoreCallback.class */
    static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
        private final RocksDBStore rocksDBStore;

        RocksDBBatchingRestoreCallback(RocksDBStore rocksDBStore) {
            this.rocksDBStore = rocksDBStore;
        }

        @Override // org.apache.kafka.streams.processor.BatchingStateRestoreCallback
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> collection) {
            try {
                WriteBatch writeBatch = new WriteBatch();
                Throwable th = null;
                try {
                    try {
                        this.rocksDBStore.dbAccessor.prepareBatchForRestore(collection, writeBatch);
                        this.rocksDBStore.write(writeBatch);
                        if (writeBatch != null) {
                            if (0 != 0) {
                                try {
                                    writeBatch.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                writeBatch.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + this.rocksDBStore.name, e);
            }
        }

        @Override // org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback, org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            this.rocksDBStore.toggleDbForBulkLoading(true);
        }

        @Override // org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback, org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            this.rocksDBStore.toggleDbForBulkLoading(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka-streams-2.3.1.jar:org/apache/kafka/streams/state/internals/RocksDBStore$SingleColumnFamilyAccessor.class */
    public class SingleColumnFamilyAccessor implements RocksDBAccessor {
        private final ColumnFamilyHandle columnFamily;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SingleColumnFamilyAccessor(ColumnFamilyHandle columnFamilyHandle) {
            this.columnFamily = columnFamilyHandle;
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void put(byte[] bArr, byte[] bArr2) {
            if (bArr2 == null) {
                try {
                    RocksDBStore.this.db.delete(this.columnFamily, RocksDBStore.this.wOptions, bArr);
                } catch (RocksDBException e) {
                    throw new ProcessorStateException("Error while removing key from store " + RocksDBStore.this.name, e);
                }
            } else {
                try {
                    RocksDBStore.this.db.put(this.columnFamily, RocksDBStore.this.wOptions, bArr, bArr2);
                } catch (RocksDBException e2) {
                    throw new ProcessorStateException("Error while putting key/value into store " + RocksDBStore.this.name, e2);
                }
            }
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void prepareBatch(List<KeyValue<Bytes, byte[]>> list, WriteBatch writeBatch) throws RocksDBException {
            for (KeyValue<Bytes, byte[]> keyValue : list) {
                Objects.requireNonNull(keyValue.key, "key cannot be null");
                addToBatch(keyValue.key.get(), keyValue.value, writeBatch);
            }
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public byte[] get(byte[] bArr) throws RocksDBException {
            return RocksDBStore.this.db.get(this.columnFamily, bArr);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public byte[] getOnly(byte[] bArr) throws RocksDBException {
            return RocksDBStore.this.db.get(this.columnFamily, bArr);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
            return new RocksDBRangeIterator(RocksDBStore.this.name, RocksDBStore.this.db.newIterator(this.columnFamily), RocksDBStore.this.openIterators, bytes, bytes2);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public KeyValueIterator<Bytes, byte[]> all() {
            RocksIterator newIterator = RocksDBStore.this.db.newIterator(this.columnFamily);
            newIterator.seekToFirst();
            return new RocksDbIterator(RocksDBStore.this.name, newIterator, RocksDBStore.this.openIterators);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public long approximateNumEntries() throws RocksDBException {
            return RocksDBStore.this.db.getLongProperty(this.columnFamily, "rocksdb.estimate-num-keys");
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void flush() throws RocksDBException {
            RocksDBStore.this.db.flush(RocksDBStore.this.fOptions, this.columnFamily);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void prepareBatchForRestore(Collection<KeyValue<byte[], byte[]>> collection, WriteBatch writeBatch) throws RocksDBException {
            for (KeyValue<byte[], byte[]> keyValue : collection) {
                addToBatch(keyValue.key, keyValue.value, writeBatch);
            }
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void addToBatch(byte[] bArr, byte[] bArr2, WriteBatch writeBatch) throws RocksDBException {
            if (bArr2 == null) {
                writeBatch.delete(this.columnFamily, bArr);
            } else {
                writeBatch.put(this.columnFamily, bArr, bArr2);
            }
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void close() {
            this.columnFamily.close();
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDBAccessor
        public void toggleDbForBulkLoading() {
            try {
                RocksDBStore.this.db.compactRange(this.columnFamily, true, 1, 0);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while range compacting during restoring  store " + RocksDBStore.this.name, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBStore(String str) {
        this(str, DB_FILE_DIR);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBStore(String str, String str2) {
        this.openIterators = Collections.synchronizedSet(new HashSet());
        this.prepareForBulkload = false;
        this.batchingStateRestoreCallback = null;
        this.open = false;
        this.name = str;
        this.parentDir = str2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void openDB(ProcessorContext processorContext) {
        DBOptions dBOptions = new DBOptions();
        ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
        this.userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dBOptions, columnFamilyOptions);
        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
        this.cache = new LRUCache(BLOCK_CACHE_SIZE);
        blockBasedTableConfig.setBlockCache(this.cache);
        blockBasedTableConfig.setBlockSize(4096L);
        this.filter = new BloomFilter();
        blockBasedTableConfig.setFilter(this.filter);
        this.userSpecifiedOptions.optimizeFiltersForHits();
        this.userSpecifiedOptions.setTableFormatConfig((TableFormatConfig) blockBasedTableConfig);
        this.userSpecifiedOptions.setWriteBufferSize(WRITE_BUFFER_SIZE);
        this.userSpecifiedOptions.setCompressionType(COMPRESSION_TYPE);
        this.userSpecifiedOptions.setCompactionStyle(COMPACTION_STYLE);
        this.userSpecifiedOptions.setMaxWriteBufferNumber(3);
        this.userSpecifiedOptions.setCreateIfMissing(true);
        this.userSpecifiedOptions.setErrorIfExists(false);
        this.userSpecifiedOptions.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
        this.userSpecifiedOptions.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
        this.wOptions = new WriteOptions();
        this.wOptions.setDisableWAL(true);
        this.fOptions = new FlushOptions();
        this.fOptions.setWaitForFlush(true);
        Map<String, Object> appConfigs = processorContext.appConfigs();
        Class cls = (Class) appConfigs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
        if (cls != null) {
            this.configSetter = (RocksDBConfigSetter) Utils.newInstance(cls);
            this.configSetter.setConfig(this.name, this.userSpecifiedOptions, appConfigs);
        }
        if (this.prepareForBulkload) {
            this.userSpecifiedOptions.prepareForBulkLoad();
        }
        this.dbDir = new File(new File(processorContext.stateDir(), this.parentDir), this.name);
        try {
            Files.createDirectories(this.dbDir.getParentFile().toPath(), new FileAttribute[0]);
            Files.createDirectories(this.dbDir.getAbsoluteFile().toPath(), new FileAttribute[0]);
            openRocksDB(dBOptions, columnFamilyOptions);
            this.open = true;
        } catch (IOException e) {
            throw new ProcessorStateException(e);
        }
    }

    void openRocksDB(DBOptions dBOptions, ColumnFamilyOptions columnFamilyOptions) {
        List singletonList = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
        ArrayList arrayList = new ArrayList(singletonList.size());
        try {
            this.db = RocksDB.open(dBOptions, this.dbDir.getAbsolutePath(), (List<ColumnFamilyDescriptor>) singletonList, arrayList);
            this.dbAccessor = new SingleColumnFamilyAccessor((ColumnFamilyHandle) arrayList.get(0));
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + this.name + " at location " + this.dbDir.toString(), e);
        }
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.internalProcessorContext = processorContext;
        openDB(processorContext);
        this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
        processorContext.register(stateStore, this.batchingStateRestoreCallback);
    }

    boolean isPrepareForBulkload() {
        return this.prepareForBulkload;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public String name() {
        return this.name;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return true;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        return this.open;
    }

    private void validateStoreOpen() {
        if (!this.open) {
            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized void put(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.dbAccessor.put(bytes.get(), bArr);
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        byte[] bArr2 = get(bytes);
        if (bArr2 == null) {
            put(bytes, bArr);
        }
        return bArr2;
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        try {
            WriteBatch writeBatch = new WriteBatch();
            Throwable th = null;
            try {
                try {
                    this.dbAccessor.prepareBatch(list, writeBatch);
                    write(writeBatch);
                    if (writeBatch != null) {
                        if (0 != 0) {
                            try {
                                writeBatch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            writeBatch.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
        }
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized byte[] get(Bytes bytes) {
        validateStoreOpen();
        try {
            return this.dbAccessor.get(bytes.get());
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] delete(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        try {
            byte[] only = this.dbAccessor.getOnly(bytes.get());
            put(bytes, (byte[]) null);
            return only;
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
        }
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        Objects.requireNonNull(bytes, "from cannot be null");
        Objects.requireNonNull(bytes2, "to cannot be null");
        if (bytes.compareTo(bytes2) > 0) {
            log.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> range = this.dbAccessor.range(bytes, bytes2);
        this.openIterators.add(range);
        return range;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> all = this.dbAccessor.all();
        this.openIterators.add(all);
        return all;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public long approximateNumEntries() {
        validateStoreOpen();
        try {
            long approximateNumEntries = this.dbAccessor.approximateNumEntries();
            if (isOverflowing(approximateNumEntries)) {
                return Long.MAX_VALUE;
            }
            return approximateNumEntries;
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error fetching property from store " + this.name, e);
        }
    }

    private boolean isOverflowing(long j) {
        return j < 0;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public synchronized void flush() {
        if (this.db == null) {
            return;
        }
        try {
            this.dbAccessor.flush();
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
        }
    }

    @Override // org.apache.kafka.streams.state.internals.BulkLoadingStore
    public void toggleDbForBulkLoading(boolean z) {
        String[] list;
        if (z && (list = this.dbDir.list((file, str) -> {
            return SST_FILE_EXTENSION.matcher(str).matches();
        })) != null && list.length > 0) {
            this.dbAccessor.toggleDbForBulkLoading();
        }
        close();
        this.prepareForBulkload = z;
        openDB(this.internalProcessorContext);
    }

    @Override // org.apache.kafka.streams.state.internals.BulkLoadingStore
    public void addToBatch(KeyValue<byte[], byte[]> keyValue, WriteBatch writeBatch) throws RocksDBException {
        this.dbAccessor.addToBatch(keyValue.key, keyValue.value, writeBatch);
    }

    @Override // org.apache.kafka.streams.state.internals.BulkLoadingStore
    public void write(WriteBatch writeBatch) throws RocksDBException {
        this.db.write(this.wOptions, writeBatch);
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public synchronized void close() {
        if (this.open) {
            this.open = false;
            closeOpenIterators();
            if (this.configSetter != null) {
                this.configSetter.close(this.name, this.userSpecifiedOptions);
                this.configSetter = null;
            }
            this.dbAccessor.close();
            this.db.close();
            this.userSpecifiedOptions.close();
            this.wOptions.close();
            this.fOptions.close();
            this.filter.close();
            this.cache.close();
            this.dbAccessor = null;
            this.userSpecifiedOptions = null;
            this.wOptions = null;
            this.fOptions = null;
            this.db = null;
            this.filter = null;
            this.cache = null;
        }
    }

    private void closeOpenIterators() {
        HashSet hashSet;
        synchronized (this.openIterators) {
            hashSet = new HashSet(this.openIterators);
        }
        if (hashSet.size() != 0) {
            log.warn("Closing {} open iterators for store {}", Integer.valueOf(hashSet.size()), this.name);
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                ((KeyValueIterator) it.next()).close();
            }
        }
    }

    public Options getOptions() {
        return this.userSpecifiedOptions;
    }
}
