在上一节搭完分布式追踪的采集展示链路后,这一节开始分析分布式链路追踪的核心源码。我们知道分布式追踪的原理是通过traceId串联调用链路上的所有服务和日志,每个服务都有一个自己的spanId,每一次rpc调用都需要生成一个子spanId,通过父子spanId的对应关系构建一个有向无环图,从而实现分布式追踪。因此在业务代码的接入过程中需要实现如下功能:父子span关系的构建,父子span关系的传递(包括context内部传递和rpc服务之间的传递,有可能跨协议,比如在http和grpc协议之间传递),rpc日志的采样、上报等等。每一个厂商都有自己的实现,OpenTracing定义了统一的标准接口,我们按照标准实现即可。在业务代码中实现包括四步:
1,定义tracer,包括采样配置和agent上报相关的配置,然后放入全局变量中。
2,服务端响应请求的时候解析传入的trace,放入context
3,发起下游调用的时候序列化trace,传递给下游
4,对于业务日志需要串联trace的地方,我们打印带context的日志,从context中提取trace和当前span的信息。
下面我们结合golang源码看下实现
func main() {tracer, closer, err := middleware.NewTracer("rootTracerExample", "127.0.0.1:6831", false)defer closer.Close()if err != nil {panic(err)}opentracing.SetGlobalTracer(tracer)go grpc.Main()http.Main()}
定义tracer
import ("io""time""github.com/opentracing/opentracing-go""github.com/uber/jaeger-client-go"jaegercfg "github.com/uber/jaeger-client-go/config")var Tracer opentracing.Tracer// NewTracer 创建一个jaeger Tracerfunc NewTracer(servicename string, addr string, udp bool) (opentracing.Tracer, io.Closer, error) {cfg := jaegercfg.Configuration{ServiceName: servicename,Sampler: &jaegercfg.SamplerConfig{Type: jaeger.SamplerTypeConst, //固定采样Param: 1, //1全采样,0不采样},Reporter: &jaegercfg.ReporterConfig{LogSpans: true,BufferFlushInterval: 1 * time.Second,LocalAgentHostPort: addr, //"127.0.0.1:6831",},}sender, err := jaeger.NewUDPTransport(addr, 0)if err != nil {return nil, nil, err}if udp {reporter := jaeger.NewRemoteReporter(sender)// Initialize tracer with a logger and a metrics factoryreturn cfg.NewTracer(jaegercfg.Reporter(reporter),)}return cfg.NewTracer()}
为了演示完整效果,我们定义一个http服务和一个grpc服务,完成http调http+http调grpc+grpc调grpc
syntax = "proto3";

package test;

option go_package = "learn/Jaeger/exp1/grpc";

// Service definition.
service TestService {
  // Note: the keyword here is `returns`, not `return`.
  rpc SayHello(Request) returns (Response) {}
  rpc SayHello1(Request) returns (Response) {}
}

// Parameter types.
message Request {
  string message = 1;
}

message Response {
  string message = 1;
}
使用protoc生成对应的Go代码
% protoc --go-grpc_out=. learn/Jaeger/exp1/grpc/hello.proto
% protoc --go_out=. learn/Jaeger/exp1/grpc/hello.proto
定义grpc的服务端代码
package grpc

import (
	context "context"
	"fmt"
	"log"
	"net"

	"learn/learn/Jaeger/middleware"

	grpc "google.golang.org/grpc"
)

