暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:grpc 链接池(6)超时的设置和传递

        我们经常看到下面的日志:

    rpc error: code = DeadlineExceeded desc = context deadline exceeded

    我们需要思考两个问题:1,这个错误码来源是哪里?2,超时是如何设置和生效的?

            首先我们看下第一个问题:我们可以发现这段错误文案是golang源码里的错误文案:src/context/context.go

      var DeadlineExceeded error = deadlineExceededError{}
      func (deadlineExceededError) Error() string { return "context deadline exceeded" }

              什么时候会返回这个错误呢?同样是golang源码的context包里



        func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
        if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
        }


        dur := time.Until(d)
        if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
        }


        if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
        c.cancel(true, DeadlineExceeded)
        })
        }


        return c, func() { c.cancel(true, Canceled) }
        }
          func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
          return WithDeadline(parent, time.Now().Add(timeout))
          }

          了解了上面的背景后,我们就可以排查grpc-go的client在何时使用了WithTimeout 

          google.golang.org/grpc@v1.50.1/clientconn.go

            type ClientConn struct {
            dopts dialOptions
            }
              func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
              if cc.dopts.timeout > 0 {
              var cancel context.CancelFunc
              ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
              defer cancel()
              }
              }

              可以看到,在发起连接的时候会有,当server超过超时时间没有响应的时候就会报上面的错误。

                      第二个地方就是我们发送请求的时候,我先会获取一个连接

                func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
                cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
                  func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
                  var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
                  return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
                  }
                      rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
                    func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
                    if mc.Timeout != nil && *mc.Timeout >= 0 {
                    ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
                    } else {
                    ctx, cancel = context.WithCancel(ctx)
                    }

                    可以看到,如果方法配置了超时,在超时时间完成之前,没有响应,也会报错。

                            还有没有其它地方可以配置超时呢?答案是肯定的,Interceptor里我们也可以定义超时。下面就是我们常用的两种设置的超时的方法,分别是连接维度和请求方法维度。

                      clientConn, err := grpc.Dial(serverAddress, grpc.WithTimeout(5 * time.Second), grpc.WithInsecure())
                      if err != nil {
                      log.Println("Dial failed!")
                      return err
                      }
                        c := pb.NewGreeterClient(conn)
                        c.SayHello(context.Background(), &pb.HelloRequest{Name: "world"},
                        WithForcedTimeout(time.Duration(10)*time.Second))

                        那么上述设置是如何生效的?如何传递到服务端的呢?先看下

                        grpc.WithTimeout 源码位于google.golang.org/grpc@v1.50.1/dialoptions.go

                          func WithTimeout(d time.Duration) DialOption {
                          return newFuncDialOption(func(o *dialOptions) {
                          o.timeout = d
                          })
                          }

                          它修改了dialOptionstimeout

                            type dialOptions struct {
                            timeout time.Duration
                            }
                              type DialOption interface {
                              apply(*dialOptions)
                              }

                              dialOptionsClientConn的一个属性

                                type ClientConn struct {
                                dopts dialOptions
                                }

                                我们发起连接的时候用的就是这个属性上的timeout

                                  func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
                                  if cc.dopts.timeout > 0 {
                                  var cancel context.CancelFunc
                                  ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
                                  defer cancel()
                                  }
                                  }

                                          Interceptor是如何让超时生效的呢,逻辑更简单,我们看下它的定义,在发起真正调用之前先调用Interceptor,这个时候设置超时时间:

                                    func TimeoutInterceptor(t time.Duration) grpc.UnaryClientInterceptor {
                                    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
                                    invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
                                    timeout := t
                                    if v, ok := getForcedTimeout(opts); ok {
                                    timeout = v
                                    }
                                    if timeout <= 0 {
                                    return invoker(ctx, method, req, reply, cc, opts...)
                                    }
                                    ctx, cancel := context.WithTimeout(ctx, timeout)
                                    defer cancel()
                                    return invoker(ctx, method, req, reply, cc, opts...)
                                    }
                                    }
                                      func getForcedTimeout(callOptions []grpc.CallOption) (time.Duration, bool) {
                                      for _, opt := range callOptions {
                                      if co, ok := opt.(TimeoutCallOption); ok {
                                      return co.forcedTimeout, true
                                      }
                                      }
                                      return 0, false
                                      }

                                      而超时时间是我们发起调用的时候通过option传递下来的

                                        type TimeoutCallOption struct {
                                        grpc.EmptyCallOption
                                        forcedTimeout time.Duration
                                        }
                                        func WithForcedTimeout(forceTimeout time.Duration) TimeoutCallOption {
                                        return TimeoutCallOption{forcedTimeout: forceTimeout}
                                        }

                                        弄清楚客户端的超时时间是如何设置和生效的以后,服务端怎么保证,客户端超时以后,马上终止当前任务呢?回答这个问题之前,我们看下超时是如何传递的。首先,给出答案:grpc协议将超时时间放置在HTTP Header 请求头里面。客户端设置的超时时间为5秒,http2的header如下

                                          grpc-timeout: 4995884u

                                          其中u表示时间单位为纳秒,4995884u 约等于 5秒。然后服务端接收到该请求后,就可以根据这个时间计算出是否超时,由header 超时设置。

                                                  那么header是何时由client设置的,以及何时由服务端解析的呢?

                                          google.golang.org/grpc@v1.50.1/internal/transport/http2_client.go

                                          发起客户端请求的时候会调用

                                            func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
                                            headerFields, err := t.createHeaderFields(ctx, callHdr)

                                            内部我们可以看到,它从context里面取出超时截止时间,然后写入header "grpc-timeout"里面

                                              func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
                                              if dl, ok := ctx.Deadline(); ok {
                                              // Send out timeout regardless its value. The server can detect timeout context by itself.
                                              // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
                                              timeout := time.Until(dl)
                                              headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
                                              }

                                              解析的过程:

                                              google.golang.org/grpc@v1.50.1/internal/transport/handler_server.go

                                                func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
                                                if v := r.Header.Get("grpc-timeout"); v != "" {
                                                to, err := decodeTimeout(v)
                                                if err != nil {
                                                return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
                                                }
                                                st.timeoutSet = true
                                                st.timeout = to
                                                }

                                                if timeoutSet {
                                                s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
                                                else {
                                                s.ctx, s.cancel = context.WithCancel(t.ctx)
                                                }

                                                可以看到,首先从header里面取出超时时间,然后设置context.WithTimeout

                                                  func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
                                                  if ht.timeoutSet {
                                                  ctx, cancel = context.WithTimeout(ctx, ht.timeout)
                                                  } else {
                                                  ctx, cancel = context.WithCancel(ctx)
                                                  }

                                                  google.golang.org/grpc@v1.50.1/internal/transport/http2_server.go

                                                    func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
                                                    case "grpc-timeout":
                                                    timeoutSet = true
                                                    var err error
                                                    if timeout, err = decodeTimeout(hf.Value); err != nil {
                                                    headerError = true
                                                    }

                                                    文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                    评论