package inc.yukawa.chain.kafka.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.util.retry.Retry;

/* loaded from: input_file:inc/yukawa/chain/kafka/consumer/ReactiveConsumer.class */
public abstract class ReactiveConsumer<K, V> {
    protected final Logger log;
    protected final KafkaReceiver<K, V> receiver;
    protected final Scheduler scheduler;
    protected List<Class<? extends Throwable>> foreverRetryableExceptions;
    protected List<Class<? extends Throwable>> retryableExceptions;

    @Value("${chain.kafka.reactiveConsumer.retryAttempts:3}")
    protected long retryAttempts;
    protected LogAccessor logAccessor;

    public ReactiveConsumer(ReceiverOptions<K, V> receiverOptions) {
        this.log = LoggerFactory.getLogger(ReactiveConsumer.class);
        this.scheduler = Schedulers.newSingle(getClass().getSimpleName(), true);
        this.foreverRetryableExceptions = new ArrayList();
        this.retryableExceptions = new ArrayList();
        this.retryAttempts = 3L;
        this.receiver = KafkaReceiver.create(receiverOptions);
        Assert.isTrue(((receiverOptions.subscriptionTopics() == null || receiverOptions.subscriptionTopics().isEmpty()) && receiverOptions.subscriptionPattern() == null) ? false : true, "receiverOptions should have topic specified");
    }

    public ReactiveConsumer(ReceiverOptions<K, V> receiverOptions, NewTopic newTopic) {
        this(receiverOptions, newTopic.name());
    }

    public ReactiveConsumer(ReceiverOptions<K, V> receiverOptions, String str) {
        this(receiverOptions.subscription(List.of(str)));
    }

    @EventListener({ApplicationReadyEvent.class})
    public void doOnStartup() {
        flux().subscribe();
    }

    protected Flux<Void> flux() {
        return this.receiver.receiveAutoAck().publishOn(this.scheduler).concatMap(this::onRecords).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.of(2L, ChronoUnit.SECONDS)).maxBackoff(Duration.of(30L, ChronoUnit.SECONDS)));
    }

    protected Mono<Void> onRecords(Flux<ConsumerRecord<K, V>> flux) {
        return flux.concatMap(consumerRecord -> {
            this.log.trace("[{}@{}[p{}]] onRecord: {}", new Object[]{consumerRecord.key(), consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), consumerRecord.value()});
            if (consumerRecord.value() == null) {
                this.log.warn("Skipping invalid record (value deserialization error): {}", consumerRecord, SerializationUtils.getExceptionFromHeader(consumerRecord, "springDeserializerExceptionValue", getLogAccessor()));
                return Mono.empty();
            }
            if (consumerRecord.key() != null) {
                return onSingleRecord(consumerRecord);
            }
            this.log.warn("Skipping invalid record (key deserialization error): {}", consumerRecord, SerializationUtils.getExceptionFromHeader(consumerRecord, "springDeserializerExceptionKey", getLogAccessor()));
            return Mono.empty();
        }).then();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Void> onSingleRecord(ConsumerRecord<K, V> consumerRecord) {
        return onRecord(consumerRecord).doOnError(th -> {
            this.log.info("[{}@{}[p{}]] record processing error: {} because of error", new Object[]{consumerRecord.key(), consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), consumerRecord.value(), th});
        }).retryWhen(foreverRetry()).retryWhen(retry());
    }

    protected Retry foreverRetry() {
        return Retry.backoff(Long.MAX_VALUE, Duration.of(2L, ChronoUnit.SECONDS)).maxBackoff(Duration.of(30L, ChronoUnit.SECONDS)).filter(foreverRetryFilter()).doBeforeRetry(retrySignal -> {
            this.log.info("Retry #{}: Cause: {}", Long.valueOf(retrySignal.totalRetriesInARow()), retrySignal.failure().getMessage());
        });
    }

    protected Retry retry() {
        return Retry.backoff(this.retryAttempts, Duration.of(2L, ChronoUnit.SECONDS)).maxBackoff(Duration.of(30L, ChronoUnit.SECONDS)).filter(retryFilter()).doBeforeRetry(retrySignal -> {
            this.log.info("Retry #{}: Cause: {}", Long.valueOf(retrySignal.totalRetriesInARow()), retrySignal.failure().getMessage());
        });
    }

    protected Predicate<Throwable> foreverRetryFilter() {
        return th -> {
            return getForeverRetryableExceptions().stream().anyMatch(cls -> {
                return cls.isInstance(th);
            });
        };
    }

    protected Predicate<Throwable> retryFilter() {
        return foreverRetryFilter().negate().and(th -> {
            return getRetryableExceptions().stream().anyMatch(cls -> {
                return cls.isInstance(th);
            });
        });
    }

    protected abstract Mono<Void> onRecord(ConsumerRecord<K, V> consumerRecord);

    protected List<Class<? extends Throwable>> getForeverRetryableExceptions() {
        return this.foreverRetryableExceptions;
    }

    protected List<Class<? extends Throwable>> getRetryableExceptions() {
        return this.retryableExceptions;
    }

    public void setForeverRetryableExceptions(List<Class<? extends Throwable>> list) {
        this.foreverRetryableExceptions = list;
    }

    public void setRetryableExceptions(List<Class<? extends Throwable>> list) {
        this.retryableExceptions = list;
    }

    public long getRetryAttempts() {
        return this.retryAttempts;
    }

    public void setRetryAttempts(long j) {
        this.retryAttempts = j;
    }

    public LogAccessor getLogAccessor() {
        if (this.logAccessor == null) {
            this.logAccessor = new LogAccessor(ReactiveConsumer.class);
        }
        return this.logAccessor;
    }
}