// Main starts the demo gRPC server on :8081 with the tracing server
// interceptor installed and blocks until serving fails.
func Main() {
	srv := grpc.NewServer(grpc.UnaryInterceptor(middleware.TraceSpanServerInterceptor()))
	RegisterTestServiceServer(srv, &HelloService{})

	listener, err := net.Listen("tcp", ":8081")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	if err := srv.Serve(listener); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

// HelloService is the demo implementation registered as the TestService server.
type HelloService struct {
}

// mustEmbedUnimplementedTestServiceServer is declared by hand instead of
// embedding the generated Unimplemented server type.
// NOTE(review): presumably required by the generated TestServiceServer
// interface; the generated code is not shown here.
func (s *HelloService) mustEmbedUnimplementedTestServiceServer() {}

// SayHello handles the RPC by calling SayHello1 on this same server
// (127.0.0.1:8081) through a traced client connection, demonstrating a
// gRPC-to-gRPC hop within one trace.
//
// Failures are returned to the caller as errors. A request handler must not
// call log.Fatalf, which would terminate the whole process on one failed call.
func (s *HelloService) SayHello(ctx context.Context, r *Request) (*Response, error) {
	// DialContext lets the blocking dial honour cancellation of the incoming
	// request instead of blocking indefinitely.
	conn, err := grpc.DialContext(ctx, "127.0.0.1:8081",
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()),
	)
	if err != nil {
		return nil, fmt.Errorf("dial 127.0.0.1:8081: %w", err)
	}
	defer conn.Close()

	client := NewTestServiceClient(conn)
	if _, err := client.SayHello1(ctx, r); err != nil {
		return nil, fmt.Errorf("call SayHello1: %w", err)
	}

	fmt.Println("SayHello", ctx)
	return &Response{}, nil
}

// SayHello1 is the leaf RPC: it prints the request context and returns an
// empty response.
func (s *HelloService) SayHello1(ctx context.Context, r *Request) (*Response, error) {
	fmt.Println("SayHello1", ctx)
	return &Response{}, nil
}
实现一个简单的http服务
package http

import (
	"fmt"
	"log"
	"net/http"

	mygrpc "learn/learn/Jaeger/exp1/grpc"
	"learn/learn/Jaeger/middleware"

	"google.golang.org/grpc"
)

// Main serves /request1 and /request2 on :8080, wrapped in the tracing
// middleware. It blocks while serving and exits the process if the server
// cannot start or stops with an error.
func Main() {
	mutx := http.NewServeMux()
	mutx.HandleFunc("/request1", request1)
	mutx.HandleFunc("/request2", request2)

	// ListenAndServe always returns a non-nil error; surface it instead of
	// returning silently (e.g. when the port is already in use).
	if err := http.ListenAndServe(":8080", middleware.ServerTraceSpan(mutx)); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

// request1 demonstrates an HTTP-to-HTTP hop: it calls /request2 on this same
// server through the traced HTTP client and relays the response body.
func request1(w http.ResponseWriter, r *http.Request) {
	url := "http://localhost:8080/request2"
	bytes, err := middleware.ClientTraceSpan(r.Context(), "GET", url, nil)
	if err != nil {
		// Stop here: there is no downstream body to relay.
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	fmt.Fprint(w, string(bytes))
	fmt.Println("request1", r.Context())
}

// request2 demonstrates an HTTP-to-gRPC hop: it calls TestService.SayHello on
// 127.0.0.1:8081 through a traced gRPC client connection.
//
// Downstream failures are reported to the HTTP caller; a handler must not call
// log.Fatalf, which would terminate the whole process.
func request2(w http.ResponseWriter, r *http.Request) {
	conn, err := grpc.Dial("127.0.0.1:8081",
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()),
	)
	if err != nil {
		http.Error(w, fmt.Sprintf("did not connect: %v", err), http.StatusBadGateway)
		return
	}
	defer conn.Close()

	client := mygrpc.NewTestServiceClient(conn)
	if _, err := client.SayHello(r.Context(), &mygrpc.Request{}); err != nil {
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	fmt.Println("request2", r.Context())
}
我们通过middleware的方式实现trace的传递,对于grpc服务实现如下
package middleware

import (
	"context"
	"encoding/base64"
	"fmt"
	"strings"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/siddontang/go/log"
	"github.com/uber/jaeger-client-go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	//"example/constants"
)

// TraceSpanClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial() call.
//
// For example:
//
//	conn, err := grpc.Dial(
//	    address,
//	    ...,  (existing DialOptions)
//	    grpc.WithUnaryInterceptor(rpc.TraceSpanClientInterceptor()),
//	)
//
// It starts a client span for every call and writes the span context into the
// outgoing request metadata.
func TraceSpanClientInterceptor() grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string, req, resp interface{},
		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
	) (err error) {
		span, ctx := opentracing.StartSpanFromContext(ctx, "RPC Client "+method)
		defer span.Finish()

		// Work on a private copy of the outgoing metadata so that injecting
		// the trace headers never mutates a map shared with the caller.
		md, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			md = md.Copy()
		} else {
			md = metadata.Pairs()
		}

		// Injection is best-effort: a failure is logged and the call still
		// goes out, just without trace headers.
		if err = opentracing.GlobalTracer().Inject(
			span.Context(), opentracing.HTTPHeaders, metadataTextMap(md),
		); err != nil {
			log.Errorf("Failed to inject trace span: %v", err)
		}

		return invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
	}
}

// TraceSpanServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// For example:
//
//	s := grpc.NewServer(
//	    ...,  (existing ServerOptions)
//	    grpc.UnaryInterceptor(rpc.TraceSpanServerInterceptor()),
//	)
//
// It reads the parent span context from the request metadata, starts a server
// span, and stores both the span and the trace ID in the handler's context.
func TraceSpanServerInterceptor() grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
	) (resp interface{}, err error) {
		// Extract parent trace span.
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			md = metadata.Pairs()
		}
		parentSpanContext, err := opentracing.GlobalTracer().Extract(
			opentracing.HTTPHeaders, metadataTextMap(md),
		)
		switch err {
		case nil:
		case opentracing.ErrSpanContextNotFound:
			// Log only the message; passing ctx as a log argument would dump
			// the whole context value into the log line.
			log.Info("Parent span not found, will start new one.")
		default:
			log.Errorf("Failed to extract trace span: %v", err)
		}

		// Start new trace span.
		span := opentracing.StartSpan(
			"RPC Server "+info.FullMethod,
			ext.RPCServerOption(parentSpanContext),
		)
		defer span.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span)

		// Set request ID for context. The plain string key is kept as-is so
		// that any reader looking the value up by this string keeps working.
		if sc, ok := span.Context().(jaeger.SpanContext); ok {
			ctx = context.WithValue(ctx, "constants.RequestID", sc.TraceID().String())
		}

		return handler(ctx, req)
	}
}

// NOTE(review): gRPC's own binary-header suffix is "-bin"; "_bin" here may be
// a typo, but the intended behavior cannot be confirmed from this file.
const (
	binHeaderSuffix = "_bin"
)

// metadataTextMap extends a metadata.MD to be an opentracing text map carrier.
type metadataTextMap metadata.MD

// Set implements opentracing.TextMapWriter: it stores val under the
// lower-cased key.
func (m metadataTextMap) Set(key, val string) {
	// gRPC allows for complex binary values to be written.
	encodedKey, encodedVal := encodeKeyValue(key, val)
	// The metadata object is a multimap and previous values may exist, but
	// for opentracing headers we override instead of appending.
	m[encodedKey] = []string{encodedVal}
}

// ForeachKey implements opentracing.TextMapReader: it invokes callback for
// every key/value pair in the metadata and stops at the first error.
func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
	for k, vv := range m {
		for _, v := range vv {
			decodedKey, decodedVal, err := metadata.DecodeKeyValue(k, v)
			if err != nil {
				return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", err)
			}
			if err := callback(decodedKey, decodedVal); err != nil {
				return err
			}
		}
	}
	return nil
}

// encodeKeyValue lower-cases k and, when the lower-cased key ends in
// binHeaderSuffix, base64-encodes v.
// note: copy pasted from private values of grpc.metadata
func encodeKeyValue(k, v string) (string, string) {
	k = strings.ToLower(k)
	if strings.HasSuffix(k, binHeaderSuffix) {
		v = base64.StdEncoding.EncodeToString([]byte(v))
	}
	return k, v
}
由于官方默认包里只实现了bin,kv和httpHeader三种格式的carrier,因此对于grpc服务需要自己实现carrier。对于http服务实现如下
package middleware

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/uber/jaeger-client-go"
)

// TraceHeader is the response header that carries the trace ID. It is also
// used as the operation name of the client span.
const TraceHeader = "Http-TraceHeader"

