暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang 源码阅读之会议系统ion part II

golang 源码阅读之会议系统ion part I介绍了ion的系统架构和islb的代码,本篇将继续介绍ion的其他几个核心模块:

signal模块

       代码位于cmd/signal/main.go,它首先解析了对应的配置文件,注册信号,然后启动了一个grpc server,配置位于:configs/sig.toml

    [nats]
    [signal.jwt]
    services = ["rtc", "room"]

    main函数的代码如下:

      func main() {
      go func()
      err := http.ListenAndServe(paddr, nil)
      sig, err := signal.NewSignal(conf)
      err = sig.Start()
      defer sig.Close()
      srv := grpc.NewServer(
      grpc.CustomCodec(nrpc.Codec()), nolint:staticcheck
      grpc.UnknownServiceHandler(nproxy.TransparentLongConnectionHandler(sig.Director)))


      s := util.NewWrapperedGRPCWebServer(util.NewWrapperedServerOptions(
      addr, conf.Signal.GRPC.Cert, conf.Signal.GRPC.Key, true), srv)
      if err := s.Serve();

              nrpc包的Codec函数代码位于:

      github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/codec.go

      提供了grpc的序列化和反序列化方法,供nats使用。信号相关代码位于:pkg/node/signal/signal.go

        func NewSignal(conf Config) (*Signal, error) 
        nc, err := util.NewNatsConn(conf.Nats.URL)
        ndc, err := dc.NewClient(nc)

        里面提供了nats 的客户端链接封装

          func (s *Signal) Director(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error)
          claims, err := auth.GetClaim(ctx, authConfig)
          for _, svc := range claims.Services {
          if strings.Contains(fullMethodName, "/"+svc+".") {
          allowed = true
          break
          }
          }
          cli, err := s.NewNatsRPCClient(svc, "*", parameters)

          TransparentLongConnectionHandler代码位于:

          github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/proxy/handler.go

            func TransparentLongConnectionHandler(director StreamDirector) grpc.StreamHandler {
            streamer := &handler{director, false}
            return streamer.handler
            }
              func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error 
              fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
              outgoingCtx, clientConnIf, err := s.director(serverStream.Context(), fullMethodName)
              clientStream, err = nrpcClient.NewStream(clientCtx, clientStreamDescForProxying, fullMethodName)
              clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, gpcClientConn, fullMethodName)
              s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
              c2sErrChan := s.forwardClientToServer(clientStream, serverStream)

              获取所有的方法做代理转发,pkg/util/wrapped.go

                func (s *WrapperedGRPCWebServer) Serve() error 
                grpcweb.WithOriginFunc(s.makeHTTPOriginFunc(allowedOrigins)),
                grpcweb.WithWebsocketOriginFunc(s.makeWebsocketOriginFunc(allowedOrigins)),
                grpcweb.WithWebsocketPingInterval(s.options.WebsocketPingInterval),


                wrappedServer := grpcweb.WrapServer(s.GRPCServer, options...)
                handler := func(resp http.ResponseWriter, req *http.Request) {
                wrappedServer.ServeHTTP(resp, req)
                }
                tls, err := tls.Listen("tcp", addr, config)
                  g.Go(func() error { return s.GRPCServer.Serve(grpcListener) })
                g.Go(func() error { return httpServer.Serve(httpListener) })
                g.Go(m.Serve)

                添加一系列middlewar,然后监听服务。

                room模块

                app-room模块代码位于apps/room/main.go,它注册了room server和singnal server

                  func main() {
                  node := room.New()
                  err := node.Load(confFile)
                        err = node.Start()
                  defer node.Close()

                  具体实现代码位于:ion/apps/room/server/room.go

                    func (r *RoomServer) Start() error {
                    err = r.Node.Start(r.conf.Nats.URL)
                    ndc, err := natsDiscoveryClient.NewClient(r.NatsConn())
                    r.natsConn = r.NatsConn()
                    r.RoomService = *NewRoomService(r.conf.Redis)
                    r.RoomSignalService = *NewRoomSignalService(&r.RoomService)
                    room.RegisterRoomServiceServer(r.Node.ServiceRegistrar(), &r.RoomService)
                    room.RegisterRoomSignalServer(r.Node.ServiceRegistrar(), &r.RoomSignalService)
                    go func() {
                    err := r.Node.KeepAlive(node)
                    go func() {
                        err := r.Node.Watch(proto.ServiceALL)

                    会议相关的代码位于:apps/conference/main.go

                      func main() {
                      err := http.ListenAndServe(paddr, nil)
                      r := runner.New(util.NewWrapperedServerOptions(addr, certFile, keyFile, true))
                      err := r.AddService(
                      runner.ServiceUnit{
                      Service: room.New(),
                      ConfigFile: roomConfFile,
                      },
                      runner.ServiceUnit{
                      Service: sfu.New(),
                      ConfigFile: sfuConfFile,
                      },
                      )

                      其中sfu定义在pkg/node/sfu/sfu.go

                        type SFU struct {
                        ion.Node
                        s *SFUService
                        runner.Service
                        conf Config
                        }
                          type RoomServer struct {
                          // for standalone running
                          runner.Service




                          // grpc room service
                          RoomService
                          RoomSignalService




                          // for distributed node running
                          ion.Node
                          natsConn *nats.Conn
                          natsDiscoveryCli *natsDiscoveryClient.Client




                          // config
                          conf Config
                          }

                          apps/room/server/room.go

                          文件定义了room相关的结构体

                            type RoomServer struct {
                            // for standalone running
                            runner.Service




                            // grpc room service
                            RoomService
                            RoomSignalService




                            // for distributed node running
                            ion.Node
                            natsConn *nats.Conn
                            natsDiscoveryCli *natsDiscoveryClient.Client




                            // config
                            conf Config
                            }

                            sfu模块

                            sfu模块的代码位于cmd/sfu/main.go

                              func main() {}
                              err := conf.Load(confFile)
                              err := http.ListenAndServe(paddr, nil)
                              node := sfu.NewSFU()
                              err := node.Start(conf);
                              defer node.Close()

                              相关的结构体定义在 pkg/node/sfu/sfu.go

                                type SFU struct {
                                ion.Node
                                s *SFUService
                                runner.Service
                                conf Config
                                }
                                  type Config struct {
                                  Global global `mapstructure:"global"`
                                  Log logConf `mapstructure:"log"`
                                  Nats natsConf `mapstructure:"nats"`
                                  isfu.Config
                                  }

                                  它启动了一个rtc server:

                                    func (s *SFU) Start(conf Config) error {
                                    err := s.Node.Start(conf.Nats.URL)
                                    s.s = NewSFUService(conf.Config)
                                    pb.RegisterRTCServer(s.Node.ServiceRegistrar(), s.s)
                                    go func() {
                                    err := s.Node.KeepAlive(node)
                                    go func() {
                                    err := s.Node.Watch(proto.ServiceALL)

                                    其中node定义在pkg/ion/node.go:

                                      type Node struct {
                                      // Node ID
                                      NID string
                                      // Nats Client Conn
                                      nc *nats.Conn
                                      // gRPC Service Registrar
                                      nrpc *nrpc.Server
                                      // Service discovery client
                                      ndc *ndc.Client




                                      nodeLock sync.RWMutex
                                      //neighbor nodes
                                      neighborNodes map[string]discovery.Node




                                      cliLock sync.RWMutex
                                      clis map[string]*nrpc.Client
                                      }
                                        func (n *Node) ServiceRegistrar() grpc.ServiceRegistrar {
                                        return n.nrpc
                                        }

                                        而 sfu service定义在pkg/node/sfu/service.go

                                          type SFUService struct {
                                          rtc.UnimplementedRTCServer
                                          sfu *ion_sfu.SFU
                                          mutex sync.RWMutex
                                          sigs map[string]rtc.RTC_SignalServer
                                          }

                                          pkg/runner/runner.go

                                            type Service interface {
                                            New() Service
                                            ConfigBase() ConfigBase
                                            StartGRPC(registrar grpc.ServiceRegistrar) error
                                            Close()
                                            }

                                            avp模块和auth模块并没有实现

                                            cmd/avp/main.go

                                            apps/auth/main.go

                                            以上就是会议系统服务端的核心代码。


                                            文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                            评论