暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang实现mcp的sse传输方式

        访问本地服务的MCP server的实现都是基于 STDIO 的方式进行通信,这种方式是靠本地进程间的标准的输入输出协议实现通信的。但是通常我们现有的微服务都是web端的应用,STDIO 的方式在这种场景下并不适用,因此,MCP协议提供了另一种通信方式,即SSE (Server-Sent Events) 传输方式。 MCP的 SSE 传输是一种基于 HTTP 的通信机制,主要用于实现服务器到客户端的流式传输。
        下面我们实现一个echo的MCP SSE server
    package main
    import (
        "context"
        "fmt"
        "log"


        "github.com/mark3labs/mcp-go/mcp"
        "github.com/mark3labs/mcp-go/server"
    )
    type MCPServer struct {
        server *server.MCPServer
    }
    func NewMCPServer() *MCPServer {
        mcpServer := server.NewMCPServer(
            "example-server",
            "1.0.0",
            server.WithResourceCapabilities(truetrue),
            server.WithPromptCapabilities(true),
            server.WithToolCapabilities(true),
        )
        // Add echo tool
        mcpServer.AddTool(mcp.NewTool("echo",
            mcp.WithDescription("Echo back the input"),
            mcp.WithString("message",
                mcp.Required(),
                mcp.Description("Message to echo back"),
            ),
        ), echoHandler)
        return &MCPServer{
            server: mcpServer,
        }
    }
    func main() {
        s := NewMCPServer()
        sseServer := s.ServeSSE("localhost:8080")
        log.Printf("SSE server listening on :8080")
        if err := sseServer.Start(":8080"); err != nil {
            log.Fatalf("Server error: %v", err)
        }
    }
    func echoHandler(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        msg, ok := req.Params.Arguments["message"].(string)
        if !ok {
            return nil, fmt.Errorf("invalid message parameter")
        }
        return mcp.NewToolResultText(fmt.Sprintf("Echo: %s", msg)), nil
    }
    func (s *MCPServer) ServeSSE(addr string) *server.SSEServer {
        return server.NewSSEServer(s.server,
            server.WithBaseURL(fmt.Sprintf("http://%s", addr)),
        )
    }
    我们解析message参数,然后返回。和STDIO 协议MCP servergolang实现mcp server的区别是使用了函数NewSSEServer而不是NewStdioServer,具体到函数内部,实现了两个http uri
      func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
          s := &SSEServer{
              server:                       server,
              sseEndpoint:                  "/sse",
              messageEndpoint:              "/message",
              useFullURLForMessageEndpoint: true,
          }
      启动服务后我们访问下这两个url
        curl 'http://localhost:8080/sse'
        event: endpoint
        data: http://localhost:8080/message?sessionId=91c61b81-a84d-465b-89e4-f9ebd244c958
        可以看到访问/sse的时候,返回了message路径的完整链接,并带了sessionId参数,每个session都不一样
          curl -X POST 'http://localhost:8080/message?sessionId=dd92c1c3-62b3-47d8-bf79-5be071839779'
          {"jsonrpc":"2.0","id":null,"error":{"code":-32700,"message":"Parse error"}}
          curl -X POST 'http://localhost:8080/message?sessionId=265887bb-7193-490c-a25c-6644aa6ec8cd' -H 'Content-Type: application/json' -d '{"jsonrpc":"2.0","id":null,"method":"tools/call","params":{"name":"echo","arguments":{"message":"Hello SSE!"}}}'
          请求后就是我们熟悉的mcp json-rpc协议,然后我们简单实现一个mcp client来使用它
            package main
            import (
                "context"
                "encoding/json"
                "fmt"
                "log"
                "time"
                "github.com/mark3labs/mcp-go/client"
                "github.com/mark3labs/mcp-go/mcp"
            )
            func main() {
                ctx := context.Background()
                client, err := client.NewSSEMCPClient("http://localhost:8080/sse")
                if err != nil {
                    log.Fatalf("Failed to create SSE MCP client: %v", err)
                }
                err = client.Start(ctx)
                if err != nil {
                    log.Fatalf("Failed to start SSE MCP client: %v", err)
                }
                // Initialize
                initRequest := mcp.InitializeRequest{}
                initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
                initRequest.Params.ClientInfo = mcp.Implementation{
                    Name:    "test-client",
                    Version: "1.0.0",
                }
                _, err = client.Initialize(ctx, initRequest)
                if err != nil {
                    log.Fatalf("Failed to Initialize SSE MCP client: %v", err)
                }
                request := mcp.CallToolRequest{
                    Request: mcp.Request{
                        Method: "tools/call",
                    },
                }
                arguments := map[string]interface{}{
                    "message""Hello SSE!",
                }
                request.Params.Name = "echo"
                request.Params.Arguments = arguments
                d, _ := json.Marshal(request)
                fmt.Println(string(d))
                // Test echo tool
                result, err := client.CallTool(context.Background(), request)
                if err != nil {
                    return
                }
                textContent := result.Content[0].(mcp.TextContent)
                fmt.Println(textContent.Text)
                time.Sleep(100 * time.Second)
            }
            返回如下
              {"method":"tools/call","params":{"name":"echo","arguments":{"message":"Hello SSE!"}}}
              EchoHello SSE!
              研究下源码我们发现client。Start其实是向url  http://localhost:8080/sse 发起 GET请求
                func (c *SSEMCPClient) Start(ctx context.Context) error {
                    req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL.String(), nil)
                然后解析服务端返回指向/message的POST 链接
                  func (c *SSEMCPClientreadSSE(reader io.ReadCloser) {
                        if event != "" && data != "" {
                               c.handleSSEEvent(event, data)
                         }
                  然后具体处理
                    func (c *SSEMCPClient) handleSSEEvent(event, data string) {
                        switch event {
                        case "endpoint":
                            endpoint, err := c.baseURL.Parse(data)
                            c.endpoint = endpoint
                        case "message":
                            for _, handler := range c.notifications {
                                    handler(notification)
                                }
                    最后看下CallTool调用
                      func (c *SSEMCPClient) CallTool(
                          ctx context.Context,
                          request mcp.CallToolRequest,
                      ) (*mcp.CallToolResult, error) {
                          response, err := c.sendRequest(ctx, "tools/call", request.Params)
                        func (c *SSEMCPClient) sendRequest(
                            ctx context.Context,
                            method string,
                            params interface{},
                        ) (*json.RawMessage, error) {
                            req, err := http.NewRequestWithContext(
                                ctx,
                                "POST",
                                c.endpoint.String(),
                                bytes.NewReader(requestBytes),
                            )
                        就是根据上面解析的message路径,发送请求。

                        文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论