package inc.yukawa.chain.kafka.connect.admin;

import inc.yukawa.chain.base.core.domain.result.EditResult;
import inc.yukawa.chain.base.core.domain.result.ResultDetail;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:inc/yukawa/chain/kafka/connect/admin/ConnectClientWebFlux.class */
public class ConnectClientWebFlux implements ConnectClient {
    private static final Logger log = LoggerFactory.getLogger(ConnectClientWebFlux.class);
    private final WebClient webClient;

    public ConnectClientWebFlux(WebClient webClient) {
        this.webClient = webClient;
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Flux<String> findConnectorNames() {
        return this.webClient.get().uri("/connectors", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToFlux(String.class);
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<Map<String, Object>> loadConnector(String str) {
        return this.webClient.get().uri("/connectors/" + str, new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToMono(ParameterizedTypeReference.forType(Map.class));
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<Map<String, Object>> loadConnectorConfig(String str) {
        return this.webClient.get().uri("/connectors/" + str + "/config", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToMono(ParameterizedTypeReference.forType(Map.class));
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<Map<String, Object>> loadConnectorStatus(String str) {
        return this.webClient.get().uri("/connectors/" + str + "/status", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToMono(ParameterizedTypeReference.forType(Map.class));
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Flux<Map<String, Object>> findConnectorTasks(String str) {
        return this.webClient.get().uri("/connectors/" + str + "/tasks", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToFlux(ParameterizedTypeReference.forType(Map.class));
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<EditResult> createConnector(String str, Map<String, Object> map) {
        log.debug("createConnector: {} {}", str, map);
        HashMap hashMap = new HashMap();
        hashMap.put("name", str);
        hashMap.put("config", map);
        long currentTimeMillis = System.currentTimeMillis();
        return this.webClient.post().uri("/connectors", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).contentType(MediaType.APPLICATION_JSON).syncBody(hashMap).exchange().flatMap(clientResponse -> {
            return clientResponse.toEntity(Map.class);
        }).map(responseEntity -> {
            Map map2 = (Map) responseEntity.getBody();
            log.trace("createConnector: entity {}", map2);
            EditResult editResult = new EditResult("editConnector", String.class);
            if (map2 == null) {
                editResult.addMessage(ResultDetail.error("no response.body", responseEntity.getStatusCodeValue(), str));
            } else if (responseEntity.getStatusCode().isError()) {
                editResult.addMessage(ResultDetail.error(map2.get("message"), responseEntity.getStatusCodeValue(), str));
            } else {
                editResult.setKey(str);
            }
            editResult.setTook(Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            return editResult;
        });
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<EditResult> updateConnector(String str, Map<String, Object> map) {
        throw new UnsupportedOperationException("ConnectClientWebFlux.updateConnector not implemented");
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<EditResult> deleteConnector(String str) {
        log.debug("deleteConnector: {}", str);
        return this.webClient.delete().uri("/connectors/" + str, new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).exchange().map(clientResponse -> {
            EditResult editResult = new EditResult("deleteConnector", String.class);
            HttpStatus statusCode = clientResponse.statusCode();
            if (statusCode.isError()) {
                editResult.addMessage(ResultDetail.error(statusCode.name(), statusCode.value(), str));
            } else {
                editResult.setKey(str);
            }
            return editResult;
        });
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<EditResult> pauseConnector(String str) {
        return this.webClient.put().uri("/connectors/" + str + "/pause", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).exchange().map(clientResponse -> {
            EditResult editResult = new EditResult("pauseConnector", String.class);
            HttpStatus statusCode = clientResponse.statusCode();
            if (statusCode.isError()) {
                editResult.addMessage(ResultDetail.error(statusCode.name(), statusCode.value(), str));
            } else {
                editResult.setKey(str);
            }
            return editResult;
        });
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<EditResult> restartConnector(String str) {
        return this.webClient.put().uri("/connectors/" + str + "/restart", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).exchange().map(clientResponse -> {
            EditResult editResult = new EditResult("restartConnector", String.class);
            HttpStatus statusCode = clientResponse.statusCode();
            if (statusCode.isError()) {
                editResult.addMessage(ResultDetail.error(statusCode.name(), statusCode.value(), str));
            } else {
                editResult.setKey(str);
            }
            return editResult;
        });
    }

    @Override // inc.yukawa.chain.kafka.connect.admin.ConnectClient
    public Mono<EditResult> resumeConnector(String str) {
        return this.webClient.put().uri("/connectors/" + str + "/resume", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).exchange().map(clientResponse -> {
            EditResult editResult = new EditResult("resumeConnector", String.class);
            HttpStatus statusCode = clientResponse.statusCode();
            if (statusCode.isError()) {
                editResult.addMessage(ResultDetail.error(statusCode.name(), statusCode.value(), str));
            } else {
                editResult.setKey(str);
            }
            return editResult;
        });
    }
}
