https://github.com/pion/ion是用go实现的会议系统,它基于go实现的消息队列nats 和redis,官网文档https://pionion.github.io/docs/awesome-ion/awesome-ion很简明扼要,本地搭建:
./scripts/deps_inst./scripts/all start./scripts/all status
初始化所有的依赖,然后启动服务。它同时提供了web app端https://github.com/pion/ion-app-web,启动它,就可以看到可以进行视频会议了
cd ion-app-webnpm inpm start
它主要包括下面几个部分
RTC: (Real-Time Communication) RTC system scenes: conference/live-broadcasting/voip..
Signal :( Signal server ) support signal logic
Room : ( Room server) support room logic
ISLB:(Intelligent server load balancing server) support node-discovery/load-bancing..
SFU: (Selective Forwarding Unit ) broadcasting media streams
AVP:( Audio video process server) Video Recoder/Audio Video Mixer/ AI Processor
MCU:( Multipoint Control Unit )Audio Video Mixer and broadcasting
下面首先分析下islb的源码,它的入口位置在cmd/islb/main.go,先看线main函数。
func main(){parse()node := islb.NewISLB()node.Start(conf)defer node.Close()}
解析配置,然后初始化islb模块,启动这个模块,在函数退出时关闭这个模块。它对应的配置文件在configs/islb.toml,主要配置依赖的队列系统nats和redis地址
natsredis
islb定义在pkg/node/islb/islb.go
type ISLB struct {ion.Nodes *islbServerregistry *Registryredis *db.Redis}
其中node定义在pkg/ion/node.go
type Node struct {// Node IDNID string// Nats Client Connnc *nats.Conn// gRPC Service Registrarnrpc *nrpc.Server// Service discovery clientndc *ndc.ClientnodeLock sync.RWMutex//neighbor nodesneighborNodes map[string]discovery.NodecliLock sync.RWMutexclis map[string]*nrpc.Client}
islbServer 定义在pkg/node/islb/server.go
type islbServer struct {islb.UnimplementedISLBServerredis *db.Redisislb *ISLBconf Config//watchers map[string]islb.ISLB_WatchISLBEventServer}
registry定义在pkg/node/islb/registry.go
type Registry struct {dc stringredis *db.Redisreg *registry.Registrymutex sync.Mutexnodes map[string]discovery.Node}
下面重点看下Start函数:
func (i *ISLB) Start(conf Config) errorerr = i.Node.Start(conf.Nats.URL)i.redis = db.NewRedis(conf.Redis)i.registry, err = NewRegistry(conf.Global.Dc, i.Node.NatsConn(), i.redis)i.s = newISLBServer(conf, i, i.redis)pb.RegisterISLBServer(i.Node.ServiceRegistrar(), i.s)go func() {err := i.Node.KeepAlive(node)return n.ndc.KeepAlive(node)go func() {err := i.Node.Watch(proto.ServiceALL)resp, err := n.ndc.Get(service, map[string]interface{}{})for _, node := range resp.Nodes {n.handleNeighborNodes(discovery.NodeUp, &node)}return n.ndc.Watch(context.Background(), service, n.handleNeighborNodes)
它实现了会议系统中服务发现的核心逻辑,首先初始化依赖的nats消息队列和redis,然后注册了ServiceRegistrar,最后启动了两个协程,分别发送保活信号和watch服务的变化,它获取所有的节点,根据节点的状态,来处理neighbor节点的增删。逻辑如下:
func (n *Node) handleNeighborNodes(state discovery.NodeState, node *discovery.Node)state == discovery.NodeUpn.neighborNodes[id] = *nodestate == discovery.NodeDowndelete(n.neighborNodes, id)
如果邻居启动,加入集合,如果邻居挂掉,从集合中删除。
保活协程里面是一个定时器,如果它退出,它会发送删除消息,否则发送更新消息,代码位于pkg/mod/github.com/cloudwebrtc/nats-discovery@v0.3.0/pkg/client/client.go
func (c *Client) KeepAlive(node discovery.Node) error {t := time.NewTicker(discovery.DefaultLivecycle)defer func() {c.sendAction(node, discovery.Delete)t.Stop()}()for {case <-t.C:c.sendAction(node, discovery.Update)
接下来看下sendAction是如何包装的:
func (c *Client) sendAction(node discovery.Node, action discovery.Action) errordata, err := util.Marshal(&discovery.Request{Action: action, Node: node,})msg, err := c.nc.Request(subj, data, time.Duration(time.Second*15))
它调用了Request发布一个消息并获取结果。
最后,我们重点看下NewRegistry干了什么,它的代码位置在pkg/node/islb/registry.go
func NewRegistry(dc string, nc *nats.Conn, redis *db.Redis) (*Registry, error)reg, err := registry.NewRegistry(nc, discovery.DefaultExpire)err = reg.Listen(r.handleNodeAction, r.handleGetNodes)
它初始化了一个NewRegistry 然后注册处理事件和获取节点两个handler
func (r *Registry) handleNodeAction(action discovery.Action, node discovery.Node) (bool, error)r.nodes[node.ID()] = node
func (r *Registry) handleGetNodes(service string, params map[string]interface{}) ([]discovery.Node, error)if service == proto.ServiceRTC {for _, key := range r.redis.Keys(mkey) {value := r.redis.Get(key)
从redis中获取所有的节点,然后存储到nodes对象中。其中的Listen函数是nats 客户端的一个入口,代码位于github.com/cloudwebrtc/nats-discovery@v0.3.0/pkg/registry/registry.go
func (s *Registry) Listen(handleNodeAction func(action discovery.Action, node discovery.Node) (bool, error),handleGetNodes func(service string, params map[string]interface{}) ([]discovery.Node, error)) error {sub, err := s.nc.Subscribe(subj, func(msg *nats.Msg) {msgCh <- msg})case discovery.Save:handleNodeAction(req.Action, req.Node)case discovery.Update:ok, err := handleNodeAction(req.Action, req.Node);case discovery.Delete:case discovery.Get:handleGetNodes(req.Service, req.Params)s.nc.Publish(msg.Reply, data)go func() error {sub.Unsubscribe()t := time.NewTicker(time.Second * 1)if err := s.checkExpires(nodes, now, handleNodeAction); err != nil {err := handleNatsMsg(msg)
如果有消息到来就处理,处理函数对应了它的两个入参handler,并且检查过期的节点。






