暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:分布式链路追踪

        在上一节搭完分布式追踪的采集展示链路后,这一节开始分析分布式链路追踪的核心源码。我们知道分布式追踪的原理是通过traceId串联调用链路上的所有服务和日志,每个服务都有一个自己的spanId,每一次rpc调用都需要生成一个子spanId,通过父子spanId的对应关系,构建一个有向无环图,从而实现分布式追踪。因此在业务代码的接入过程中需要实现如下功能:父子span关系的构建,父子span关系的传递(包括context内部传递和rpc服务之间的传递,有可能跨协议,比如在http和grpc协议之间传递),rpc日志的采样、上报等等。每一个厂商都有自己的实现,opentracing定义了统一的标准接口,我们按照标准实现即可。在业务代码中的实现包括四步:

1,定义tracer,包括采样配置和agent上报相关的配置,然后放入全局变量中。

2,服务端响应请求的时候解析传入的trace,放入context

3,发起下游调用的时候序列化trace,传递给下游

4,对于业务日志需要串联trace的地方,我们打印带context的日志,从context中提取trace和当前span的信息。

下面我们结合golang源码看下实现

    func main() {
    tracer, closer, err := middleware.NewTracer("rootTracerExample", "127.0.0.1:6831", false)
    defer closer.Close()
    if err != nil {
    panic(err)
    }
      opentracing.SetGlobalTracer(tracer)
    go grpc.Main()
    http.Main()
    }

    定义tracer

      import (
      "io"
      "time"


      "github.com/opentracing/opentracing-go"
      "github.com/uber/jaeger-client-go"
      jaegercfg "github.com/uber/jaeger-client-go/config"
      )


      var Tracer opentracing.Tracer


      // NewTracer 创建一个jaeger Tracer
      func NewTracer(servicename string, addr string, udp bool) (opentracing.Tracer, io.Closer, error) {
      cfg := jaegercfg.Configuration{
      ServiceName: servicename,
      Sampler: &jaegercfg.SamplerConfig{
      Type: jaeger.SamplerTypeConst, //固定采样
      Param: 1, //1全采样,0不采样
      },
      Reporter: &jaegercfg.ReporterConfig{
      LogSpans: true,
      BufferFlushInterval: 1 * time.Second,
      LocalAgentHostPort: addr, //"127.0.0.1:6831",
      },
      }


      sender, err := jaeger.NewUDPTransport(addr, 0)
      if err != nil {
      return nil, nil, err
      }
      if udp {
      reporter := jaeger.NewRemoteReporter(sender)
      // Initialize tracer with a logger and a metrics factory
      return cfg.NewTracer(
      jaegercfg.Reporter(reporter),
      )
      }
        return cfg.NewTracer()
      }

      为了演示完整效果,我们定义一个http服务和一个grpc服务,完成http调http+http调grpc+grpc调grpc

        syntax = "proto3";
        package test;
        option go_package = "learn/Jaeger/exp1/grpc";
        // Service definition.
        service TestService {
        // Note: the keyword here is `returns`, not `return`.
        rpc SayHello(Request) returns (Response){
        }
        rpc SayHello1(Request) returns (Response){
        }
        }
        // Request and response message types.
        message Request {
        string message=1;
        }
        message Response {
        string message=1;
        }

        用protoc生成代码

          % protoc --go-grpc_out=. learn/Jaeger/exp1/grpc/hello.proto
          % protoc --go_out=. learn/Jaeger/exp1/grpc/hello.proto

          定义grpc的服务端代码

            package grpc


            import (
            context "context"
            "fmt"
            "learn/learn/Jaeger/middleware"
            "log"
            "net"


            grpc "google.golang.org/grpc"
            )


            func Main() {
            srv := grpc.NewServer(grpc.UnaryInterceptor(middleware.TraceSpanServerInterceptor()))
            RegisterTestServiceServer(srv, &HelloService{})


            listener, err := net.Listen("tcp", ":8081")
            if err != nil {
            log.Fatalf("failed to listen: %v", err)
            }


            err = srv.Serve(listener)
            if err != nil {
            log.Fatalf("failed to serve: %v", err)
            }


            }


            // HelloService is the demo implementation of the TestService gRPC
            // service. It has no fields.
            type HelloService struct {
            }


            // mustEmbedUnimplementedTestServiceServer is declared by hand with an
            // empty body. NOTE(review): presumably this satisfies the generated
            // TestServiceServer interface without embedding
            // UnimplementedTestServiceServer — confirm against the generated code.
            func (s *HelloService) mustEmbedUnimplementedTestServiceServer() {}


            // SayHello handles the SayHello RPC by dialing the gRPC server at
            // 127.0.0.1:8081 with the tracing client interceptor and calling SayHello1
            // with the same context and request, so the trace contains a gRPC -> gRPC
            // hop.
            //
            // Dial and downstream failures are returned to the caller as errors; they
            // do not terminate the server process.
            func (s *HelloService) SayHello(ctx context.Context, r *Request) (*Response, error) {
                // DialContext bounds the blocking dial (WithBlock) by the RPC's own
                // context instead of waiting indefinitely.
                conn, err := grpc.DialContext(ctx, "127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()))
                if err != nil {
                    return nil, fmt.Errorf("dial 127.0.0.1:8081: %w", err)
                }
                defer conn.Close()

                client := NewTestServiceClient(conn)
                if _, err := client.SayHello1(ctx, r); err != nil {
                    return nil, fmt.Errorf("call SayHello1: %w", err)
                }
                fmt.Println("SayHello", ctx)
                return &Response{}, nil
            }
            // SayHello1 handles the SayHello1 RPC: it prints the incoming context to
            // stdout and replies with an empty Response and a nil error.
            func (s *HelloService) SayHello1(ctx context.Context, r *Request) (*Response, error) {
                fmt.Println("SayHello1", ctx)
                reply := new(Response)
                return reply, nil
            }


            实现一个简单的http服务

              package http


              import (
              "fmt"
              "log"
              "net/http"


              "learn/learn/Jaeger/middleware"


              mygrpc "learn/learn/Jaeger/exp1/grpc"


              "google.golang.org/grpc"
              )


              // Main registers the /request1 and /request2 handlers and serves them on
              // port 8080 behind the ServerTraceSpan middleware. It blocks until the
              // server stops, then logs the reason and returns.
              func Main() {
                  mux := http.NewServeMux()
                  mux.HandleFunc("/request1", request1)
                  mux.HandleFunc("/request2", request2)

                  // ListenAndServe always returns a non-nil error (e.g. port already in
                  // use); report it rather than silently dropping it. Log and return,
                  // not exit, so the caller's deferred cleanup still runs.
                  if err := http.ListenAndServe(":8080", middleware.ServerTraceSpan(mux)); err != nil {
                      log.Printf("http server stopped: %v", err)
                  }
              }


              // request1 handles /request1 by issuing a traced GET to
              // http://localhost:8080/request2 and copying the response body to the
              // client (the HTTP -> HTTP hop of the demo).
              //
              // If the downstream call fails, it answers 502 Bad Gateway with the error
              // text and stops, rather than continuing to write a body after the error.
              func request1(w http.ResponseWriter, r *http.Request) {
                  url := "http://localhost:8080/request2"

                  body, err := middleware.ClientTraceSpan(r.Context(), "GET", url, nil)
                  if err != nil {
                      http.Error(w, err.Error(), http.StatusBadGateway)
                      return
                  }
                  fmt.Fprint(w, string(body))
                  fmt.Println("request1", r.Context())
              }
              // request2 handles /request2 by dialing the gRPC server at 127.0.0.1:8081
              // with the tracing client interceptor and calling SayHello with the
              // request's context (the HTTP -> gRPC hop of the demo).
              //
              // Dial or RPC failures are logged and answered with 502 Bad Gateway; they
              // do not terminate the process.
              func request2(w http.ResponseWriter, r *http.Request) {
                  // DialContext bounds the blocking dial (WithBlock) by the request
                  // context, so a client disconnect or deadline aborts the wait.
                  conn, err := grpc.DialContext(r.Context(), "127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()))
                  if err != nil {
                      log.Printf("did not connect: %v", err)
                      http.Error(w, "upstream gRPC service unavailable", http.StatusBadGateway)
                      return
                  }
                  defer conn.Close()

                  client := mygrpc.NewTestServiceClient(conn)
                  if _, err := client.SayHello(r.Context(), &mygrpc.Request{}); err != nil {
                      log.Printf("SayHello failed: %v", err)
                      http.Error(w, "upstream gRPC call failed", http.StatusBadGateway)
                      return
                  }
                  fmt.Println("request2", r.Context())
              }


              我们通过middleware的方式实现trace的传递,对于grpc服务实现如下

                package middleware


                import (
                "context"
                "encoding/base64"
                "fmt"
                "strings"


                "github.com/opentracing/opentracing-go"
                "github.com/opentracing/opentracing-go/ext"
                "github.com/siddontang/go/log"
                "github.com/uber/jaeger-client-go"
                "google.golang.org/grpc"
                "google.golang.org/grpc/metadata"
                //"example/constants"
                )


                // TraceSpanClientInterceptor returns a grpc.UnaryClientInterceptor suitable
                // for use in a grpc.Dial() call.
                //
                // For example:
                //
                // conn, err := grpc.Dial(
                // address,
                // ..., (existing DialOptions)
                // grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()),
                // )
                //
                // For every unary call it starts a span named "RPC Client <method>" as a
                // child of any span already in ctx, injects that span's context into the
                // outgoing request metadata, and invokes the call. A failed injection is
                // logged and the call proceeds without trace propagation. The invoker's
                // error is returned unchanged; when non-nil the span is tagged as an error.
                func TraceSpanClientInterceptor() grpc.UnaryClientInterceptor {
                    return func(
                        ctx context.Context,
                        method string, req, resp interface{},
                        cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
                    ) (err error) {
                        span, ctx := opentracing.StartSpanFromContext(ctx, "RPC Client "+method)
                        defer span.Finish()

                        // Work on a private copy of the outgoing metadata: the MD stored
                        // in ctx may be shared with other calls and must not be mutated.
                        md, ok := metadata.FromOutgoingContext(ctx)
                        if ok {
                            md = md.Copy()
                        } else {
                            md = metadata.Pairs()
                        }

                        // Save current span context into the metadata.
                        if injectErr := opentracing.GlobalTracer().Inject(
                            span.Context(), opentracing.HTTPHeaders, metadataTextMap(md),
                        ); injectErr != nil {
                            log.Errorf("Failed to inject trace span: %v", injectErr)
                        }

                        err = invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
                        if err != nil {
                            ext.Error.Set(span, true)
                        }
                        return err
                    }
                }


                // TraceSpanServerInterceptor returns a grpc.UnaryServerInterceptor suitable
                // for use in a grpc.NewServer call.
                //
                // For example:
                //
                // s := grpc.NewServer(
                // ..., (existing ServerOptions)
                // grpc.UnaryInterceptor(middleware.TraceSpanServerInterceptor()),
                // )
                //
                // For every unary call it extracts the parent span context from the
                // incoming request metadata, starts a span named "RPC Server <full method>"
                // (a new root span when no parent is found), stores that span in the
                // context passed to the handler and, for Jaeger span contexts, also stores
                // the trace ID under the "constants.RequestID" context key. The handler's
                // result is returned unchanged; a non-nil error tags the span as an error.
                func TraceSpanServerInterceptor() grpc.UnaryServerInterceptor {
                    return func(
                        ctx context.Context,
                        req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
                    ) (resp interface{}, err error) {
                        // Extract parent trace span.
                        md, ok := metadata.FromIncomingContext(ctx)
                        if !ok {
                            md = metadata.Pairs()
                        }
                        parentSpanContext, err := opentracing.GlobalTracer().Extract(
                            opentracing.HTTPHeaders, metadataTextMap(md),
                        )
                        switch err {
                        case nil:
                        case opentracing.ErrSpanContextNotFound:
                            // Log only the message; passing ctx as a log argument would
                            // print the entire context value into the log line.
                            log.Info("Parent span not found, will start new one.")
                        default:
                            log.Errorf("Failed to extract trace span: %v", err)
                        }

                        // Start new trace span.
                        span := opentracing.StartSpan(
                            "RPC Server "+info.FullMethod,
                            ext.RPCServerOption(parentSpanContext),
                        )
                        defer span.Finish()
                        ctx = opentracing.ContextWithSpan(ctx, span)

                        // Set request ID for context.
                        if sc, ok := span.Context().(jaeger.SpanContext); ok {
                            ctx = context.WithValue(ctx, "constants.RequestID", sc.TraceID().String())
                        }

                        resp, err = handler(ctx, req)
                        if err != nil {
                            ext.Error.Set(span, true)
                        }
                        return resp, err
                    }
                }


                // binHeaderSuffix is the key suffix encodeKeyValue looks for: values whose
                // lower-cased key ends with it are base64-encoded before being stored.
                // NOTE(review): gRPC's binary-metadata key suffix is "-bin" (hyphen);
                // confirm whether the underscore here is intentional.
                const (
                binHeaderSuffix = "_bin"
                )


                // metadataTextMap adapts a metadata.MD so it can be passed to the tracer's
                // Inject and Extract calls as a text-map carrier: Set is the writer side
                // and ForeachKey is the reader side.
                type metadataTextMap metadata.MD


                // Set is the writer side of the carrier: it stores val under key after
                // passing both through encodeKeyValue. Any values already present for the
                // encoded key are replaced rather than appended to, since the metadata
                // object is a multimap but tracing headers are single-valued.
                func (m metadataTextMap) Set(key, val string) {
                    k, v := encodeKeyValue(key, val)
                    m[k] = []string{v}
                }


                // ForeachKey is the reader side of the carrier: it decodes every key/value
                // pair in the metadata with metadata.DecodeKeyValue and passes it to
                // callback. Iteration stops at the first decoding failure (returned as a
                // wrapped error) or at the first non-nil error returned by callback
                // (returned as-is).
                func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
                    for rawKey, values := range m {
                        for _, rawVal := range values {
                            key, val, decodeErr := metadata.DecodeKeyValue(rawKey, rawVal)
                            if decodeErr != nil {
                                return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", decodeErr)
                            }
                            if cbErr := callback(key, val); cbErr != nil {
                                return cbErr
                            }
                        }
                    }
                    return nil
                }


                // encodeKeyValue prepares a key/value pair for storage in gRPC metadata.
                // The key is lower-cased; if the lower-cased key ends with binHeaderSuffix
                // the value is replaced by its standard base64 encoding, otherwise the
                // value is returned untouched.
                // note: copy pasted from private values of grpc.metadata
                func encodeKeyValue(k, v string) (string, string) {
                    lowered := strings.ToLower(k)
                    if !strings.HasSuffix(lowered, binHeaderSuffix) {
                        return lowered, v
                    }
                    return lowered, base64.StdEncoding.EncodeToString([]byte(v))
                }

                由于官方默认包里只实现了bin,kv和httpHeader三种格式的carrier,因此对于grpc服务需要自己实现carrier。对于http服务实现如下

                  package middleware


                  import (
                  "context"
                  "fmt"
                  "io"
                  "io/ioutil"
                  "net/http"


                  "github.com/opentracing/opentracing-go"
                  "github.com/opentracing/opentracing-go/ext"
                  "github.com/uber/jaeger-client-go"
                  )


                  // TraceHeader is the HTTP response header in which ServerTraceSpan
                  // exposes the trace ID; ClientTraceSpan also uses it as the operation
                  // name of the client span it starts.
                  const TraceHeader = "Http-TraceHeader"


                  func ClientTraceSpan(ctx context.Context, method, url string, body io.Reader) (resBody []byte, err error) {
                  client := &http.Client{}
                  req, err := http.NewRequest(method, url, body)
                  if err != nil {
                  panic(err)
                  }


                  span, _ := opentracing.StartSpanFromContext(ctx, TraceHeader)
                    defer span.Finish()
                  ext.SpanKindRPCClient.Set(span)
                  ext.HTTPUrl.Set(span, url)
                  ext.HTTPMethod.Set(span, "GET")
                  span.Tracer().Inject(
                  span.Context(),
                  opentracing.HTTPHeaders,
                  opentracing.HTTPHeadersCarrier(req.Header),
                    )
                  ctx = opentracing.ContextWithSpan(ctx, span)
                  req.WithContext(ctx)
                  resp, err := client.Do(req)
                  if err != nil {
                  fmt.Println("请求错误:", err)
                    }
                  return ioutil.ReadAll(resp.Body)
                  }


                  //中间件
                  func ServerTraceSpan(next http.Handler) http.Handler {
                  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                  tracer := opentracing.GlobalTracer()
                      // 从ctx获取span
                  if parent := opentracing.SpanFromContext(r.Context()); parent != nil {
                  parentCtx := parent.Context()
                  // 获取opentracing中的全局tracer
                  if tracer := opentracing.GlobalTracer(); tracer != nil {
                  mySpan := tracer.StartSpan("my info", opentracing.ChildOf(parentCtx))
                  // 由于前面opentracing中的tracer是jaeger的,所以你这里转化为jaeger.SpanContext
                  if sc, ok := mySpan.Context().(jaeger.SpanContext); ok {
                  // 这里,就能获取traceid等信息了,可以放在日志里
                  w.Header().Set(TraceHeader, sc.TraceID().String())
                  }
                  defer mySpan.Finish()
                  }
                  }


                  spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
                  span := opentracing.StartSpan(
                  "RPC Server "+r.RequestURI,
                  ext.RPCServerOption(spanCtx),
                  )
                  defer span.Finish()
                  ctx := opentracing.ContextWithSpan(r.Context(), span)


                  if sc, ok := span.Context().(jaeger.SpanContext); ok {
                        ctx = context.WithValue(ctx, "constants.RequestID", sc.TraceID().String())
                        w.Header().Set(TraceHeader, sc.TraceID().String())
                      }
                  next.ServeHTTP(w, r.WithContext(ctx))
                  })
                  }

                  为了方便测试可以把traceID单独提出来放入httpheader里面,测试下

                    % curl -vi http://127.0.0.1:8080/request1
                    * Trying 127.0.0.1:8080...
                    * Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
                    > GET /request1 HTTP/1.1
                    > Host: 127.0.0.1:8080
                    > User-Agent: curl/7.79.1
                    > Accept: */*
                    >
                    * Mark bundle as not supporting multiuse
                    < HTTP/1.1 200 OK
                    HTTP/1.1 200 OK
                    < Http-Traceheader: 73f6efd73f361c12
                    Http-Traceheader: 73f6efd73f361c12
                    < Date: Sun, 23 Oct 2022 19:46:28 GMT
                    Date: Sun, 23 Oct 2022 19:46:28 GMT
                    < Content-Length: 0
                    Content-Length: 0


                    <
                    Connection #0 to host 127.0.0.1 left intact

                    效果如下

                            当然上述实现还是很粗糙的,比如为了方便使用默认的contextKey 传递trace信息,没有实现自定义的Extract和Inject方法,导致client和server各打印了一份trace信息。下一期在源码实现分析的时候介绍如何优化。

                    文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论