/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.modules.main.bootdb.config;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.hibernate7.Hibernate7Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import inc.yukawa.chain.base.core.domain.notification.Notification;
import inc.yukawa.chain.base.mono.dao.MonoWriteDao;
import inc.yukawa.chain.kafka.config.KafkaStreamsConfigBase;
import inc.yukawa.chain.kafka.dao.mono.KafkaAsyncWriteDao;
import inc.yukawa.chain.modules.main.core.aspect.PushTokenAspect;
import inc.yukawa.chain.modules.main.core.aspect.TopicSubscriptionAspect;
import inc.yukawa.chain.modules.main.core.aspect.UsersAspect;
import inc.yukawa.chain.modules.main.core.event.user.UserEvent;
import inc.yukawa.chain.modules.main.service.push.PushTokenCommandListener;
import inc.yukawa.chain.modules.main.service.subscription.TopicSubscriptionCommandListener;
import inc.yukawa.chain.modules.main.service.user.UserCommandListener;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.RetryListener;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.BackOff;

/**
 * Spring configuration that wires the Kafka producers, consumer factory and
 * command listeners used by the main module.
 *
 * <p>The whole configuration is skipped when the {@code no-kafka} profile is
 * active. Several beans carry their own {@code @Profile} and are only
 * registered when one of the listed profiles is active.
 */
