/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.util.retry.Retry;

public abstract class ReactiveConsumer<K, V> {
    protected final Logger log = LoggerFactory.getLogger(ReactiveConsumer.class);
    protected final KafkaReceiver<K, V> receiver;
    protected final Scheduler scheduler = Schedulers.newSingle((String)this.getClass().getSimpleName(), (boolean)true);
    protected List<Class<? extends Throwable>> foreverRetryableExceptions = new ArrayList<Class<? extends Throwable>>();
    protected List<Class<? extends Throwable>> retryableExceptions = new ArrayList<Class<? extends Throwable>>();
    @Value(value="${chain.kafka.reactiveConsumer.retryAttempts:3}")
    protected long retryAttempts = 3L;
    protected LogAccessor logAccessor;

    public ReactiveConsumer(ReceiverOptions<K, V> receiverOptions) {
        this.receiver = KafkaReceiver.create(receiverOptions);
        boolean hasSubscription = receiverOptions.subscriptionTopics() != null && !receiverOptions.subscriptionTopics().isEmpty() || receiverOptions.subscriptionPattern() != null;
        Assert.isTrue((boolean)hasSubscription, (String)"receiverOptions should have topic specified");
    }

    public ReactiveConsumer(ReceiverOptions<K, V> receiverOptions, NewTopic topic) {
        this(receiverOptions, topic.name());
    }

    public ReactiveConsumer(ReceiverOptions<K, V> receiverOptions, String topic) {
        this(receiverOptions.subscription(List.of(topic)));
    }

    @EventListener(value={ApplicationReadyEvent.class})
    public void doOnStartup() {
        this.flux().subscribe();
    }

    protected Flux<Void> flux() {
        return this.receiver.receiveAutoAck().publishOn(this.scheduler).concatMap(this::onRecords).retryWhen((Retry)Retry.backoff((long)Long.MAX_VALUE, (Duration)Duration.of(2L, ChronoUnit.SECONDS)).maxBackoff(Duration.of(30L, ChronoUnit.SECONDS)));
    }

    protected Mono<Void> onRecords(Flux<ConsumerRecord<K, V>> records) {
        return records.concatMap(r -> {
            this.log.trace("[{}@{}[p{}]] onRecord: {}", new Object[]{r.key(), r.topic(), r.partition(), r.value()});
            if (r.value() == null) {
                DeserializationException e = SerializationUtils.getExceptionFromHeader((ConsumerRecord)r, (String)"springDeserializerExceptionValue", (LogAccessor)this.getLogAccessor());
                this.log.warn("Skipping invalid record (value deserialization error): {}", r, (Object)e);
                return Mono.empty();
            }
            if (r.key() == null) {
                DeserializationException e = SerializationUtils.getExceptionFromHeader((ConsumerRecord)r, (String)"springDeserializerExceptionKey", (LogAccessor)this.getLogAccessor());
                this.log.warn("Skipping invalid record (key deserialization error): {}", r, (Object)e);
                return Mono.empty();
            }
            return this.onSingleRecord((ConsumerRecord<K, V>)r);
        }).then();
    }

    protected Mono<Void> onSingleRecord(ConsumerRecord<K, V> r) {
        return this.onRecord(r).doOnError(e -> this.log.info("[{}@{}[p{}]] record processing error: {} because of error", new Object[]{r.key(), r.topic(), r.partition(), r.value(), e})).retryWhen(this.foreverRetry()).retryWhen(this.retry());
    }

    protected Retry foreverRetry() {
        return Retry.backoff((long)Long.MAX_VALUE, (Duration)Duration.of(2L, ChronoUnit.SECONDS)).maxBackoff(Duration.of(30L, ChronoUnit.SECONDS)).filter(this.foreverRetryFilter()).doBeforeRetry(retrySignal -> this.log.info("Retry #{}: Cause: {}", (Object)retrySignal.totalRetriesInARow(), (Object)retrySignal.failure().getMessage()));
    }

    protected Retry retry() {
        return Retry.backoff((long)this.retryAttempts, (Duration)Duration.of(2L, ChronoUnit.SECONDS)).maxBackoff(Duration.of(30L, ChronoUnit.SECONDS)).filter(this.retryFilter()).doBeforeRetry(retrySignal -> this.log.info("Retry #{}: Cause: {}", (Object)retrySignal.totalRetriesInARow(), (Object)retrySignal.failure().getMessage()));
    }

    protected Predicate<Throwable> foreverRetryFilter() {
        return th -> this.getForeverRetryableExceptions().stream().anyMatch(e -> e.isInstance(th));
    }

    protected Predicate<Throwable> retryFilter() {
        return this.foreverRetryFilter().negate().and(th -> this.getRetryableExceptions().stream().anyMatch(e -> e.isInstance(th)));
    }

    protected abstract Mono<Void> onRecord(ConsumerRecord<K, V> var1);

    protected List<Class<? extends Throwable>> getForeverRetryableExceptions() {
        return this.foreverRetryableExceptions;
    }

    protected List<Class<? extends Throwable>> getRetryableExceptions() {
        return this.retryableExceptions;
    }

    public void setForeverRetryableExceptions(List<Class<? extends Throwable>> foreverRetryableExceptions) {
        this.foreverRetryableExceptions = foreverRetryableExceptions;
    }

    public void setRetryableExceptions(List<Class<? extends Throwable>> retryableExceptions) {
        this.retryableExceptions = retryableExceptions;
    }

    public long getRetryAttempts() {
        return this.retryAttempts;
    }

    public void setRetryAttempts(long retryAttempts) {
        this.retryAttempts = retryAttempts;
    }

    public LogAccessor getLogAccessor() {
        if (this.logAccessor == null) {
            this.logAccessor = new LogAccessor(ReactiveConsumer.class);
        }
        return this.logAccessor;
    }
}

