package inc.yukawa.chain.modules.main.bootdb.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.hibernate6.Hibernate6Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import inc.yukawa.chain.base.core.domain.notification.Notification;
import inc.yukawa.chain.base.mono.dao.MonoWriteDao;
import inc.yukawa.chain.kafka.config.KafkaStreamsConfigBase;
import inc.yukawa.chain.kafka.dao.mono.KafkaAsyncWriteDao;
import inc.yukawa.chain.modules.main.core.aspect.PushTokenAspect;
import inc.yukawa.chain.modules.main.core.aspect.TopicSubscriptionAspect;
import inc.yukawa.chain.modules.main.core.aspect.UsersAspect;
import inc.yukawa.chain.modules.main.core.event.user.UserEvent;
import inc.yukawa.chain.modules.main.service.push.PushTokenCommandListener;
import inc.yukawa.chain.modules.main.service.subscription.TopicSubscriptionCommandListener;
import inc.yukawa.chain.modules.main.service.user.UserCommandListener;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.RetryListener;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * Kafka wiring for the main module: producer templates, command listeners and the
 * listener container factory used by the module's consumers.
 *
 * <p>The whole configuration is skipped when the {@code no-kafka} profile is active.
 * Individual beans are additionally restricted to the aspect profile they belong to
 * (or {@code all-aspects} / {@code default}).
 */