@Configuration
@Profile("!no-kafka")
public class KafkaConfig extends KafkaStreamsConfigBase {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConfig.class);

    /**
     * Value handed to {@link ExponentialBackOffWithMaxRetries} for the listener
     * error handler; falls back to 20 when the property is not set.
     */
    @Value("${chain.main.kafka.consumer.retry.attempts:20}")
    private Integer retryAttempts;

    /**
     * Write DAO for {@link UserEvent}s backed by the reactive producer template.
     *
     * @param topic    the {@code main.UserEvtTopic} topic bean
     * @param template reactive producer template passed to the DAO
     * @return a {@link KafkaAsyncWriteDao} built from the template and topic
     */
    @Bean("main.UserEvtWriteDao")
    @Profile({"users-aspect", "all-aspects", "default"})
    // Raw construction kept: the generic signature of KafkaAsyncWriteDao is not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public MonoWriteDao<String, UserEvent> userEvtWriteDao(
            @Qualifier("main.UserEvtTopic") NewTopic topic,
            ReactiveKafkaProducerTemplate<String, Object> template) {
        return new KafkaAsyncWriteDao(template, topic);
    }

    /**
     * Creates the user command listener.
     *
     * @param aspect the users aspect handed to the listener
     * @return a new {@link UserCommandListener}
     */
    @Bean("main.UserCommandListener")
    @Profile({"users-aspect", "all-aspects", "default"})
    public UserCommandListener userCommandListener(UsersAspect aspect) {
        return new UserCommandListener(aspect);
    }

    /**
     * Creates the push-token command listener.
     *
     * @param aspect the push-token aspect handed to the listener
     * @return a new {@link PushTokenCommandListener}
     */
    @Bean("main.PushTokenCommandListener")
    @Profile({"push-token-aspect", "all-aspects", "default"})
    public PushTokenCommandListener pushTokenCommandListener(PushTokenAspect aspect) {
        return new PushTokenCommandListener(aspect);
    }

    /**
     * Creates the topic-subscription command listener.
     *
     * @param aspect the topic-subscription aspect handed to the listener
     * @return a new {@link TopicSubscriptionCommandListener}
     */
    @Bean("main.TopicSubscriptionCommandListener")
    @Profile({"topic-subscription-aspect", "all-aspects", "default"})
    public TopicSubscriptionCommandListener topicSubscriptionCommandListener(TopicSubscriptionAspect aspect) {
        return new TopicSubscriptionCommandListener(aspect);
    }

    /**
     * Takes the mapper produced by the base configuration and registers the
     * {@link JavaTimeModule} and {@link Hibernate7Module} on it, in that order.
     *
     * @return the base mapper with both modules registered
     */
    @Bean
    @Override
    public ObjectMapper objectMapper() {
        // registerModule returns the mapper itself, so the calls can be chained.
        return super.objectMapper()
                .registerModule(new JavaTimeModule())
                .registerModule(new Hibernate7Module());
    }

    /**
     * General-purpose template (also exposed as {@code initTemplate}) with
     * String keys and JSON-serialized values.
     *
     * @param mapper mapper used by the JSON value serializer
     * @return the template built by the inherited two-serializer helper
     */
    @Bean({"kafkaTemplate", "initTemplate"})
    // Raw serializer kept so the inherited helper is invoked exactly as before.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public KafkaTemplate<String, ? super Object> kafkaTemplate(ObjectMapper mapper) {
        Serializer jsonValues = new JsonSerializer(mapper);
        return kafkaTemplate(Serdes.String().serializer(), jsonValues);
    }

    /**
     * Reactive producer template with String keys and JSON-serialized values.
     *
     * @param properties Spring Boot Kafka properties forwarded to the inherited helper
     * @param mapper     mapper used by the JSON value serializer
     * @return the template built by the inherited {@code reactiveKafkaTemplate} helper
     */
    @Bean
    // Raw serializer kept so the inherited helper is invoked exactly as before.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ReactiveKafkaProducerTemplate<String, Object> reactiveKafkaProducerTemplate(
            KafkaProperties properties, ObjectMapper mapper) {
        Serializer jsonValues = new JsonSerializer(mapper);
        return reactiveKafkaTemplate(properties, Serdes.String().serializer(), jsonValues);
    }

    /**
     * Builds a new producer factory from {@code producerConfig()} with a String
     * key serializer and a JSON value serializer based on {@link #objectMapper()}.
     * This method is not a bean; each call constructs a new factory object.
     *
     * @return a new {@link DefaultKafkaProducerFactory}
     */
    // Raw construction kept: the declared type of producerConfig() is not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected ProducerFactory<String, ?> producerFactory() {
        Serializer jsonValues = new JsonSerializer(objectMapper());
        return new DefaultKafkaProducerFactory(producerConfig(), Serdes.String().serializer(), jsonValues);
    }

    /**
     * Auto-flushing template for {@link Notification}s whose default topic is
     * the name of the given topic bean.
     *
     * @param topic the {@code main.NotificationInboundTopic} topic bean
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    @Profile({"notification-aspect", "all-aspects", "default"})
    // producerFactory() is declared with a wildcard value type; narrowing it is unchecked.
    @SuppressWarnings("unchecked")
    public KafkaTemplate<String, Notification> notificationTemplate(
            @Qualifier("main.NotificationInboundTopic") NewTopic topic) {
        ProducerFactory<String, Notification> notificationFactory =
                (ProducerFactory<String, Notification>) producerFactory();
        KafkaTemplate<String, Notification> notifications = new KafkaTemplate<>(notificationFactory, true);
        notifications.setDefaultTopic(topic.name());
        return notifications;
    }

    /**
     * Template for {@link UserEvent}s, created by the inherited three-argument
     * helper with the topic bean's name.
     *
     * @param topic the {@code main.UserEvtTopic} topic bean
     * @return the template built by the inherited helper
     */
    @Bean("main.UserEvtTemplate")
    @Profile({"users-aspect", "all-aspects", "default"})
    // Raw serializer kept so the inherited helper is invoked exactly as before.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public KafkaTemplate<String, UserEvent> userEvtTemplate(@Qualifier("main.UserEvtTopic") NewTopic topic) {
        Serializer jsonValues = new JsonSerializer(objectMapper());
        return kafkaTemplate(Serdes.String().serializer(), jsonValues, topic.name());
    }

    /**
     * Listener container factory using the JSON consumer factory, the retrying
     * error handler and {@code AckMode.RECORD}.
     *
     * @return the configured container factory
     */
    @Bean(name = "main.ListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containers =
                new ConcurrentKafkaListenerContainerFactory<>();
        containers.setConsumerFactory(newJsonConsumerFactory());
        containers.setCommonErrorHandler(retryingErrorHandler());
        containers.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return containers;
    }

    /**
     * Base consumer properties: bootstrap servers from the inherited field,
     * String/JSON deserializer classes, auto-commit disabled and offset reset
     * set to {@code earliest}.
     *
     * @return a new mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", bootstrapServers);
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", JsonDeserializer.class);
        settings.put("enable.auto.commit", false);
        settings.put("auto.offset.reset", "earliest");
        return settings;
    }

    /**
     * Builds the error handler installed on the listener container factory:
     * an exponential back-off configured with {@link #retryAttempts}, a retry
     * listener that logs each attempt at WARN level, {@link SocketTimeoutException}
     * registered as retryable and {@link NullPointerException} as not retryable.
     */
    private DefaultErrorHandler retryingErrorHandler() {
        DefaultErrorHandler handler =
                new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(retryAttempts));
        handler.setRetryListeners((failedRecord, ex, deliveryAttempt) -> {
            // Log the cause's message when there is a cause, otherwise the exception's own.
            Throwable cause = ex.getCause();
            String reason = cause != null ? cause.getMessage() : ex.getMessage();
            LOG.warn("Retry attempt {} after: {}", deliveryAttempt, reason);
        });
        handler.addRetryableExceptions(SocketTimeoutException.class);
        handler.addNotRetryableExceptions(NullPointerException.class);
        return handler;
    }

    /**
     * Consumer factory with String keys and JSON values, both wrapped in an
     * {@link ErrorHandlingDeserializer}.
     */
    private ConsumerFactory<String, Object> newJsonConsumerFactory() {
        JsonDeserializer<Object> jsonValues = new JsonDeserializer<>(objectMapper());
        // NOTE(review): "*" trusts every package for type resolution; consider
        // narrowing this if any consumed topic can carry untrusted payloads.
        jsonValues.addTrustedPackages("*");

        Deserializer<String> keyDeserializer = new ErrorHandlingDeserializer<>(new StringDeserializer());
        Deserializer<Object> valueDeserializer = new ErrorHandlingDeserializer<>(jsonValues);
        return newConsumerFactory(keyDeserializer, valueDeserializer);
    }

    /**
     * Creates a consumer factory from {@link #consumerConfigs()} and installs
     * whichever of the given deserializers is non-null.
     *
     * @param keyParser   key deserializer instance, or {@code null} to leave it unset
     * @param valueParser value deserializer instance, or {@code null} to leave it unset
     */
    private <K, V> ConsumerFactory<K, V> newConsumerFactory(Deserializer<K> keyParser, Deserializer<V> valueParser) {
        DefaultKafkaConsumerFactory<K, V> consumers = new DefaultKafkaConsumerFactory<>(consumerConfigs());
        if (keyParser != null) {
            consumers.setKeyDeserializer(keyParser);
        }
        if (valueParser != null) {
            consumers.setValueDeserializer(valueParser);
        }
        return consumers;
    }
}

