package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.function.BiConsumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

/* loaded from: input_file:spring-kafka-2.3.8.RELEASE.jar:org/springframework/kafka/listener/RetryingBatchErrorHandler.class */
/**
 * Batch error handler that re-invokes the listener for a failed batch according to a
 * {@link BackOff} policy, keeping the consumer paused while it waits between attempts.
 *
 * <p>When the back-off is exhausted the batch is handed to a recoverer. If no
 * {@link ConsumerRecordRecoverer} was supplied, the failed records are logged at error
 * level and discarded. If the recoverer itself fails, or the thread is interrupted while
 * waiting, handling is delegated to a {@link SeekToCurrentBatchErrorHandler}.
 */
public class RetryingBatchErrorHandler implements ListenerInvokingBatchErrorHandler {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryingBatchErrorHandler.class));
    private final BackOff backOff;
    private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
    private final SeekToCurrentBatchErrorHandler seeker;

    /**
     * Create a handler that uses a default-constructed {@link FixedBackOff} and no
     * record recoverer (exhausted batches are logged and discarded).
     */
    public RetryingBatchErrorHandler() {
        this(new FixedBackOff(), null);
    }

    /**
     * Create a handler with the given back-off policy and optional recoverer.
     *
     * @param backOff the policy that supplies the delay before each retry and decides
     *        when to stop retrying
     * @param consumerRecordRecoverer called once per record of the batch after retries
     *        are exhausted; when {@code null} the batch is only logged
     */
    public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer consumerRecordRecoverer) {
        this.seeker = new SeekToCurrentBatchErrorHandler();
        this.backOff = backOff;
        this.recoverer = (consumerRecords, exc) -> {
            if (consumerRecordRecoverer == null) {
                LOGGER.error(exc, () -> "Records discarded: " + tpos(consumerRecords));
            } else {
                consumerRecords.spliterator().forEachRemaining(
                        consumerRecord -> consumerRecordRecoverer.accept(consumerRecord, exc));
            }
        };
    }

    /**
     * Retry the failed batch until the listener succeeds or the back-off is exhausted,
     * then recover.
     *
     * <p>All currently assigned partitions are paused for the whole retry/recovery
     * sequence and are resumed exactly once, in a {@code finally} block, on every exit
     * path (successful retry, exhausted retries, interruption, or an exception from the
     * fallback handler).
     *
     * @param exc the exception thrown by the original listener invocation
     * @param consumerRecords the batch that failed
     * @param consumer the consumer that polled the batch
     * @param messageListenerContainer the container, passed through to the fallback handler
     * @param runnable re-invokes the listener with the same batch
     * @throws KafkaException if the thread is interrupted while waiting for the next retry
     */
    @Override
    public void handle(Exception exc, ConsumerRecords<?, ?> consumerRecords, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer, Runnable runnable) {
        BackOffExecution execution = this.backOff.start();
        String failed = null;
        consumer.pause(consumer.assignment());
        try {
            for (long nextBackOff = execution.nextBackOff(); nextBackOff != BackOffExecution.STOP; nextBackOff = execution.nextBackOff()) {
                // Poll while paused before each wait; the returned records are deliberately ignored.
                consumer.poll(Duration.ZERO);
                try {
                    Thread.sleep(nextBackOff);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    this.seeker.handle(exc, consumerRecords, consumer, messageListenerContainer);
                    throw new KafkaException("Interrupted during retry", e);
                }
                try {
                    runnable.run();
                    return;
                } catch (Exception e) {
                    if (failed == null) {
                        // Build the description once; it is identical for every attempt.
                        failed = tpos(consumerRecords);
                    }
                    String toLog = failed;
                    LOGGER.debug(e, () -> "Retry failed for: " + toLog);
                }
            }
            try {
                this.recoverer.accept(consumerRecords, exc);
            } catch (Exception e) {
                LOGGER.error(e, () -> "Recoverer threw an exception; re-seeking batch");
                this.seeker.handle(exc, consumerRecords, consumer, messageListenerContainer);
            }
        } finally {
            // Always undo the pause, including when the fallback handler or the sleep throws.
            consumer.resume(consumer.assignment());
        }
    }

    /**
     * Render the batch as a comma-separated list of {@code topic-partition@offset} entries.
     *
     * @param consumerRecords the records to describe
     * @return the description, or an empty string when the batch contains no records
     */
    private static String tpos(ConsumerRecords<?, ?> consumerRecords) {
        StringBuilder description = new StringBuilder();
        consumerRecords.spliterator().forEachRemaining(consumerRecord -> {
            description.append(consumerRecord.topic()).append('-').append(consumerRecord.partition()).append('@').append(consumerRecord.offset()).append(',');
        });
        if (description.length() > 0) {
            // Drop the trailing separator; skipped for an empty batch, where there is nothing to trim.
            description.deleteCharAt(description.length() - 1);
        }
        return description.toString();
    }
}
