package inc.yukawa.chain.kafka.dao.mono;

import inc.yukawa.chain.base.mono.dao.MonoWriteDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import reactor.core.publisher.Mono;

/* loaded from: input_file:chain-kafka-core-2.0.5.jar:inc/yukawa/chain/kafka/dao/mono/KafkaAsyncWriteDao.class */
/**
 * {@link MonoWriteDao} implementation that writes records through a {@link KafkaTemplate}
 * and reports the outcome of each asynchronous send as a {@link Mono}.
 *
 * <p>The target topic is resolved by {@code findTopic()} each time a subscriber triggers a send.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaAsyncWriteDao<K, V> extends KafkaTemplateDao<K, V> implements MonoWriteDao<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAsyncWriteDao.class);

    /**
     * Creates a DAO that sends through the given template.
     *
     * @param kafkaTemplate template passed to the superclass and used for every send
     */
    public KafkaAsyncWriteDao(KafkaTemplate<K, V> kafkaTemplate) {
        super(kafkaTemplate);
    }

    /**
     * Sends the key/value pair to the topic returned by {@code findTopic()}.
     *
     * <p>The send is started when the returned {@link Mono} is subscribed to, not when this
     * method is called.
     *
     * @param k record key
     * @param v record value
     * @return a Mono that emits the value of the sent producer record, completes empty when the
     *         send result, its producer record, or the record value is {@code null}, and signals
     *         the send failure as an error otherwise
     */
    @Override // inc.yukawa.chain.base.mono.dao.MonoWriteDao
    public Mono<V> put(K k, V v) {
        // Returning Mono.create(...) directly lets the declared return type fix the sink's
        // element type to V; chaining operators onto an untyped create(...) would infer Object.
        return Mono.create(sink -> {
            this.template.send(findTopic(), k, v).addCallback(result -> {
                Object metadata = result != null ? result.getRecordMetadata() : null;
                LOG.debug("[{}] sent: key={}, value={}", metadata, k, v);
                V sentValue = result != null && result.getProducerRecord() != null
                        ? result.getProducerRecord().value()
                        : null;
                // success(null) completes the Mono without emitting a value.
                sink.success(sentValue);
            }, failure -> {
                LOG.error("Send failed for: " + k, failure);
                sink.error(failure);
            });
        });
    }

    /**
     * Sends a record with the given key and a {@code null} value to the topic returned by
     * {@code findTopic()}.
     *
     * <p>The send is started when the returned {@link Mono} is subscribed to.
     *
     * @param k key of the record to send with a {@code null} value
     * @return a Mono that emits {@code 1} once the send succeeds, or signals the send failure
     *         as an error
     */
    @Override // inc.yukawa.chain.base.mono.dao.MonoWriteDao
    public Mono<Integer> deleteKey(K k) {
        return Mono.create(sink -> {
            this.template.send(findTopic(), k, null).addCallback(result -> {
                Object metadata = result != null ? result.getRecordMetadata() : null;
                LOG.debug("[{}] delete {}", metadata, k);
                sink.success(1);
            }, failure -> {
                LOG.error("Send failed for: " + k, failure);
                sink.error(failure);
            });
        });
    }
}