// ClientTraceSpan performs an HTTP request inside a new client span.
//
// The span is started as a child of any span found in ctx, its context is
// injected into the request headers, and the request carries the derived
// context. It returns the full response body, or an error if the request
// cannot be built, sent, or read.
func ClientTraceSpan(ctx context.Context, method, url string, body io.Reader) (resBody []byte, err error) {
	client := &http.Client{}
	req, err := http.NewRequest(method, url, body)
	if err != nil {
		// Report bad input to the caller rather than panicking.
		return nil, fmt.Errorf("build request %s %s: %w", method, url, err)
	}

	span, ctx := opentracing.StartSpanFromContext(ctx, TraceHeader)
	defer span.Finish()
	ext.SpanKindRPCClient.Set(span)
	ext.HTTPUrl.Set(span, url)
	// Tag the real method, not a hard-coded "GET".
	ext.HTTPMethod.Set(span, method)

	// Injection is best-effort: on failure the request is still sent, just
	// without trace headers.
	if err := span.Tracer().Inject(
		span.Context(),
		opentracing.HTTPHeaders,
		opentracing.HTTPHeadersCarrier(req.Header),
	); err != nil {
		fmt.Println("inject trace span failed:", err)
	}

	// WithContext returns a copy of the request, so the result must be kept;
	// discarding it leaves the request bound to its original context.
	req = req.WithContext(ctx)

	resp, err := client.Do(req)
	if err != nil {
		// resp is nil here, so reading resp.Body would panic.
		ext.Error.Set(span, true)
		return nil, fmt.Errorf("do request %s %s: %w", method, url, err)
	}
	defer resp.Body.Close()
	ext.HTTPStatusCode.Set(span, uint16(resp.StatusCode))

	return ioutil.ReadAll(resp.Body)
}

// ServerTraceSpan is an HTTP middleware that starts a server span for every
// request, using the span context extracted from the request headers as its
// parent when one is present. The span and the trace ID are stored in the
// request context passed to next, and the trace ID is echoed in the
// TraceHeader response header.
func ServerTraceSpan(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tracer := opentracing.GlobalTracer()

		// If the request context already carries a span, record an extra
		// child span under it and expose its trace ID.
		if parent := opentracing.SpanFromContext(r.Context()); parent != nil {
			mySpan := tracer.StartSpan("my info", opentracing.ChildOf(parent.Context()))
			defer mySpan.Finish()
			// The global tracer is expected to be Jaeger's, so the span
			// context can be converted to jaeger.SpanContext to read the
			// trace ID.
			if sc, ok := mySpan.Context().(jaeger.SpanContext); ok {
				w.Header().Set(TraceHeader, sc.TraceID().String())
			}
		}

		// The Extract error is deliberately ignored; whatever span context it
		// returned (possibly nil) is handed to RPCServerOption unchanged.
		spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
		span := tracer.StartSpan(
			"RPC Server "+r.RequestURI,
			ext.RPCServerOption(spanCtx),
		)
		defer span.Finish()

		ctx := opentracing.ContextWithSpan(r.Context(), span)
		if sc, ok := span.Context().(jaeger.SpanContext); ok {
			traceID := sc.TraceID().String()
			ctx = context.WithValue(ctx, "constants.RequestID", traceID)
			w.Header().Set(TraceHeader, traceID)
		}

		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
为了方便测试可以把traceID单独提出来放入httpheader里面,测试下
% curl -vi http://127.0.0.1:8080/request1
*   Trying 127.0.0.1:8080...
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> GET /request1 HTTP/1.1
> Host: 127.0.0.1:8080
> User-Agent: curl/7.79.1
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Http-Traceheader: 73f6efd73f361c12
Http-Traceheader: 73f6efd73f361c12
< Date: Sun, 23 Oct 2022 19:46:28 GMT
Date: Sun, 23 Oct 2022 19:46:28 GMT
< Content-Length: 0
Content-Length: 0
<
* Connection #0 to host 127.0.0.1 left intact
效果如下

当然上述实现还是很粗糙的,比如为了方便使用默认的contextKey 传递trace信息,没有实现自定义的Extract和Inject方法,导致client和server各打印了一份trace信息。下一期在源码实现分析的时候介绍如何优化。






