golang 源码阅读之会议系统ion part I介绍了ion的系统架构和islb的代码,本篇将继续介绍ion的其他几个核心模块:
signal模块
代码位于cmd/signal/main.go,它首先解析了对应的配置文件,注册信号,然后启动了一个grpc server,配置位于:configs/sig.toml
[nats][signal.jwt]services = ["rtc", "room"]
main函数的代码如下:
func main() {go func()err := http.ListenAndServe(paddr, nil)sig, err := signal.NewSignal(conf)err = sig.Start()defer sig.Close()srv := grpc.NewServer(grpc.CustomCodec(nrpc.Codec()), nolint:staticcheckgrpc.UnknownServiceHandler(nproxy.TransparentLongConnectionHandler(sig.Director)))s := util.NewWrapperedGRPCWebServer(util.NewWrapperedServerOptions(addr, conf.Signal.GRPC.Cert, conf.Signal.GRPC.Key, true), srv)if err := s.Serve();
nrpc包的Codec函数代码位于:
github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/codec.go
提供了grpc的序列化和反序列化方法,供nats使用。信号相关代码位于:pkg/node/signal/signal.go
func NewSignal(conf Config) (*Signal, error)nc, err := util.NewNatsConn(conf.Nats.URL)ndc, err := dc.NewClient(nc)
里面提供了nats 的客户端链接封装
func (s *Signal) Director(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error)claims, err := auth.GetClaim(ctx, authConfig)for _, svc := range claims.Services {if strings.Contains(fullMethodName, "/"+svc+".") {allowed = truebreak}}cli, err := s.NewNatsRPCClient(svc, "*", parameters)
TransparentLongConnectionHandler代码位于:
github.com/cloudwebrtc/nats-grpc@v1.0.0/pkg/rpc/proxy/handler.go
func TransparentLongConnectionHandler(director StreamDirector) grpc.StreamHandler {streamer := &handler{director, false}return streamer.handler}
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) errorfullMethodName, ok := grpc.MethodFromServerStream(serverStream)outgoingCtx, clientConnIf, err := s.director(serverStream.Context(), fullMethodName)clientStream, err = nrpcClient.NewStream(clientCtx, clientStreamDescForProxying, fullMethodName)clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, gpcClientConn, fullMethodName)s2cErrChan := s.forwardServerToClient(serverStream, clientStream)c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
获取所有的方法做代理转发,pkg/util/wrapped.go
func (s *WrapperedGRPCWebServer) Serve() errorgrpcweb.WithOriginFunc(s.makeHTTPOriginFunc(allowedOrigins)),grpcweb.WithWebsocketOriginFunc(s.makeWebsocketOriginFunc(allowedOrigins)),grpcweb.WithWebsocketPingInterval(s.options.WebsocketPingInterval),wrappedServer := grpcweb.WrapServer(s.GRPCServer, options...)handler := func(resp http.ResponseWriter, req *http.Request) {wrappedServer.ServeHTTP(resp, req)}tls, err := tls.Listen("tcp", addr, config)g.Go(func() error { return s.GRPCServer.Serve(grpcListener) })g.Go(func() error { return httpServer.Serve(httpListener) })g.Go(m.Serve)
添加一系列middlewar,然后监听服务。
room模块
app-room模块代码位于apps/room/main.go,它注册了room server和singnal server
func main() {node := room.New()err := node.Load(confFile)err = node.Start()defer node.Close()
具体实现代码位于:ion/apps/room/server/room.go
func (r *RoomServer) Start() error {err = r.Node.Start(r.conf.Nats.URL)ndc, err := natsDiscoveryClient.NewClient(r.NatsConn())r.natsConn = r.NatsConn()r.RoomService = *NewRoomService(r.conf.Redis)r.RoomSignalService = *NewRoomSignalService(&r.RoomService)room.RegisterRoomServiceServer(r.Node.ServiceRegistrar(), &r.RoomService)room.RegisterRoomSignalServer(r.Node.ServiceRegistrar(), &r.RoomSignalService)go func() {err := r.Node.KeepAlive(node)go func() {err := r.Node.Watch(proto.ServiceALL)
会议相关的代码位于:apps/conference/main.go
func main() {err := http.ListenAndServe(paddr, nil)r := runner.New(util.NewWrapperedServerOptions(addr, certFile, keyFile, true))err := r.AddService(runner.ServiceUnit{Service: room.New(),ConfigFile: roomConfFile,},runner.ServiceUnit{Service: sfu.New(),ConfigFile: sfuConfFile,},)
其中sfu定义在pkg/node/sfu/sfu.go
type SFU struct {ion.Nodes *SFUServicerunner.Serviceconf Config}
type RoomServer struct {// for standalone runningrunner.Service// grpc room serviceRoomServiceRoomSignalService// for distributed node runningion.NodenatsConn *nats.ConnnatsDiscoveryCli *natsDiscoveryClient.Client// configconf Config}
apps/room/server/room.go
文件定义了room相关的结构体
type RoomServer struct {// for standalone runningrunner.Service// grpc room serviceRoomServiceRoomSignalService// for distributed node runningion.NodenatsConn *nats.ConnnatsDiscoveryCli *natsDiscoveryClient.Client// configconf Config}
sfu模块
sfu模块的代码位于cmd/sfu/main.go
func main() {}err := conf.Load(confFile)err := http.ListenAndServe(paddr, nil)node := sfu.NewSFU()err := node.Start(conf);defer node.Close()
相关的结构体定义在 pkg/node/sfu/sfu.go
type SFU struct {ion.Nodes *SFUServicerunner.Serviceconf Config}
type Config struct {Global global `mapstructure:"global"`Log logConf `mapstructure:"log"`Nats natsConf `mapstructure:"nats"`isfu.Config}
它启动了一个rtc server:
func (s *SFU) Start(conf Config) error {err := s.Node.Start(conf.Nats.URL)s.s = NewSFUService(conf.Config)pb.RegisterRTCServer(s.Node.ServiceRegistrar(), s.s)go func() {err := s.Node.KeepAlive(node)go func() {err := s.Node.Watch(proto.ServiceALL)
其中node定义在pkg/ion/node.go:
type Node struct {// Node IDNID string// Nats Client Connnc *nats.Conn// gRPC Service Registrarnrpc *nrpc.Server// Service discovery clientndc *ndc.ClientnodeLock sync.RWMutex//neighbor nodesneighborNodes map[string]discovery.NodecliLock sync.RWMutexclis map[string]*nrpc.Client}
func (n *Node) ServiceRegistrar() grpc.ServiceRegistrar {return n.nrpc}
而 sfu service定义在pkg/node/sfu/service.go
type SFUService struct {rtc.UnimplementedRTCServersfu *ion_sfu.SFUmutex sync.RWMutexsigs map[string]rtc.RTC_SignalServer}
pkg/runner/runner.go
type Service interface {New() ServiceConfigBase() ConfigBaseStartGRPC(registrar grpc.ServiceRegistrar) errorClose()}
avp模块和auth模块并没有实现
cmd/avp/main.go
apps/auth/main.go
以上就是会议系统服务端的核心代码。






