上篇文章学习了 gRPC 名字解析器,今天学习 gRPC 负载均衡相关内容。
负载均衡
为了保证服务高可用和高性能,通常服务端会部署多个节点,这样即使某个节点发生故障,也不会导致整个服务不可用。如何让服务端的各个节点处理请求的数量均衡,就是负载均衡考虑的事情。根据负载均衡实现来区分,可以分为服务端负载均衡和客户端负载均衡。
服务端负载均衡
在客户端和服务端之间搭建一个代理服务,客户端将 gRPC 请求发送给代理,由代理将请求转发给服务端。客户端不需要关心服务端的状态,也不需要知道负载均衡的实现逻辑,这些完全由代理处理。常见的代理服务有 Nginx 和 Envoy proxy,如下图所示。

客户端负载均衡
不同于服务端负载均衡,客户端负载均衡由客户端实现负载均衡逻辑:客户端需要知道服务端所有节点的状态,并根据负载均衡策略选择最优的节点转发请求。gRPC 就支持客户端负载均衡,gRPC-go 中内置了 pick_first 和 round_robin 两种算法。pick_first 按顺序尝试连接地址,只有连接失败时才尝试下一个地址,之后所有请求都发往第一个连接成功的节点;round_robin 则按照轮询的方式,把请求依次分发给各个服务端节点。具体使用方法见下面的示例代码。

服务端
package mainimport ("context""fmt""log""net""sync""google.golang.org/grpc""google.golang.org/grpc/codes""google.golang.org/grpc/status"ecpb "github.com/unendlichkeiten/private_projects/pb")var (addrs = []string{":9527", ":9528"})type ecServer struct {addr string}func (s *ecServer) UnaryEcho(ctx context.Context,req *ecpb.EchoRequest) (*ecpb.EchoResponse, error) {return &ecpb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr),}, nil}func (s *ecServer) ServerStreamingEcho(*ecpb.EchoRequest,ecpb.Echo_ServerStreamingEchoServer) error {return status.Errorf(codes.Unimplemented, "not implemented")}func (s *ecServer) ClientStreamingEcho(ecpb.Echo_ClientStreamingEchoServer) error {return status.Errorf(codes.Unimplemented, "not implemented")}func (s *ecServer) BidirectionalStreamingEcho(ecpb.Echo_BidirectionalStreamingEchoServer) error {return status.Errorf(codes.Unimplemented, "not implemented")}func startServer(addr string) {lis, err := net.Listen("tcp", addr)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()ecpb.RegisterEchoServer(s, &ecServer{addr: addr})log.Printf("serving on %s\n", addr)if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}}func main() {var wg sync.WaitGroupfor _, addr := range addrs {wg.Add(1)go func(addr string) {defer wg.Done()startServer(addr)}(addr)}wg.Wait()}
客户端
package mainimport ("context""fmt""log""time"ecpb "github.com/unendlichkeiten/private_projects/pb""google.golang.org/grpc""google.golang.org/grpc/resolver")const (myScheme = "example"myServiceName = "lb.example.grpc.io")var addrs = []string{"localhost:9527", "localhost:9528"}func callUnaryEcho(c ecpb.EchoClient, message string) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()r, err := c.UnaryEcho(ctx, &ecpb.EchoRequest{Message: message})if err != nil {log.Fatalf("could not greet: %v", err)}fmt.Println(r.Message)}func makeRPCs(cc *grpc.ClientConn, n int) {hwc := ecpb.NewEchoClient(cc)for i := 0; i < n; i++ {callUnaryEcho(hwc, "this is examples/load_balancing")}}func main() {pickfirstConn, err := grpc.Dial(fmt.Sprintf("%s:///%s", myScheme, myServiceName),grpc.WithInsecure(),// 默认采用 pick_first 原则,不需要添加该选项// grpc.WithBalancerName("pick_first"),)if err != nil {log.Fatalf("did not connect: %v", err)}defer pickfirstConn.Close()log.Println("==== hamming pick_first ====")makeRPCs(pickfirstConn, 10)// Make another ClientConn with round_robin policy.roundrobinConn, err := grpc.Dial(fmt.Sprintf("%s:///%s", myScheme, myServiceName),grpc.WithBalancerName("round_robin"),grpc.WithInsecure(),)if err != nil {log.Fatalf("did not connect: %v", err)}defer roundrobinConn.Close()log.Println("==== hamming round_robin ====")makeRPCs(roundrobinConn, 10)}// Name resolver implementationtype exampleResolverBuilder struct{}func (*exampleResolverBuilder) Build(target resolver.Target,cc resolver.ClientConn,opts resolver.BuildOptions) (resolver.Resolver, error) {r := &exampleResolver{target: target,cc: cc,addrsStore: map[string][]string{myServiceName: addrs, // "lb.example.grpc.io": "localhost:50051", "localhost:50052"},}r.start()return r, nil}func (*exampleResolverBuilder) Scheme() string { return myScheme } // "example"type exampleResolver struct {target resolver.Targetcc resolver.ClientConnaddrsStore map[string][]string}func (r *exampleResolver) start() 
{addrStrs := r.addrsStore[r.target.Endpoint]addrs := make([]resolver.Address, len(addrStrs))for i, s := range addrStrs {addrs[i] = resolver.Address{Addr: s}}r.cc.UpdateState(resolver.State{Addresses: addrs})}func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}func (*exampleResolver) Close() {}func init() {resolver.Register(&exampleResolverBuilder{})}
运行结果
客户端
hamming@192 client % go run main.go
2022/08/18 21:54:22 ==== hamming pick_first ====
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9527)
2022/08/18 21:54:22 ==== hamming round_robin ====
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9528)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9528)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9528)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9528)
this is examples/load_balancing (from :9527)
this is examples/load_balancing (from :9528)
服务端
hamming@192 server % go run main.go
2022/08/18 21:54:08 serving on :9528
2022/08/18 21:54:08 serving on :9527
负载均衡源码分析(基于 grpc-go v1.36.0)

负载均衡器初始化
// 客户端通过 grpc.Dial 配置负载均衡器roundrobinConn, err := grpc.Dial(fmt.Sprintf("%s:///%s", myScheme, myServiceName),// 声明负载均衡策略,默认使用的 pick_first 不需要显示声明grpc.WithBalancerName("round_robin"),grpc.WithInsecure(),)
客户端在 grpc.Dial 中声明负载均衡策略:调用 grpc.WithBalancerName 会返回一个 DialOption,Dial 应用该选项时会把对应负载均衡器的 builder 保存到连接参数(dialOptions.balancerBuilder)中。
// diaoption.gofunc WithBalancerName(balancerName string) DialOption {// balancer.Get 获取 builderbuilder := balancer.Get(balancerName)if builder == nil {panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))}// 下面的方法在后面的代码中会被用于连接参数设置return newFuncDialOption(func(o *dialOptions) {o.balancerBuilder = builder})}
进入 balancer.Get 方法,可以发现负载均衡器的 builder 实际上是从已注册负载均衡器的 map 中取出的,如下:
// balancer.gofunc Get(name string) Builder {if b, ok := m[strings.ToLower(name)]; ok {return b}return nil}
那这个 map 又是如何初始化的呢?前面我们配置的负载均衡策略是 round_robin,我们进入 roundrobin 包内看看。
// roundrobin.go// init 函数会把负载均衡 builder 注册到上面的 m 中func init() {balancer.Register(newBuilder())}// newBuilder creates a new roundrobin balancer builder.func newBuilder() balancer.Builder {return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})}
知道了负载均衡器的 Builder 是怎么来的,我们再看实现负载均衡的过程。按照客户端调用过程进行分析:Dial -> DialContext,在 DialContext 中有这么一段代码。
// clientconn.go// 创建一个解析器,这里和上篇文章分析过程一样。rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}// resolve_conn_wrapper.gofunc newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {// ...// 创建解析器ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)if err != nil {return nil, err}// ...}
这里我们使用的是自定义的解析器。
// main.gofunc (*exampleResolverBuilder) Build(target resolver.Target,cc resolver.ClientConn,opts resolver.BuildOptions) (resolver.Resolver, error) {r := &exampleResolver{target: target,cc: cc,addrsStore: map[string][]string{myServiceName: addrs, // "lb.example.grpc.io": "localhost:50051", "localhost:50052"},}// 看这里,负载均衡器实现实现入口r.start()return r, nil}func (r *exampleResolver) start() {addrStrs := r.addrsStore[r.target.Endpoint]addrs := make([]resolver.Address, len(addrStrs))for i, s := range addrStrs {addrs[i] = resolver.Address{Addr: s}}// 负载均衡器的初始化在这里面实现,State 很重要,记住r.cc.UpdateState(resolver.State{Addresses: addrs})}
进入 UpdateState 方法:
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {if ccr.done.HasFired() {return}channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)if channelz.IsOn() {ccr.addChannelzTraceEvent(s)}ccr.curState = s// 看这里 ...ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))}
这里没有什么特殊处理,进入 updateResolverState 查看:
// clientconn.gofunc (cc *ClientConn) updateResolverState(s resolver.State, err error) error {...if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {cc.maybeApplyDefaultServiceConfig(s.Addresses)// TODO: do we need to apply a failing LB policy if there is no// default, per the error handling design?}...}func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig,configSelector iresolver.ConfigSelector,addrs []resolver.Address) {...// 最开始初始化的时候 cc.dopts.balancerBuilder = nilif cc.dopts.balancerBuilder == nil {...cc.switchBalancer(newBalancerName)} else if cc.balancerWrapper == nil {...}...}
进入 switchBalancer 方法,看看它做了什么:
// clientconn.gofunc (cc *ClientConn) switchBalancer(name string) {if strings.EqualFold(cc.curBalancerName, name) {return}channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)if cc.dopts.balancerBuilder != nil {channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")return}if cc.balancerWrapper != nil {cc.balancerWrapper.close()}// 根据负载均衡策略的名字 round_roubin 获取对应的负载均衡器builder := balancer.Get(name)if builder == nil {channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)builder = newPickfirstBuilder()} else {channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)}cc.curBalancerName = builder.Name()// 创建负载均衡绑定器cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)}
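到这里负载均衡初始化就完成了。需要注意的是,如果像示例中的 round_robin 连接那样通过 grpc.WithBalancerName 指定了负载均衡器,cc.dopts.balancerBuilder 不为 nil,上面的 switchBalancer 会直接返回,此时走的是 applyServiceConfigAndBalancer 中的 else if 分支;switchBalancer 这条路径对应的是没有设置该 DialOption、按策略名称选择负载均衡器的情况(例如默认的 pick_first)。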
请求调用过程
通过 Dial 方法创建连接后,我们看看客户端是如何发起请求的,直接看 main.go 中调用的 gRPC 方法 UnaryEcho:
// echo.pb.gofunc (c *echoClient) UnaryEcho(ctx context.Context,in *EchoRequest,opts ...grpc.CallOption) (*EchoResponse, error) {out := new(EchoResponse)// 看这里err := c.cc.Invoke(ctx, "/echo.Echo/UnaryEcho", in, out, opts...)if err != nil {return nil, err}return out, nil}// call.gofunc (cc *ClientConn) Invoke(ctx context.Context,method string,args, reply interface{},opts ...CallOption) error {// allow interceptor to see all applicable call options, which means those// configured as defaults from dial option as well as per-call optionsopts = combine(cc.dopts.callOptions, opts)if cc.dopts.unaryInt != nil {return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)}return invoke(ctx, method, args, reply, cc, opts...)}// call.gofunc invoke(ctx context.Context,method string,req, reply interface{},cc *ClientConn,opts ...CallOption) error {// 看这里cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply)}// stream.gofunc newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {...if err := cs.newAttemptLocked(sh, trInfo); err != nil {cs.finish(err)return nil, err}...}// stream.gofunc (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {...t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)if err != nil {return err}if trInfo != nil {trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())}newAttempt.t = tnewAttempt.done = donecs.attempt = newAttemptreturn nil}// clinetconn.gofunc (cc *ClientConn) getTransport(ctx context.Context,failfast bool,method string) (transport.ClientTransport,func(balancer.DoneInfo), error) {// 最后这里调用了负载均衡器的 pick 方法,我们用的是 round_robint, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{Ctx: ctx,FullMethodName: method,})if err != nil {return nil, nil, toRPCErr(err)}return t, done, 
nil}// roundroubin.gofunc (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {p.mu.Lock()sc := p.subConns[p.next]p.next = (p.next + 1) % len(p.subConns)p.mu.Unlock()return balancer.PickResult{SubConn: sc}, nil}
通过最后的 Pick 方法确定了本次请求最终发往的节点(SubConn),负载均衡的功能就完成了。
总结
整个过程可以概括为:客户端在 Dial 时初始化名字解析器,解析器通过 UpdateState 上报服务端地址,在这个过程中加载负载均衡器;之后每次发起 RPC 时,由负载均衡器的 Picker(本例中为 round_robin 的 rrPicker)选出一个 SubConn,请求经由该连接对应的 transport 发送到具体的服务端节点。
参考资料
https://www.bookstack.cn/read/grpc-read/6-grpc%20%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1.md
gRPC Up & Running by Kasun Indrasiri and Danesh Kuruppu
示例完整代码:https://github.com/unendlichkeiten/grpc_demo




