暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MCP源码分析:SSE

        启动一个sse的mcp server的过程前三步和stdio的流程一模一样:定义server描述,定义tool描述,将tool注册到server的map里
    mcpServer := server.NewMCPServer(
            "example-server",
            "1.0.0",
            server.WithResourceCapabilities(true, true),
            server.WithPromptCapabilities(true),
            server.WithToolCapabilities(true),
        )
        / Add echo tool
        mcpServer.AddTool(mcp.NewTool("echo",
            mcp.WithDescription("Echo back the input"),
            mcp.WithString("message",
                mcp.Required(),
                mcp.Description("Message to echo back"),
            ),
        ), echoHandler)
            后面不同的是下面两步
    4,定义SSE server
      sseServer := s.ServeSSE("localhost:8080")


        func (s *MCPServer) ServeSSE(addr string) *server.SSEServer {
            return server.NewSSEServer(s.server,
                server.WithBaseURL(fmt.Sprintf("http://%s", addr)),
            )
        }
        5,启动服务
          if err := sseServer.Start(":8080"); err != nil {
                  创建SSEServer和StdioServer流程几乎一样,知不多多了endpoint和messagepoint两个常量,它是SSE的uri路径
            func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
                s := &SSEServer{
                    server:                       server,
                    sseEndpoint:                  "/sse",
                    messageEndpoint:              "/message",
                    useFullURLForMessageEndpoint: true,
                    keepAlive:                    false,
                    keepAliveInterval:            10 * time.Second,
                }
                    SSEServer没有用map来保存路径到工具的映射,里面封装了一个http server,因为它是基于http协议的长链接服务。
              type SSEServer struct {
                  server                       *MCPServer
                  baseURL                      string
                  basePath                     string
                  appendQueryToMessageEndpoint bool
                  useFullURLForMessageEndpoint bool
                  messageEndpoint              string
                  sseEndpoint                  string
                  sessions                     sync.Map
                  srv                          *http.Server
                  contextFunc                  HTTPContextFunc
                  dynamicBasePathFunc          DynamicBasePathFunc
                  keepAlive         bool
                  keepAliveInterval time.Duration
                  mu sync.RWMutex
              }
              它的start函数就是http的监听函数
                func (s *SSEServer) Start(addr stringerror {
                    s.mu.Lock()
                    if s.srv == nil {
                        s.srv = &http.Server{
                            Addr:    addr,
                            Handler: s,
                        }
                    } else {
                        if s.srv.Addr == "" {
                            s.srv.Addr = addr
                        } else if s.srv.Addr != addr {
                            return fmt.Errorf("conflicting listen address: WithHTTPServer(%q) vs Start(%q)", s.srv.Addr, addr)
                        }
                    }
                    srv := s.srv
                    s.mu.Unlock()
                    return srv.ListenAndServe()
                }
                /sse和/messages的路由注册函数如下
                  func (s *SSEServer) SSEHandler() http.Handler {
                      return http.HandlerFunc(s.handleSSE)
                  }
                    func (s *SSEServer) MessageHandler() http.Handler {
                        return http.HandlerFunc(s.handleMessage)
                    }
                    最后我们看看ServeHttp函数,可以看到,在函数内部直接判断了路径,如果路径匹配直接调用对应的处理函数。如果不是多租户的动态路径,我们没有必要注册路由。
                      func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
                          ssePath := s.CompleteSsePath()
                          if ssePath != "" && path == ssePath {
                              s.handleSSE(w, r)
                          messagePath := s.CompleteMessagePath()
                          if messagePath != "" && path == messagePath {
                              s.handleMessage(w, r)
                          http.NotFound(w, r)
                      接着看看handleSSE函数的具体实现,定义了消息的类型是text/event-dtream,每次调用生成一个uuid作为sessionId。通过定时器不断发送ping请求和clien维持长连接。把sessionId拼接到/messages的url给客户端返回。
                        func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
                            if r.Method != http.MethodGet {
                                http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
                                return
                            }
                            w.Header().Set("Content-Type""text/event-stream")
                            w.Header().Set("Cache-Control""no-cache")
                            w.Header().Set("Connection""keep-alive")
                            w.Header().Set("Access-Control-Allow-Origin""*")
                            
                            sessionID := uuid.New().String()
                            
                                   for {
                                        select {
                                        case <-ticker.C:
                                            message := mcp.JSONRPCRequest{
                                                JSONRPC: "2.0",
                                                ID:      session.requestID.Add(1),
                                                Request: mcp.Request{
                                                    Method: "ping",
                                                },
                                            }
                                            messageBytes, _ := json.Marshal(message)
                                            pingMsg := fmt.Sprintf("event: message\ndata:%s\n\n", messageBytes)
                                            select {
                                            case session.eventQueue <- pingMsg:
                                            
                            endpoint := s.GetMessageEndpointForClient(r, sessionID)
                            if s.appendQueryToMessageEndpoint && len(r.URL.RawQuery) > 0 {
                                endpoint += "&" + r.URL.RawQuery
                            }
                            fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", endpoint)
                                handleMessage方法,会根据用户请求中的sessionId去加载session信息,然后用json-rpc协议解析请求,然后启动协程来处理请求。
                          func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
                              if r.Method != http.MethodPost {
                                  s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed")
                                  return
                              }
                              sessionID := r.URL.Query().Get("sessionId")
                              if sessionID == "" {
                                  s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
                                  return
                              }
                              sessionI, ok := s.sessions.Load(sessionID)
                              if !ok {
                                  s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
                                  return
                              }
                              session := sessionI.(*sseSession)
                              
                              var rawMessage json.RawMessage
                              if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
                                  s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
                                  return
                              }
                              go func(ctx context.Context) {
                                  defer cancel()
                                  // Use the context that will be canceled when session is done
                                  // Process message through MCPServer
                                  response := s.server.HandleMessage(ctx, rawMessage)
                                  // Only send response if there is one (not for notifications)
                                  if response != nil {
                                      var message string
                                      if eventData, err := json.Marshal(response); err != nil {
                          最后处理消息的函数和stdio方式是同一个函数,根据不同method进行分发处理。
                            func (s *MCPServer) HandleMessage(
                                ctx context.Context,
                                message json.RawMessage,
                            ) mcp.JSONRPCMessage {
                                 switch baseMessage.Method {
                                 case mcp.MethodInitialize:

                            文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                            评论