/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ml.engine.algorithms.remote;

import com.google.gson.Gson;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.TokenBucket;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ml.common.agent.MLToolSpec;
import org.opensearch.ml.common.connector.Connector;
import org.opensearch.ml.common.connector.McpStreamableHttpConnector;
import org.opensearch.ml.common.exception.MLException;
import org.opensearch.ml.common.input.MLInput;
import org.opensearch.ml.common.model.MLGuard;
import org.opensearch.ml.common.output.model.ModelTensors;
import org.opensearch.ml.common.transport.MLTaskResponse;
import org.opensearch.ml.common.utils.StringUtils;
import org.opensearch.ml.engine.algorithms.remote.AbstractConnectorExecutor;
import org.opensearch.ml.engine.algorithms.remote.ExecutionContext;
import org.opensearch.ml.engine.algorithms.remote.streaming.StreamPredictActionListener;
import org.opensearch.ml.engine.annotation.ConnectorExecutor;
import org.opensearch.ml.engine.tools.McpStreamableHttpTool;
import org.opensearch.script.ScriptService;
import org.opensearch.transport.client.Client;

@ConnectorExecutor(value="mcp_streamable_http")
public class McpStreamableHttpConnectorExecutor
extends AbstractConnectorExecutor {
    @Generated
    private static final Logger log = LogManager.getLogger(McpStreamableHttpConnectorExecutor.class);
    private McpStreamableHttpConnector connector;

    public McpStreamableHttpConnectorExecutor(Connector connector) {
        super.initialize(connector);
        this.connector = (McpStreamableHttpConnector)connector;
    }

    public List<MLToolSpec> getMcpToolSpecs() {
        String mcpServerUrl = this.connector.getUrl();
        String endpoint = Optional.ofNullable(this.connector.getParameters()).map(params -> (String)params.get("endpoint")).orElse("/mcp/");
        ArrayList<MLToolSpec> mcpToolSpecs = new ArrayList<MLToolSpec>();
        try {
            Duration connectionTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getConnectionTimeout().intValue());
            Duration readTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getReadTimeout().intValue());
            Consumer<HttpRequest.Builder> headerConfig = builder -> {
                if (this.connector.getDecryptedHeaders() != null) {
                    for (Map.Entry entry : this.connector.getDecryptedHeaders().entrySet()) {
                        builder.header((String)entry.getKey(), (String)entry.getValue());
                    }
                }
            };
            HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder((String)mcpServerUrl).endpoint(endpoint).customizeClient(clientBuilder -> {
                clientBuilder.connectTimeout(connectionTimeout);
                clientBuilder.followRedirects(HttpClient.Redirect.NORMAL);
            }).customizeRequest(headerConfig).build();
            McpSyncClient client = McpClient.sync((McpClientTransport)transport).requestTimeout(readTimeout).capabilities(McpSchema.ClientCapabilities.builder().roots(Boolean.valueOf(false)).build()).build();
            client.initialize();
            McpSchema.ListToolsResult tools = client.listTools();
            Gson gson = new Gson();
            String json = gson.toJson((Object)tools, McpSchema.ListToolsResult.class);
            Map map = (Map)gson.fromJson(json, Map.class);
            List mcpTools = (List)map.get("tools");
            for (Object tool : mcpTools) {
                Map toolMap = (Map)tool;
                HashMap<String, String> attributes = new HashMap<String, String>();
                attributes.put("input_schema", StringUtils.toJson(toolMap.get("inputSchema")));
                String description = toolMap.containsKey("description") ? StringUtils.processTextDoc((String)toolMap.get("description").toString()) : McpStreamableHttpTool.DEFAULT_DESCRIPTION;
                MLToolSpec mlToolSpec = MLToolSpec.builder().type("McpStreamableHttpTool").name(toolMap.get("name").toString()).description(description).attributes(attributes).build();
                mlToolSpec.addRuntimeResource("mcp_sync_client", (Object)client);
                mcpToolSpecs.add(mlToolSpec);
            }
            return mcpToolSpecs;
        }
        catch (Exception e) {
            throw new MLException("Unexpected error while getting MCP tools", (Throwable)e);
        }
    }

    @Override
    public ScriptService getScriptService() {
        throw new UnsupportedOperationException("Not implemented.");
    }

    @Override
    public TokenBucket getRateLimiter() {
        throw new UnsupportedOperationException("Not implemented.");
    }

    @Override
    public Map<String, TokenBucket> getUserRateLimiterMap() {
        throw new UnsupportedOperationException("Not implemented.");
    }

    @Override
    public MLGuard getMlGuard() {
        throw new UnsupportedOperationException("Not implemented.");
    }

    @Override
    public Client getClient() {
        throw new UnsupportedOperationException("Not implemented.");
    }

    @Override
    public Logger getLogger() {
        return log;
    }

    @Override
    public void invokeRemoteService(String action, MLInput mlInput, Map<String, String> parameters, String payload, ExecutionContext executionContext, ActionListener<Tuple<Integer, ModelTensors>> actionListener) {
        throw new UnsupportedOperationException("Not implemented.");
    }

    @Override
    public void invokeRemoteServiceStream(String action, MLInput mlInput, Map<String, String> parameters, String payload, ExecutionContext executionContext, StreamPredictActionListener<MLTaskResponse, ?> actionListener) {
        throw new UnsupportedOperationException("Streaming is not supported for MCP connector.");
    }

    @Generated
    public McpStreamableHttpConnector getConnector() {
        return this.connector;
    }
}

