package inc.yukawa.chain.modules.main.service.user.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import inc.yukawa.chain.kafka.event.KafkaEventHandlerStream;
import inc.yukawa.chain.kafka.util.KafkaUtil;
import inc.yukawa.chain.modules.main.core.domain.user.User;
import inc.yukawa.chain.modules.main.core.event.user.UserEvent;
import inc.yukawa.chain.security.domain.Account;
import java.util.Properties;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:inc/yukawa/chain/modules/main/service/user/stream/UserEventStream.class */
public class UserEventStream extends KafkaEventHandlerStream<String, User, UserEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(UserEventStream.class);
    protected final NewTopic accountTopic;
    protected final Serde<Account> accountSerde;

    public UserEventStream(@Qualifier("main.UserStreamsProps") Properties properties, @Qualifier("main.UserDataTopic") NewTopic newTopic, @Qualifier("main.UserEventTopic") NewTopic newTopic2, @Qualifier("main.AccountTopic") NewTopic newTopic3, ObjectMapper objectMapper, Aggregator<Object, UserEvent, UserEvent> aggregator) {
        super(properties, newTopic, newTopic2, Serdes.String(), KafkaUtil.getSerDes(User.class, false, objectMapper), KafkaUtil.getSerDes(UserEvent.class, false, objectMapper), aggregator);
        this.accountTopic = newTopic3;
        this.accountSerde = KafkaUtil.getSerDes(Account.class, false, objectMapper);
    }

    protected KStream<String, UserEvent> eventPostProcess(KStream<String, UserEvent> kStream) {
        KStream<String, UserEvent> eventPostProcess = super.eventPostProcess(kStream);
        eventPostProcess.map((str, userEvent) -> {
            return this.eventHandler.toAccountEvent(str, userEvent);
        }).filter((str2, account) -> {
            return str2 != null;
        }).peek((str3, account2) -> {
            LOG.debug("[{}@{}] account: {}", new Object[]{this.accountTopic.name(), str3, account2});
        }).to(this.accountTopic.name(), Produced.with(Serdes.String(), this.accountSerde));
        return eventPostProcess;
    }
}
