暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

GRPC SIXTH : 负载均衡的实现

无涯的计算机笔记 2022-08-21
393

上篇文章学习了 gRPC 名字解析器,今天学习 gRPC 负载均衡相关内容。

负载均衡

为了保证服务高可用和高性能,通常服务端会部署多个节点,这样即使某个节点发生故障,也不会导致整个服务不可用。如何让服务端的各个节点处理请求的数量均衡,就是负载均衡考虑的事情。根据负载均衡实现来区分,可以分为服务端负载均衡和客户端负载均衡。

  • 服务端负载均衡

在客户端和服务端之间搭建一个代理服务,客户端 gRPC
请求发送给代理,由代理将请求转发给服务端。客户端不需要关心服务端的状态,也不需要知道负载均衡实现逻辑,完全由代理处理即可。常见的代理服务有 Nginx、Envoy proxy 等,如下图所示。

  • 客户端负载均衡

不同于服务端负载均衡,客户端负载均衡由客户端实现负载均衡逻辑:客户端需要知道服务端所有节点的状态,根据负载均衡策略选择最优的节点转发请求。gRPC 就支持客户端负载均衡,gRPC-go 中内置了 pick_first 和 round_robin 两种算法。pick_first 尝试连接到第一个地址,如果连接失败才尝试连接下一个地址;round_robin 则按照轮询的方式把请求依次分发到各个服务端。具体使用方法见下面的示例代码。

  • 服务端

    package main


    import (
    "context"
    "fmt"
    "log"
    "net"
    "sync"


    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"


    ecpb "github.com/unendlichkeiten/private_projects/pb"
    )


    // addrs lists the local listen addresses; main starts one Echo server per entry.
    var addrs = []string{":9527", ":9528"}


    // ecServer implements the Echo service for one listen address. The address
    // is echoed back in every unary reply, so the client can see which backend
    // handled the call.
    type ecServer struct {
    	addr string
    }

    // UnaryEcho returns the request message followed by " (from <addr>)".
    func (s *ecServer) UnaryEcho(ctx context.Context, req *ecpb.EchoRequest) (*ecpb.EchoResponse, error) {
    	reply := fmt.Sprintf("%s (from %s)", req.Message, s.addr)
    	return &ecpb.EchoResponse{Message: reply}, nil
    }

    // ServerStreamingEcho is not used by this example and reports codes.Unimplemented.
    func (s *ecServer) ServerStreamingEcho(_ *ecpb.EchoRequest, _ ecpb.Echo_ServerStreamingEchoServer) error {
    	return status.Error(codes.Unimplemented, "not implemented")
    }

    // ClientStreamingEcho is not used by this example and reports codes.Unimplemented.
    func (s *ecServer) ClientStreamingEcho(_ ecpb.Echo_ClientStreamingEchoServer) error {
    	return status.Error(codes.Unimplemented, "not implemented")
    }

    // BidirectionalStreamingEcho is not used by this example and reports codes.Unimplemented.
    func (s *ecServer) BidirectionalStreamingEcho(_ ecpb.Echo_BidirectionalStreamingEchoServer) error {
    	return status.Error(codes.Unimplemented, "not implemented")
    }


    // startServer listens on addr, registers an ecServer bound to that address
    // and serves gRPC on the listener. A listen or serve error terminates the
    // process via log.Fatalf.
    func startServer(addr string) {
    	lis, err := net.Listen("tcp", addr)
    	if err != nil {
    		log.Fatalf("failed to listen: %v", err)
    	}

    	srv := grpc.NewServer()
    	ecpb.RegisterEchoServer(srv, &ecServer{addr: addr})
    	log.Printf("serving on %s\n", addr)

    	if err = srv.Serve(lis); err != nil {
    		log.Fatalf("failed to serve: %v", err)
    	}
    }


    // main starts one server goroutine per entry in addrs and blocks until
    // every startServer call has returned.
    func main() {
    	var wg sync.WaitGroup
    	wg.Add(len(addrs))
    	for _, a := range addrs {
    		go func(listenAddr string) {
    			defer wg.Done()
    			startServer(listenAddr)
    		}(a)
    	}
    	wg.Wait()
    }
    • 客户端

      package main


      import (
      "context"
      "fmt"
      "log"
      "time"


      ecpb "github.com/unendlichkeiten/private_projects/pb"
      "google.golang.org/grpc"
      "google.golang.org/grpc/resolver"
      )


      // myScheme is the URI scheme served by the custom resolver in this file,
      // and myServiceName is the name that resolver maps to addrs. Dial targets
      // therefore have the form "example:///lb.example.grpc.io".
      const myScheme = "example"
      const myServiceName = "lb.example.grpc.io"

      // addrs are the backend addresses the example resolver reports to gRPC.
      var addrs = []string{"localhost:9527", "localhost:9528"}


      // callUnaryEcho sends a single UnaryEcho RPC with a one-second deadline and
      // prints the reply message. Any RPC error terminates the process.
      func callUnaryEcho(c ecpb.EchoClient, message string) {
      	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
      	defer cancel()

      	req := &ecpb.EchoRequest{Message: message}
      	resp, err := c.UnaryEcho(ctx, req)
      	if err != nil {
      		log.Fatalf("could not greet: %v", err)
      	}
      	fmt.Println(resp.Message)
      }


      // makeRPCs issues n sequential UnaryEcho calls over cc and prints each reply.
      func makeRPCs(cc *grpc.ClientConn, n int) {
      	client := ecpb.NewEchoClient(cc)
      	for remaining := n; remaining > 0; remaining-- {
      		callUnaryEcho(client, "this is examples/load_balancing")
      	}
      }


      // main dials the same target twice through the custom "example" resolver:
      // first with the default pick_first policy, then with round_robin. It issues
      // 10 RPCs on each connection, and the printed "(from ...)" suffixes show how
      // each policy spreads calls across the backends.
      func main() {
      	target := fmt.Sprintf("%s:///%s", myScheme, myServiceName)

      	// pick_first is the default policy, so no balancer option is passed here.
      	// (The explicit form would be grpc.WithBalancerName("pick_first").)
      	pickfirstConn, err := grpc.Dial(target, grpc.WithInsecure())
      	if err != nil {
      		log.Fatalf("did not connect: %v", err)
      	}
      	defer pickfirstConn.Close()

      	log.Println("==== hamming pick_first ====")
      	makeRPCs(pickfirstConn, 10)

      	// A second ClientConn, this time using the round_robin policy.
      	// NOTE(review): WithBalancerName is deprecated in later grpc-go releases;
      	// the service-config equivalent is
      	// grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`).
      	roundrobinConn, err := grpc.Dial(
      		target,
      		grpc.WithBalancerName("round_robin"),
      		grpc.WithInsecure(),
      	)
      	if err != nil {
      		log.Fatalf("did not connect: %v", err)
      	}
      	defer roundrobinConn.Close()

      	log.Println("==== hamming round_robin ====")
      	makeRPCs(roundrobinConn, 10)
      }


      // Name resolver implementation


      // exampleResolverBuilder builds resolvers for the myScheme ("example")
      // URI scheme. It is registered with gRPC in init.
      type exampleResolverBuilder struct{}

      // Build creates a resolver for target. The resolver serves a fixed,
      // in-memory table that maps myServiceName to addrs. It pushes the
      // addresses to cc via start before Build returns.
      func (*exampleResolverBuilder) Build(
      	target resolver.Target,
      	cc resolver.ClientConn,
      	opts resolver.BuildOptions) (resolver.Resolver, error) {
      	r := &exampleResolver{
      		target: target,
      		cc:     cc,
      		addrsStore: map[string][]string{
      			// "lb.example.grpc.io" -> addrs ("localhost:9527", "localhost:9528")
      			myServiceName: addrs,
      		},
      	}
      	r.start()
      	return r, nil
      }

      // Scheme returns the URI scheme this builder handles, myScheme ("example").
      func (*exampleResolverBuilder) Scheme() string { return myScheme }


      // exampleResolver is a static resolver. It looks up the target endpoint
      // in addrsStore and reports the result to the ClientConn. ResolveNow
      // does nothing, so the address list is never refreshed.
      type exampleResolver struct {
      	target     resolver.Target
      	cc         resolver.ClientConn
      	addrsStore map[string][]string
      }

      // start converts the address strings stored for r.target.Endpoint into
      // resolver.Address values and hands them to the ClientConn. An endpoint
      // missing from addrsStore yields an empty address list.
      func (r *exampleResolver) start() {
      	hosts := r.addrsStore[r.target.Endpoint]
      	resolved := make([]resolver.Address, 0, len(hosts))
      	for _, h := range hosts {
      		resolved = append(resolved, resolver.Address{Addr: h})
      	}
      	r.cc.UpdateState(resolver.State{Addresses: resolved})
      }

      // ResolveNow is a no-op because the address table is static.
      func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}

      // Close is a no-op; the resolver starts nothing that needs releasing.
      func (*exampleResolver) Close() {}


      // init registers the example resolver builder with gRPC, so that Dial
      // targets using the myScheme ("example") scheme are resolved by it.
      func init() {
      	resolver.Register(new(exampleResolverBuilder))
      }
      运行结果
      • 客户端

        hamming@192 client % go run main.go
        2022/08/18 21:54:22 ==== hamming pick_first ====
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9527)
        2022/08/18 21:54:22 ==== hamming round_robin ====
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9528)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9528)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9528)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9528)
        this is examples/load_balancing (from :9527)
        this is examples/load_balancing (from :9528)
        • 服务端

          hamming@192 server % go run main.go
          2022/08/18 21:54:08 serving on :9528
          2022/08/18 21:54:08 serving on :9527
          负载均衡源码分析(基于 grpc-go v1.36.0)

          • 负载均衡器初始化

            // The client configures its load balancer through grpc.Dial options.
            roundrobinConn, err := grpc.Dial(
            fmt.Sprintf("%s:///%s", myScheme, myServiceName),
            // Select the LB policy explicitly; the default, pick_first, needs no option.
            grpc.WithBalancerName("round_robin"),
            grpc.WithInsecure(),
            )

            客户端在 grpc.Dial 中声明负载均衡策略:调用 grpc.WithBalancerName 会返回一个 DialOption,它把对应的负载均衡器 Builder 保存到连接参数中。

              // dialoptions.go
              // WithBalancerName looks up the balancer builder registered under
              // balancerName and returns a DialOption that stores it in
              // dialOptions.balancerBuilder. It panics if no builder is registered
              // under that name.
              func WithBalancerName(balancerName string) DialOption {
              // balancer.Get looks up the builder by name.
              builder := balancer.Get(balancerName)
              if builder == nil {
              panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
              }
              // The returned option is applied later, when the connection's dial options are set up.
              return newFuncDialOption(func(o *dialOptions) {
              o.balancerBuilder = builder
              })
              }

              进入 balancer.Get 方法,发现负载均衡器的 builder 实际上是从包级别的 map(m)中取出的,如下:

                // balancer.go
                // Get returns the builder stored in the registry map m under the
                // lower-cased name, or nil if there is no such entry.
                func Get(name string) Builder {
                if b, ok := m[strings.ToLower(name)]; ok {
                return b
                }
                return nil
                }

                那这个 map 又是如何初始化的呢?前面我们配置的负载均衡策略是 round_robin,我们进入 roundrobin 包内看看。

                  // roundrobin.go
                  // init registers the round_robin builder via balancer.Register, which
                  // places it in the registry map m that balancer.Get reads.
                  func init() {
                  balancer.Register(newBuilder())
                  }


                  // newBuilder creates a new roundrobin balancer builder.
                  // It is built on base.NewBalancerBuilder; rrPickerBuilder supplies the
                  // picker. NOTE(review): base.Config{HealthCheck: true} presumably enables
                  // health checking of SubConns; confirm against the base package docs.
                  func newBuilder() balancer.Builder {
                  return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
                  }

                  知道了负载均衡器的 Builder
                  是怎么来的,我们看下实现负载均衡的过程。按照客户端调用过程进行分析,Dial -> DialContext
                  ,在 DialContext
                  中有这么一段代码。

                    // clientconn.go
                    // Create the resolver wrapper; this is the same path analyzed in the previous article.
                    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
                    if err != nil {
                    return nil, fmt.Errorf("failed to build resolver: %v", err)
                    }


                    // resolver_conn_wrapper.go
                    // newCCResolverWrapper (excerpt; elided parts marked "...") builds the
                    // resolver for the ClientConn's parsed target.
                    func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
                    // ...
                    // Build the resolver. The wrapper ccr itself is passed as the
                    // resolver.ClientConn, so the resolver's UpdateState calls reach ccr.
                    ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
                    if err != nil {
                    return nil, err
                    }
                    // ...
                    }

                    这里我们使用的是自定义的解析器。

                      // main.go
                      // Build creates the example resolver with a fixed address table and
                      // reports the addresses immediately via start.
                      func (*exampleResolverBuilder) Build(
                      target resolver.Target,
                      cc resolver.ClientConn,
                      opts resolver.BuildOptions) (resolver.Resolver, error) {
                      r := &exampleResolver{
                      target: target,
                      cc: cc,
                      addrsStore: map[string][]string{
                      myServiceName: addrs, // "lb.example.grpc.io": "localhost:9527", "localhost:9528"
                      },
                      }
                      // Look here: this is the entry point that leads to load-balancer setup.
                      r.start()
                      return r, nil
                      }


                      // start converts the stored address strings into resolver.Address
                      // values and reports them to the ClientConn.
                      func (r *exampleResolver) start() {
                      addrStrs := r.addrsStore[r.target.Endpoint]
                      addrs := make([]resolver.Address, len(addrStrs))
                      for i, s := range addrStrs {
                      addrs[i] = resolver.Address{Addr: s}
                      }
                      // Load-balancer initialization is triggered from inside UpdateState;
                      // keep this State value in mind, it matters in what follows.
                      r.cc.UpdateState(resolver.State{Addresses: addrs})
                      }

                      进入 UpdateState
                      方法

                        // UpdateState receives resolved state from the resolver. It returns
                        // early if ccr.done has fired (presumably the wrapper was closed).
                        // Otherwise it logs the update via channelz, records it in
                        // ccr.curState and forwards it to ClientConn.updateResolverState. The
                        // resulting error is handed to ccr.poll, which is not shown here.
                        func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
                        if ccr.done.HasFired() {
                        return
                        }
                        channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
                        if channelz.IsOn() {
                        ccr.addChannelzTraceEvent(s)
                        }
                        ccr.curState = s
                        // Look here: the resolved state is passed on to the ClientConn.
                        ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
                        }

                        这里没什么特殊处理,进入 updateResolverState
                        查看

                          // clientconn.go
                          // updateResolverState (excerpt): when service config is disabled or
                          // the resolver supplied none, maybeApplyDefaultServiceConfig is
                          // called with the resolved addresses (its body is not shown).
                          func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
                          ...
                          if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
                          cc.maybeApplyDefaultServiceConfig(s.Addresses)
                          // TODO: do we need to apply a failing LB policy if there is no
                          // default, per the error handling design?
                          }
                          ...
                          }


                          // applyServiceConfigAndBalancer (excerpt) is where the balancer for
                          // the ClientConn gets set up.
                          func (cc *ClientConn) applyServiceConfigAndBalancer(
                          sc *ServiceConfig,
                          configSelector iresolver.ConfigSelector,
                          addrs []resolver.Address) {
                          ...
                          // cc.dopts.balancerBuilder is set by a balancer DialOption such as
                          // grpc.WithBalancerName. It is nil only when no such option was given,
                          // as on the pick_first connection in the example; only then is a
                          // policy name chosen here and switchBalancer called.
                          if cc.dopts.balancerBuilder == nil {
                          ...
                          cc.switchBalancer(newBalancerName)
                          } else if cc.balancerWrapper == nil {
                          // With grpc.WithBalancerName("round_robin"), balancerBuilder is non-nil,
                          // so the round_robin connection takes this branch the first time
                          // (its body is elided in this excerpt).
                          ...
                          }
                          ...
                          }

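                          进入 switchBalancer 方法,看看它做了什么。需要注意:通过 grpc.WithBalancerName 指定策略时,cc.dopts.balancerBuilder 不为 nil,因此(首次处理解析结果时)会走上面的 else if 分支,而不是调用 switchBalancer;即使调用了 switchBalancer,它也会因为 balancerBuilder 不为 nil 而直接返回。switchBalancer 适用于没有通过 DialOption 指定负载均衡器的情况,例如本例中的 pick_first 连接。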

                            // clientconn.go
                            // switchBalancer replaces the ClientConn's balancer with the one
                            // registered under name. It does nothing if name equals the current
                            // balancer name (case-insensitively). It also returns without
                            // switching when a balancer was supplied via DialOption
                            // (cc.dopts.balancerBuilder != nil).
                            func (cc *ClientConn) switchBalancer(name string) {
                            if strings.EqualFold(cc.curBalancerName, name) {
                            return
                            }


                            channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
                            // A balancer chosen through a DialOption (e.g. WithBalancerName) wins; ignore the switch.
                            if cc.dopts.balancerBuilder != nil {
                            channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
                            return
                            }
                            // Close the previous balancer before building the new one.
                            if cc.balancerWrapper != nil {
                            cc.balancerWrapper.close()
                            }


                            // Look up the builder registered for the policy name (e.g. "pick_first",
                            // "round_robin"); an unknown name falls back to pick_first.
                            builder := balancer.Get(name)
                            if builder == nil {
                            channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
                            channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
                            builder = newPickfirstBuilder()
                            } else {
                            channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
                            }


                            cc.curBalancerName = builder.Name()
                            // Create the balancer wrapper that hosts the new balancer for this ClientConn.
                            cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
                            }

                            到这里负载均衡初始化就完成了。

                            • 请求调用过程

                            上面通过 Dial
                            方法创建连接后,我们看看客户端是如何请求的,直接看 main.go
                            中调用的 gRPC
                            方法 UnaryEcho

                              // echo.pb.go
                              // UnaryEcho is the generated client stub. It allocates the response
                              // message and delegates the call to Invoke with the full method name.
                              func (c *echoClient) UnaryEcho(
                              ctx context.Context,
                              in *EchoRequest,
                              opts ...grpc.CallOption) (*EchoResponse, error) {
                              out := new(EchoResponse)
                              // Look here: the RPC goes through Invoke.
                              err := c.cc.Invoke(ctx, "/echo.Echo/UnaryEcho", in, out, opts...)
                              if err != nil {
                              return nil, err
                              }
                              return out, nil
                              }


                              // call.go
                              // Invoke merges the dial-level default call options with the per-call
                              // options. It then runs the configured unary interceptor (passing
                              // invoke as the next step) if there is one, or calls invoke directly.
                              func (cc *ClientConn) Invoke(
                              ctx context.Context,
                              method string,
                              args, reply interface{},
                              opts ...CallOption) error {
                              // allow interceptor to see all applicable call options, which means those
                              // configured as defaults from dial option as well as per-call options
                              opts = combine(cc.dopts.callOptions, opts)


                              if cc.dopts.unaryInt != nil {
                              return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
                              }
                              return invoke(ctx, method, args, reply, cc, opts...)
                              }


                              // call.go
                              // invoke performs a unary RPC on top of a client stream. It creates
                              // the stream, sends the single request message, then receives the
                              // single reply into reply.
                              func invoke(
                              ctx context.Context,
                              method string,
                              req, reply interface{},
                              cc *ClientConn,
                              opts ...CallOption) error {
                              // Look here: creating the stream is what leads to picking a transport.
                              cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
                              if err != nil {
                              return err
                              }
                              if err := cs.SendMsg(req); err != nil {
                              return err
                              }
                              return cs.RecvMsg(reply)
                              }


                              // stream.go
                              // newClientStream (excerpt): creates the first attempt via
                              // newAttemptLocked. If that fails, the stream is finished with the
                              // error and the error is returned.
                              func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
                              ...
                              if err := cs.newAttemptLocked(sh, trInfo); err != nil {
                              cs.finish(err)
                              return nil, err
                              }
                              ...
                              }


                              // stream.go
                              // newAttemptLocked (excerpt): obtains a transport from
                              // cc.getTransport, which consults the balancer's picker. It records
                              // the remote address for tracing, then stores the transport and its
                              // done callback on the new attempt.
                              func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
                              ...
                              t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
                              if err != nil {
                              return err
                              }
                              if trInfo != nil {
                              trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
                              }
                              newAttempt.t = t
                              newAttempt.done = done
                              cs.attempt = newAttempt
                              return nil
                              }


                              // clientconn.go
                              // getTransport asks the ClientConn's blocking picker for a transport
                              // for this RPC. Pick errors are converted with toRPCErr.
                              func (cc *ClientConn) getTransport(
                              ctx context.Context,
                              failfast bool,
                              method string) (
                              transport.ClientTransport,
                              func(balancer.DoneInfo), error) {
                              // This is where the load balancer's picker finally gets involved; for
                              // roundrobinConn in the example it is the round_robin picker.
                              t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
                              Ctx: ctx,
                              FullMethodName: method,
                              })
                              if err != nil {
                              return nil, nil, toRPCErr(err)
                              }
                              return t, done, nil
                              }


                              // roundrobin.go
                              // Pick returns the SubConn at index p.next and advances p.next modulo
                              // len(p.subConns), so successive picks cycle through all SubConns.
                              // p.mu serializes access to p.next across concurrent callers.
                              // NOTE(review): an empty p.subConns would panic on the index; presumably
                              // this picker is only built when at least one SubConn is ready; confirm.
                              func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
                              p.mu.Lock()
                              sc := p.subConns[p.next]
                              p.next = (p.next + 1) % len(p.subConns)
                              p.mu.Unlock()
                              return balancer.PickResult{SubConn: sc}, nil
                              }

                              从最后的 Pick
                              方法我们确定了请求的最终地址,负载均衡的功能完成。

                              总结

                              整个过程可以概括为:客户端首先初始化名字解析器,解析器把解析到的地址通过 UpdateState 交给 ClientConn,ClientConn 据此创建负载均衡器;每次发起请求时,再由负载均衡器的 Picker(本例中为 rrPicker.Pick)选出本次请求所使用的连接(SubConn),从而确定具体请求的服务端地址。

                              参考资料

                              • https://www.bookstack.cn/read/grpc-read/6-grpc%20%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1.md

                              • gRPC Up & Running by Kasun Indrasiri and Danesh Kuruppu

                              • 示例完整代码:https://github.com/unendlichkeiten/grpc_demo

                              关注公众号一起学习——无涯的计算机笔记


                              文章转载自无涯的计算机笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论