@Profile({"!no-kafka"})
@Configuration
public class KafkaConfig extends KafkaStreamsConfigBase {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConfig.class);

    /** Maximum number of delivery retries for a failed record; defaults to 20 when the property is unset. */
    @Value("${chain.main.kafka.consumer.retry.attempts:20}")
    private Integer retryAttempts;

    /**
     * Creates the write DAO that publishes {@link UserEvent}s to the user event topic.
     *
     * @param newTopic the topic bean qualified as {@code main.UserEvtTopic}
     * @param reactiveKafkaProducerTemplate reactive producer used by the DAO
     * @return a {@link KafkaAsyncWriteDao} built from the template and topic
     */
    @Profile({"users-aspect", "all-aspects", "default"})
    @Bean({"main.UserEvtWriteDao"})
    public MonoWriteDao<String, UserEvent> userEvtWriteDao(@Qualifier("main.UserEvtTopic") NewTopic newTopic, ReactiveKafkaProducerTemplate<String, Object> reactiveKafkaProducerTemplate) {
        // KafkaAsyncWriteDao is a project type whose generic signature is not visible here,
        // so the constructor call is left exactly as it was.
        return new KafkaAsyncWriteDao(reactiveKafkaProducerTemplate, newTopic);
    }

    /**
     * Registers the listener that forwards user commands to the {@link UsersAspect}.
     *
     * @param usersAspect aspect handed to the listener
     * @return the listener instance
     */
    @Profile({"users-aspect", "all-aspects", "default"})
    @Bean({"main.UserCommandListener"})
    public UserCommandListener userCommandListener(UsersAspect usersAspect) {
        return new UserCommandListener(usersAspect);
    }

    /**
     * Registers the listener that forwards push-token commands to the {@link PushTokenAspect}.
     *
     * @param pushTokenAspect aspect handed to the listener
     * @return the listener instance
     */
    @Profile({"push-token-aspect", "all-aspects", "default"})
    @Bean({"main.PushTokenCommandListener"})
    public PushTokenCommandListener pushTokenCommandListener(PushTokenAspect pushTokenAspect) {
        return new PushTokenCommandListener(pushTokenAspect);
    }

    /**
     * Registers the listener that forwards topic-subscription commands to the
     * {@link TopicSubscriptionAspect}.
     *
     * @param topicSubscriptionAspect aspect handed to the listener
     * @return the listener instance
     */
    @Profile({"topic-subscription-aspect", "all-aspects", "default"})
    @Bean({"main.TopicSubscriptionCommandListener"})
    public TopicSubscriptionCommandListener topicSubscriptionCommandListener(TopicSubscriptionAspect topicSubscriptionAspect) {
        return new TopicSubscriptionCommandListener(topicSubscriptionAspect);
    }

    /**
     * Takes the mapper produced by the base configuration and adds java.time and
     * Hibernate 6 support to it.
     *
     * @return the base mapper with {@link JavaTimeModule} and {@link Hibernate6Module} registered
     */
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = super.objectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.registerModule(new Hibernate6Module());
        return mapper;
    }

    /**
     * General purpose template with String keys and JSON-serialized values, exposed under
     * both the {@code kafkaTemplate} and {@code initTemplate} bean names.
     *
     * @param objectMapper mapper used by the JSON value serializer
     * @return the template built by the base-class {@code kafkaTemplate} helper
     */
    @Bean({"kafkaTemplate", "initTemplate"})
    public KafkaTemplate<String, ? super Object> kafkaTemplate(ObjectMapper objectMapper) {
        // Serializer stays raw: the base-class helper's generic signature is not visible here.
        return kafkaTemplate(Serdes.String().serializer(), new JsonSerializer(objectMapper));
    }

    /**
     * Reactive counterpart of {@link #kafkaTemplate(ObjectMapper)}.
     *
     * @param kafkaProperties Spring Boot Kafka properties passed to the base-class helper
     * @param objectMapper mapper used by the JSON value serializer
     * @return the template built by the base-class {@code reactiveKafkaTemplate} helper
     */
    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> reactiveKafkaProducerTemplate(KafkaProperties kafkaProperties, ObjectMapper objectMapper) {
        // Serializer stays raw: the base-class helper's generic signature is not visible here.
        return reactiveKafkaTemplate(kafkaProperties, Serdes.String().serializer(), new JsonSerializer(objectMapper));
    }

    /**
     * Builds a new producer factory with String keys and JSON values from the inherited
     * producer configuration. Not a bean: every call creates a fresh factory.
     *
     * @return a new {@link DefaultKafkaProducerFactory}
     */
    protected ProducerFactory<String, ?> producerFactory() {
        // Raw on purpose: the return type of the inherited producerConfig() is not visible here.
        return new DefaultKafkaProducerFactory(producerConfig(), Serdes.String().serializer(), new JsonSerializer(objectMapper()));
    }

    /**
     * Template for publishing {@link Notification}s; its default topic is the inbound
     * notification topic and auto-flush is enabled.
     *
     * @param newTopic the topic bean qualified as {@code main.NotificationInboundTopic}
     * @return the configured template
     */
    @Profile({"notification-aspect", "all-aspects", "default"})
    @Bean
    public KafkaTemplate<String, Notification> notificationTemplate(@Qualifier("main.NotificationInboundTopic") NewTopic newTopic) {
        // producerFactory() is declared with a wildcard value type, which the diamond cannot
        // reconcile with Notification. The factory serializes values as JSON regardless of
        // their static type, so narrowing it here is safe; generics are erased at runtime.
        @SuppressWarnings("unchecked")
        ProducerFactory<String, Notification> notificationProducerFactory =
                (ProducerFactory<String, Notification>) producerFactory();
        KafkaTemplate<String, Notification> template = new KafkaTemplate<>(notificationProducerFactory, true);
        template.setDefaultTopic(newTopic.name());
        return template;
    }

    /**
     * Template for publishing {@link UserEvent}s to the user event topic.
     *
     * @param newTopic the topic bean qualified as {@code main.UserEvtTopic}
     * @return the template built by the base-class {@code kafkaTemplate} helper for that topic name
     */
    @Profile({"users-aspect", "all-aspects", "default"})
    @Bean({"main.UserEvtTemplate"})
    public KafkaTemplate<String, UserEvent> userEvtTemplate(@Qualifier("main.UserEvtTopic") NewTopic newTopic) {
        // Serializer stays raw: the base-class helper's generic signature is not visible here.
        return kafkaTemplate(Serdes.String().serializer(), new JsonSerializer(objectMapper()), newTopic.name());
    }

    /**
     * Listener container factory for JSON payloads.
     *
     * <p>Failed records are retried with an exponential back-off, at most
     * {@code chain.main.kafka.consumer.retry.attempts} times, and every retry is logged at
     * WARN level. {@link SocketTimeoutException} is registered as retryable and
     * {@link NullPointerException} as not retryable. Offsets are acknowledged per record.
     *
     * @return the configured factory
     */
    @Bean(name = {"main.ListenerContainerFactory"})
    public ConcurrentKafkaListenerContainerFactory<String, Object> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(newJsonConsumerFactory());

        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(retryAttempts));
        errorHandler.setRetryListeners((consumerRecord, exc, deliveryAttempt) -> {
            // Prefer the cause's message when there is one; fall back to the exception's own message.
            Throwable cause = exc.getCause();
            String reason = cause != null ? cause.getMessage() : exc.getMessage();
            LOG.warn("Retry attempt {} after: {}", deliveryAttempt, reason);
        });
        errorHandler.addRetryableExceptions(SocketTimeoutException.class);
        errorHandler.addNotRetryableExceptions(NullPointerException.class);

        containerFactory.setCommonErrorHandler(errorHandler);
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return containerFactory;
    }

    /**
     * Base consumer properties: inherited bootstrap servers, String keys, JSON values,
     * auto-commit disabled and offset reset to {@code earliest}.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", this.bootstrapServers);
        configs.put("key.deserializer", StringDeserializer.class);
        configs.put("value.deserializer", JsonDeserializer.class);
        configs.put("enable.auto.commit", false);
        configs.put("auto.offset.reset", "earliest");
        return configs;
    }

    /**
     * Creates a consumer factory with String keys and JSON values, both wrapped in
     * {@link ErrorHandlingDeserializer}.
     */
    private ConsumerFactory<String, Object> newJsonConsumerFactory() {
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper());
        // NOTE(review): "*" trusts every package for type-header based deserialization.
        // Kept as-is to preserve behaviour; consider restricting it to the module's own
        // packages if these topics can carry messages from untrusted producers.
        jsonDeserializer.addTrustedPackages("*");

        Deserializer<String> keyDeserializer = new ErrorHandlingDeserializer<>(new StringDeserializer());
        Deserializer<Object> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
        return newConsumerFactory(keyDeserializer, valueDeserializer);
    }

    /**
     * Creates a consumer factory from {@link #consumerConfigs()} and installs the given
     * deserializer instances on it.
     *
     * @param keyDeserializer key deserializer instance, or {@code null} to leave the factory's key deserializer unset
     * @param valueDeserializer value deserializer instance, or {@code null} to leave the factory's value deserializer unset
     */
    private <K, V> ConsumerFactory<K, V> newConsumerFactory(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
        if (keyDeserializer != null) {
            consumerFactory.setKeyDeserializer(keyDeserializer);
        }
        if (valueDeserializer != null) {
            consumerFactory.setValueDeserializer(valueDeserializer);
        }
        return consumerFactory;
    }
}
