/*
 * Decompiled with CFR 0.152.
 */
package inc.yukawa.chain.kafka.connect.admin;

import inc.yukawa.chain.base.core.domain.result.EditResult;
import inc.yukawa.chain.base.core.domain.result.ResultDetail;
import inc.yukawa.chain.kafka.connect.admin.ConnectClient;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConnectClientWebFlux
implements ConnectClient {
    private static final Logger log = LoggerFactory.getLogger(ConnectClientWebFlux.class);
    private final WebClient webClient;

    public ConnectClientWebFlux(WebClient webClient) {
        this.webClient = webClient;
    }

    @Override
    public Flux<String> findConnectorNames() {
        return this.webClient.get().uri("/connectors", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToFlux(String.class);
    }

    @Override
    public Mono<Map<String, Object>> loadConnector(String name) {
        return this.webClient.get().uri("/connectors/" + name, new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToMono(ParameterizedTypeReference.forType(Map.class));
    }

    @Override
    public Mono<Map<String, Object>> loadConnectorConfig(String name) {
        return this.webClient.get().uri("/connectors/" + name + "/config", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToMono(ParameterizedTypeReference.forType(Map.class));
    }

    @Override
    public Mono<Map<String, Object>> loadConnectorStatus(String name) {
        return this.webClient.get().uri("/connectors/" + name + "/status", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToMono(ParameterizedTypeReference.forType(Map.class));
    }

    @Override
    public Flux<Map<String, Object>> findConnectorTasks(String name) {
        return this.webClient.get().uri("/connectors/" + name + "/tasks", new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().bodyToFlux(ParameterizedTypeReference.forType(Map.class));
    }

    @Override
    public Mono<EditResult> createConnector(String name, Map<String, Object> config) {
        log.debug("createConnector: {} {}", (Object)name, config);
        HashMap<String, Object> req = new HashMap<String, Object>();
        req.put("name", name);
        req.put("config", config);
        long started = System.currentTimeMillis();
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.webClient.post().uri("/connectors", new Object[0])).accept(new MediaType[]{MediaType.APPLICATION_JSON})).contentType(MediaType.APPLICATION_JSON).bodyValue(req).retrieve().toEntity(Map.class).map(entity -> {
            Map body = (Map)entity.getBody();
            log.trace("createConnector: entity {}", (Object)body);
            EditResult result = new EditResult("editConnector", String.class);
            if (body == null) {
                result.addMessage(ResultDetail.error((Object)"no response.body", (int)entity.getStatusCode().value(), (String)name));
            } else if (entity.getStatusCode().isError()) {
                result.addMessage(ResultDetail.error(body.get("message"), (int)entity.getStatusCode().value(), (String)name));
            } else {
                result.setKey(name);
            }
            result.setTook(Long.valueOf(System.currentTimeMillis() - started));
            return result;
        });
    }

    @Override
    public Mono<EditResult> updateConnector(String name, Map<String, Object> config) {
        throw new UnsupportedOperationException("ConnectClientWebFlux.updateConnector not implemented");
    }

    @Override
    public Mono<EditResult> deleteConnector(String name) {
        log.debug("deleteConnector: {}", (Object)name);
        return this.webClient.delete().uri("/connectors/" + name, new Object[0]).accept(new MediaType[]{MediaType.APPLICATION_JSON}).retrieve().toBodilessEntity().map(response -> {
            EditResult result = new EditResult("deleteConnector", String.class);
            HttpStatusCode status = response.getStatusCode();
            if (status.isError()) {
                result.addMessage(ResultDetail.error((Object)status, (int)status.value(), (String)name));
            } else {
                result.setKey(name);
            }
            return result;
        });
    }

    @Override
    public Mono<EditResult> pauseConnector(String name) {
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.webClient.put().uri("/connectors/" + name + "/pause", new Object[0])).accept(new MediaType[]{MediaType.APPLICATION_JSON})).retrieve().toBodilessEntity().map(response -> {
            EditResult result = new EditResult("pauseConnector", String.class);
            HttpStatusCode status = response.getStatusCode();
            if (status.isError()) {
                result.addMessage(ResultDetail.error((Object)status, (int)status.value(), (String)name));
            } else {
                result.setKey(name);
            }
            return result;
        });
    }

    @Override
    public Mono<EditResult> restartConnector(String name) {
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.webClient.put().uri("/connectors/" + name + "/restart", new Object[0])).accept(new MediaType[]{MediaType.APPLICATION_JSON})).retrieve().toBodilessEntity().map(response -> {
            EditResult result = new EditResult("restartConnector", String.class);
            HttpStatusCode status = response.getStatusCode();
            if (status.isError()) {
                result.addMessage(ResultDetail.error((Object)status, (int)status.value(), (String)name));
            } else {
                result.setKey(name);
            }
            return result;
        });
    }

    @Override
    public Mono<EditResult> resumeConnector(String name) {
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.webClient.put().uri("/connectors/" + name + "/resume", new Object[0])).accept(new MediaType[]{MediaType.APPLICATION_JSON})).retrieve().toBodilessEntity().map(response -> {
            EditResult result = new EditResult("resumeConnector", String.class);
            HttpStatusCode status = response.getStatusCode();
            if (status.isError()) {
                result.addMessage(ResultDetail.error((Object)status, (int)status.value(), (String)name));
            } else {
                result.setKey(name);
            }
            return result;
        });
    }
}

