/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.consumer;

import inc.yukawa.chain.kafka.consumer.ReactiveConsumer;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;

public abstract class SkipOnErrorReactiveConsumer<K, V>
extends ReactiveConsumer<K, V> {
    public SkipOnErrorReactiveConsumer(ReceiverOptions<K, V> receiverOptions) {
        super(receiverOptions);
    }

    public SkipOnErrorReactiveConsumer(ReceiverOptions<K, V> receiverOptions, NewTopic topic) {
        super(receiverOptions, topic);
    }

    public SkipOnErrorReactiveConsumer(ReceiverOptions<K, V> receiverOptions, String topic) {
        super(receiverOptions, topic);
    }

    @Override
    protected Mono<Void> onSingleRecord(ConsumerRecord<K, V> r) {
        return super.onSingleRecord(r).onErrorResume(e -> {
            this.log.warn("[{}@{}[p{}]] Skipping record: {} because of error", new Object[]{r.key(), r.topic(), r.partition(), r.value(), e});
            return Mono.empty();
        });
    }
}

