
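Reading Go source code: the livego live-streaming system

Before analyzing the source, let's first set up a working live-streaming system from three pieces:

Streaming server

https://github.com/gwuhaolin/livego

Player page

https://github.com/Bilibili/flv.js/

Stream pusher

https://github.com/obsproject/obs-studio

First, start the streaming server: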

    ./livego --flv_dir=./data --level=debug

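1. Once started, livego listens on the following ports by default:

Port 8090: the control API; HTTP requests here let you inspect and control the push and pull streams of each live room

Port 1935: RTMP push and pull; as far as I can tell, RTMP is currently the only way to push a stream

Port 7001: FLV pull

Port 7002: HLS pull

2. Create a live room:

Request: http://<your server address>:8090/control/get?room=<room name> (the room name is up to you)

Successful response: {"status":200,"data":<a secret key bound to the room name>} (the key is a 48-character random string, as SetKey in configure/channel.go shows, rather than an MD5 digest)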

      http://127.0.0.1:8090/control/get?room=xiazemin
      {
      status: 200,
      data: "rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk"
      }

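3. Start pushing the stream

The default app name can be found in the config file livego.yaml. In the pusher, enter the room name as the stream key; here it is set to xiazemin.

    server:
    - appname: live

4. Start a web server and pull the video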

    <!DOCTYPE html>
    <html lang="en">
    <head>
      <meta charset="UTF-8">
      <meta name="viewport" content="width=device-width, initial-scale=1.0">
      <title>Course live stream</title>
    </head>
    <body>
      <script src="flv.min.js"></script>
      <video id="videoElement" autoplay controls loop preload muted></video>
      <script>
        if (flvjs.isSupported()) {
          var videoElement = document.getElementById('videoElement');
          var flvPlayer = flvjs.createPlayer({
            type: 'flv',
            isLive: true,
            url: 'http://127.0.0.1:7001/live/rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk.flv'
          });
          flvPlayer.attachMediaElement(videoElement);
          flvPlayer.load();
          flvPlayer.play();
          // Playback starts muted; unmute on the first click. The original
          // used jQuery's $("video")[0], but jQuery is never loaded here.
          document.body.addEventListener('mousedown', function () {
            videoElement.muted = false;
          }, false);
        }
      </script>
    </body>
    </html>


Download the FLV playback library that the page depends on:

    https://cdn.bootcdn.net/ajax/libs/flv.js/1.6.1/flv.min.js

Start a static file server to serve the page above:

    package main

    import (
        "fmt"
        "net/http"
    )

    // handler adds permissive CORS headers, then serves files from the
    // current directory.
    // Reference: http://newbt.net/ms/vdisk/show_bbs.php?id=56A46991BF52F1A048DB5F0350D74D01&page=0
    // Reference: https://stackoverflow.com/questions/69435888/how-to-ensure-cors-response-header-values-are-valid-when-querying-mongodb-in-n
    func handler(w http.ResponseWriter, r *http.Request) {
        w.Header().Add("Access-Control-Allow-Origin", "*")
        w.Header().Add("Access-Control-Allow-Methods", "GET, DELETE, HEAD, OPTIONS,POST,PUT,PATCH")
        w.Header().Add("Access-Control-Allow-Headers", "x-requested-with,content-type")
        w.Header().Add("Access-Control-Allow-Credentials", "true")
        http.FileServer(http.Dir("./")).ServeHTTP(w, r)
    }

    func main() {
        http.HandleFunc("/", handler)
        // Serve on port 8089; ListenAndServe only returns on error.
        if err := http.ListenAndServe(":8089", nil); err != nil {
            fmt.Println(err)
        }
    }


That completes the live-streaming setup.
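
Steps 2 and 4 of the setup boil down to one HTTP request plus some string assembly. The following is a minimal sketch of that glue, not code from livego: the helper names (`parseRoomKey`, `controlGetURL`, `flvPlayURL`) are my own, the ports are the defaults listed earlier, and the response body is hard-coded so the example runs without a server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// roomKeyResponse mirrors the {"status": ..., "data": ...} shape of the
// /control/get response shown in step 2.
type roomKeyResponse struct {
	Status int    `json:"status"`
	Data   string `json:"data"`
}

// parseRoomKey extracts the room key from a /control/get response body.
func parseRoomKey(body []byte) (string, error) {
	var resp roomKeyResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", err
	}
	if resp.Status != 200 {
		return "", fmt.Errorf("unexpected status %d", resp.Status)
	}
	return resp.Data, nil
}

// controlGetURL builds the room-creation request for the control API
// (port 8090 is assumed to be the default).
func controlGetURL(host, room string) string {
	q := url.Values{"room": {room}}
	return "http://" + host + ":8090/control/get?" + q.Encode()
}

// flvPlayURL builds the HTTP-FLV address that the player page loads
// (port 7001 is assumed to be the default).
func flvPlayURL(host, app, key string) string {
	return fmt.Sprintf("http://%s:7001/%s/%s.flv", host, app, key)
}

func main() {
	// Hard-coded sample body; a real client would fetch controlGetURL(...).
	body := []byte(`{"status":200,"data":"rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk"}`)
	key, err := parseRoomKey(body)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(controlGetURL("127.0.0.1", "xiazemin"))
	fmt.Println(flvPlayURL("127.0.0.1", "live", key))
}
```

The second printed line is the same URL that the player page passes to flv.js.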

livego also provides other endpoints for managing push and pull relays, and for statistics:

    http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin

    {
      status: 400,
      data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"
    }

    http://127.0.0.1:8090/stat/livestat

    {
      status: 200,
      data: {
        publishers: [
          {
            key: "live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",
            url: "rtmp://localhost:1935/live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",
            stream_id: 1,
            video_total_bytes: 1940374,
            video_speed: 1465,
            audio_total_bytes: 206931,
            audio_speed: 161
          }
        ],
        players: null
      }
    }

    http://127.0.0.1:8090/control/push?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin

    {
      status: 200,
      data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"
    }

    http://127.0.0.1:8090/control/pull?&oper=stop&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin

    {
      status: 400,
      data: "<h1>push url stop rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"
    }

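Now let's analyze the source. It has three main parts: pushing, pulling, and push/pull management. I start from livego's entry point, the main function in main.go. It creates the RTMP stream, then starts the HLS and HTTP-FLV pull servers, then the control API server, and finally the RTMP server that receives pushed streams:

    // main.go (excerpts; "// ..." marks omitted lines)

    // Signature inferred from the call in main below.
    func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
        // ...
        rtmpServer = rtmp.NewRtmpServer(stream, nil)
        // ...
        rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)
        rtmpServer.Serve(rtmpListen)
    }

    func startHls() *hls.Server {
        // ...
        hlsServer := hls.NewServer()
        go func() {
            hlsServer.Serve(hlsListen)
        }()
        // ...
    }

    func startHTTPFlv(stream *rtmp.RtmpStream) {
        // ...
        hdlServer := httpflv.NewServer(stream)
        go func() {
            hdlServer.Serve(flvListen)
        }()
    }

    func startAPI(stream *rtmp.RtmpStream) {
        // ...
        opServer := api.NewServer(stream, rtmpAddr)
        go func() {
            opServer.Serve(opListen)
        }()
    }

    func main() {
        // ...
        stream := rtmp.NewRtmpStream()
        hlsServer = startHls()
        startHTTPFlv(stream)
        startAPI(stream)
        startRtmp(stream, hlsServer)
    }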

Pushing is based on the RTMP protocol, which runs on top of TCP. The server accepts connections in a loop and starts one goroutine per connection:

protocol/rtmp/rtmp.go

    // Excerpts; "// ..." marks omitted lines.

    func (s *Server) Serve(listener net.Listener) (err error) {
        // ...
        netconn, err = listener.Accept()
        conn := core.NewConn(netconn, 4*1024)
        go s.handleConn(conn)
        // ...
    }

    func (s *Server) handleConn(conn *core.Conn) error {
        // ...
        connServer := core.NewConnServer(conn)
        appname, name, _ := connServer.GetInfo()
        key, err := configure.RoomKeys.GetKey(name)
        channel, err := configure.RoomKeys.GetChannel(name)
        pushlist, ret := configure.GetStaticPushUrlList(appname)
        s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))
        // ...
    }
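
The accept loop above is the standard Go pattern for a TCP server. Here is a self-contained sketch of just that pattern, with the RTMP handshake and chunk parsing replaced by a hypothetical one-line echo handler (`echoLine`); `serve` and `roundTrip` are names I made up for the demo, and nothing here is livego's actual code.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// serve accepts connections until the listener is closed and hands each
// one to handle in its own goroutine.
func serve(l net.Listener, handle func(net.Conn)) error {
	for {
		conn, err := l.Accept()
		if err != nil {
			return err // listener closed or fatal accept error
		}
		go handle(conn)
	}
}

// echoLine is a stand-in for real protocol handling: it reads one line
// and writes it back.
func echoLine(conn net.Conn) {
	defer conn.Close()
	line, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return
	}
	fmt.Fprint(conn, line)
}

// roundTrip starts a server on an ephemeral loopback port, sends msg
// (which must end in a newline) and returns the reply.
func roundTrip(msg string) (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer l.Close()
	go serve(l, echoLine)

	conn, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := fmt.Fprint(conn, msg); err != nil {
		return "", err
	}
	return bufio.NewReader(conn).ReadString('\n')
}

func main() {
	reply, err := roundTrip("hello\n")
	fmt.Printf("%q %v\n", reply, err)
}
```

Because every connection gets its own goroutine, a slow publisher cannot stall the accept loop.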

In essence, the work is copying data from the input side (the reader) to the output side (the writers). The streams themselves are stored in a sync.Map:

protocol/rtmp/stream.go

    // Excerpts; "// ..." marks omitted lines.

    type RtmpStream struct {
        streams *sync.Map //key
    }

    func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
        // ...
        i, ok := rs.streams.Load(info.Key)
        ns := NewStream()
        rs.streams.Store(info.Key, ns)
        stream.AddReader(r)
    }

    func (rs *RtmpStream) HandleWriter(w av.WriteCloser) {
        // ...
        item, ok := rs.streams.Load(info.Key)
        rs.streams.Store(info.Key, s)
        s.AddWriter(w)
    }

    func (rs *RtmpStream) CheckAlive() {
        // ...
        rs.streams.Delete(key)
    }
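
To make the "copy from one reader to many writers" idea concrete, here is a small sketch of a keyed stream registry built on sync.Map. It is an illustration under my own assumptions rather than livego's implementation: the `stream` and `registry` types, the string packets, and the drop-on-slow-player policy are all invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a simplified stand-in for one live stream: a single
// publisher feeding any number of players.
type stream struct {
	mu      sync.Mutex
	writers []chan string
}

// addWriter registers a new player and returns the channel it reads from.
func (s *stream) addWriter() <-chan string {
	ch := make(chan string, 16)
	s.mu.Lock()
	s.writers = append(s.writers, ch)
	s.mu.Unlock()
	return ch
}

// broadcast copies one packet to every registered player.
func (s *stream) broadcast(pkt string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.writers {
		select {
		case ch <- pkt:
		default: // drop the packet for a slow player rather than block
		}
	}
}

// registry maps a stream key such as "live/xiazemin" to its stream.
type registry struct {
	streams sync.Map
}

// get returns the stream for key, creating it on first use.
func (r *registry) get(key string) *stream {
	v, _ := r.streams.LoadOrStore(key, &stream{})
	return v.(*stream)
}

func main() {
	var reg registry
	player := reg.get("live/xiazemin").addWriter()
	reg.get("live/xiazemin").broadcast("packet-1")
	fmt.Println(<-player)
}
```

LoadOrStore lets a player that arrives before the publisher create the entry, which matches the Load-then-Store sequence visible in both HandleReader and HandleWriter.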

HLS (HTTP Live Streaming) delivers two kinds of content: an M3U8 playlist file and TS media files. Video in the TS files must be H.264 encoded, and audio must be AAC or MP3.

protocol/hls/hls.go

    // Excerpts; "// ..." marks omitted lines.

    type Server struct {
        listener net.Listener
        conns    *sync.Map
    }

    func (server *Server) Serve(listener net.Listener) error {
        // ...
        mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            server.handle(w, r)
        })
        // ...
    }

    func (server *Server) handle(w http.ResponseWriter, r *http.Request) {
        if path.Base(r.URL.Path) == "crossdomain.xml" {
            // ...
        }
        switch path.Ext(r.URL.Path) {
        case ".m3u8":
            key, _ := server.parseM3u8(r.URL.Path)
            conn := server.getConn(key)
            tsCache := conn.GetCacheInc()
            body, err := tsCache.GenM3U8PlayList()
            w.Header().Set("Access-Control-Allow-Origin", "*")
            w.Write(body)
        case ".ts":
            key, _ := server.parseTs(r.URL.Path)
            conn := server.getConn(key)
            tsCache := conn.GetCacheInc()
            item, err := tsCache.GetItem(r.URL.Path)
            // ...
        }
    }

The system we set up earlier plays over HTTP-FLV, which is implemented in protocol/httpflv/server.go:

    // Excerpts; "// ..." marks omitted lines.

    type Server struct {
        handler av.Handler
    }

    func (server *Server) Serve(l net.Listener) error {
        // ...
        mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            server.handleConn(w, r)
        })
        mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) {
            server.getStream(w, r)
        })
        // ...
    }

    func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) {
        // ...
        path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")
        paths := strings.SplitN(path, "/", 2)
        msgs := server.getStreams(w, r)
        for _, item := range msgs.Publishers {
            if item.Key == path {
                include = true
                // ...
            }
        }
        writer := NewFLVWriter(paths[0], paths[1], url, w)
        server.handler.HandleWriter(writer)
        // ...
    }

    func (server *Server) getStream(w http.ResponseWriter, r *http.Request) {
        msgs := server.getStreams(w, r)
        // ...
        w.Write(resp)
    }

    func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams {
        rtmpStream := server.handler.(*rtmp.RtmpStream)
        msgs := new(streams)
        rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
            // ...
            msg := stream{key.(string), s.GetReader().Info().UID}
            msgs.Publishers = append(msgs.Publishers, msg)
            // ...
        })
        rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
            ws := val.(*rtmp.Stream).GetWs()
            ws.Range(func(k, v interface{}) bool {
                // ...
                msg := stream{key.(string), pw.GetWriter().Info().UID}
                msgs.Players = append(msgs.Players, msg)
                // ...
            })
            // ...
        })
        // ...
    }
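
The path handling in handleConn is easy to try in isolation. The sketch below uses the same TrimLeft / TrimSuffix / SplitN sequence to turn a request path into an app name and a stream name; the function name `parseFLVPath` and the explicit error for malformed paths are my own additions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseFLVPath splits a request path such as "/live/<name>.flv" into
// the app ("live") and the stream name.
func parseFLVPath(p string) (app, name string, err error) {
	trimmed := strings.TrimSuffix(strings.TrimLeft(p, "/"), ".flv")
	parts := strings.SplitN(trimmed, "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", errors.New("expected /<app>/<name>.flv")
	}
	return parts[0], parts[1], nil
}

func main() {
	app, name, err := parseFLVPath("/live/rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk.flv")
	fmt.Println(app, name, err)

	_, _, err = parseFLVPath("/live.flv")
	fmt.Println(err)
}
```

The trimmed path ("live/<name>") is also what handleConn compares against each publisher's Key before it attaches a writer.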

protocol/hls/cache.go

    // Excerpts; "// ..." marks omitted lines.

    type TSCacheItem struct {
        id   string
        num  int
        lock sync.RWMutex
        ll   *list.List
        lm   map[string]TSItem
    }

    func (tcCacheItem *TSCacheItem) GenM3U8PlayList() ([]byte, error) {
        // ...
        w.Write(m3u8body.Bytes())
        // ...
    }

    func (tcCacheItem *TSCacheItem) GetItem(key string) (TSItem, error) {
        // ...
        item, ok := tcCacheItem.lm[key]
        // ...
    }
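
TSCacheItem pairs a list (for ordering) with a map (for lookup by key). The sketch below shows why that combination suits a sliding window of TS segments and how a playlist can be rendered from it. Everything in it is my own simplification: the `tsItem` fields, the eviction policy, and the exact playlist tags are assumptions, so treat it as an illustration of the data structure rather than livego's output.

```go
package main

import (
	"bytes"
	"container/list"
	"fmt"
)

// tsItem describes one cached segment (fields are illustrative).
type tsItem struct {
	name       string // segment path, also the map key
	seq        int    // media sequence number
	durationMs int    // segment length in milliseconds
}

// tsCache keeps the newest max segments: the list preserves arrival
// order, the map gives direct lookup when a .ts file is requested.
type tsCache struct {
	max   int
	order *list.List
	byKey map[string]tsItem
}

func newTSCache(max int) *tsCache {
	if max < 1 {
		max = 1
	}
	return &tsCache{max: max, order: list.New(), byKey: map[string]tsItem{}}
}

// add appends a segment and evicts the oldest one when the cache is full.
func (c *tsCache) add(it tsItem) {
	if c.order.Len() == c.max {
		oldest := c.order.Front()
		delete(c.byKey, oldest.Value.(tsItem).name)
		c.order.Remove(oldest)
	}
	c.order.PushBack(it)
	c.byKey[it.name] = it
}

// playlist renders the cached segments as a live M3U8 playlist.
func (c *tsCache) playlist() []byte {
	var segs bytes.Buffer
	maxMs, firstSeq := 0, 0
	for e := c.order.Front(); e != nil; e = e.Next() {
		it := e.Value.(tsItem)
		if e.Prev() == nil {
			firstSeq = it.seq // first entry sets the media sequence
		}
		if it.durationMs > maxMs {
			maxMs = it.durationMs
		}
		fmt.Fprintf(&segs, "#EXTINF:%.3f,\n%s\n", float64(it.durationMs)/1000, it.name)
	}
	var out bytes.Buffer
	fmt.Fprintf(&out, "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:%d\n#EXT-X-MEDIA-SEQUENCE:%d\n",
		(maxMs+999)/1000, firstSeq)
	out.Write(segs.Bytes())
	return out.Bytes()
}

func main() {
	c := newTSCache(2)
	for i := 0; i < 3; i++ {
		c.add(tsItem{name: fmt.Sprintf("/live/xiazemin/%d.ts", i), seq: i, durationMs: 2000})
	}
	fmt.Print(string(c.playlist()))
}
```

With a capacity of two, adding a third segment evicts segment 0, so the playlist starts at media sequence 1.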

protocol/httpflv/writer.go

    type FLVWriter struct {
        Uid string
        av.RWBaser
        app, title, url string
        buf             []byte
        closed          bool
        closedChan      chan struct{}
        ctx             http.ResponseWriter
        packetQueue     chan *av.Packet
    }

av/av.go

    type Handler interface {
        HandleReader(ReadCloser)
        HandleWriter(WriteCloser)
    }

The management and control API of the streaming service is implemented in protocol/api/api.go:

    type Server struct {
        handler  av.Handler
        session  map[string]*rtmprelay.RtmpRelay
        rtmpAddr string
    }

It also acts as a static file server: any files placed in the statics directory are served under /statics/:

    func (s *Server) Serve(l net.Listener) error {
        // ...
        mux.Handle("/statics/", http.StripPrefix("/statics/", http.FileServer(http.Dir("statics"))))
        mux.HandleFunc("/control/push", func(w http.ResponseWriter, r *http.Request) {
            s.handlePush(w, r)
        })
        mux.HandleFunc("/control/pull", func(w http.ResponseWriter, r *http.Request) {
            s.handlePull(w, r)
        })
        mux.HandleFunc("/control/get", func(w http.ResponseWriter, r *http.Request) {
            s.handleGet(w, r)
        })
        mux.HandleFunc("/control/reset", func(w http.ResponseWriter, r *http.Request) {
            s.handleReset(w, r)
        })
        mux.HandleFunc("/control/delete", func(w http.ResponseWriter, r *http.Request) {
            s.handleDelete(w, r)
        })
        mux.HandleFunc("/stat/livestat", func(w http.ResponseWriter, r *http.Request) {
            s.GetLiveStatics(w, r)
        })
        // ...
    }
    // Excerpts; "// ..." marks omitted lines.

    // http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456
    func (s *Server) handlePush(w http.ResponseWriter, req *http.Request) {
        // ...
        if oper == "stop" {
            pushRtmprelay, found := s.session[keyString]
            // ...
            pushRtmprelay.Stop()
            delete(s.session, keyString)
        } else {
            pushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
            err = pushRtmprelay.Start()
            // ...
            s.session[keyString] = pushRtmprelay
        }
    }

    // http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456
    func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
        // ...
        if oper == "stop" {
            pullRtmprelay, found := s.session[keyString]
            // ...
            pullRtmprelay.Stop()
        } else {
            pullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
            err = pullRtmprelay.Start()
            // ...
            s.session[keyString] = pullRtmprelay
        }
    }

    // http://127.0.0.1:8090/control/get?room=ROOM_NAME
    func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) {
        // ...
        msg, err := configure.RoomKeys.GetKey(room)
        // ...
    }

    // http://127.0.0.1:8090/control/reset?room=ROOM_NAME
    func (s *Server) handleReset(w http.ResponseWriter, r *http.Request) {
        // ...
        msg, err := configure.RoomKeys.SetKey(room)
        // ...
    }

    // http://127.0.0.1:8090/control/delete?room=ROOM_NAME
    func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
        // ...
        configure.RoomKeys.DeleteChannel(room)
        // ...
    }

    // http://127.0.0.1:8090/stat/livestat
    func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) {
        rtmpStream := server.handler.(*rtmp.RtmpStream)
        rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
            // ...
            v := s.GetReader().(*rtmp.VirReader)
            msg := stream{key.(string), v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,
                v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS}
            // ...
        })
        rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
            // ...
            v := pw.GetWriter().(*rtmp.VirWriter)
            msg := stream{key.(string), v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,
                v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS}
            // ...
        })
        // ...
        roomInfo, exists := (rtmpStream.GetStreams()).Load(room)
        // ...
    }

    // The mux is served behind a JWT middleware:
    http.Serve(l, JWTMiddleware(mux))

    // Its error handler replies with a 403 JSON response:
    ErrorHandler: func(w http.ResponseWriter, r *http.Request, err string) {
        res := &Response{
            w:      w,
            Status: 403,
            Data:   err,
        }
        res.SendJson()
    },
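
The start/stop bookkeeping in handlePush and handlePull is a map keyed by app and stream name. Here is a runnable sketch of that control flow using net/http/httptest, so no real server or relay is needed. The details are assumptions on my part: the session value is a plain string instead of a *RtmpRelay, the "push:" key prefix and the messages are invented, and I added a mutex that the excerpts above do not show.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// response mirrors the {"status": ..., "data": ...} JSON envelope.
type response struct {
	Status int         `json:"status"`
	Data   interface{} `json:"data"`
}

// controlServer tracks active relay sessions by key.
type controlServer struct {
	mu       sync.Mutex
	sessions map[string]string // session key -> remote URL (stand-in for a relay)
}

// handlePush starts or stops a session depending on the oper parameter.
func (s *controlServer) handlePush(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	oper, app, name, url := q.Get("oper"), q.Get("app"), q.Get("name"), q.Get("url")
	key := "push:" + app + "/" + name
	res := response{Status: 200}

	s.mu.Lock()
	switch {
	case app == "" || name == "" || url == "":
		res = response{Status: 400, Data: "missing app, name or url"}
	case oper == "stop":
		if _, ok := s.sessions[key]; !ok {
			res = response{Status: 400, Data: "no such session: " + key}
		} else {
			delete(s.sessions, key)
			res.Data = "stopped " + key
		}
	default:
		s.sessions[key] = url
		res.Data = "started " + key
	}
	s.mu.Unlock()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(res)
}

// call runs one request through the handler and returns the JSON status.
func call(s *controlServer, target string) int {
	rec := httptest.NewRecorder()
	s.handlePush(rec, httptest.NewRequest(http.MethodGet, target, nil))
	var res response
	if err := json.Unmarshal(rec.Body.Bytes(), &res); err != nil {
		return -1
	}
	return res.Status
}

func main() {
	s := &controlServer{sessions: map[string]string{}}
	base := "/control/push?app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin"
	fmt.Println(call(s, base+"&oper=start"))
	fmt.Println(call(s, base+"&oper=stop"))
	fmt.Println(call(s, base+"&oper=stop")) // already stopped
}
```

As in the sample responses earlier, the status lives in the JSON body, so a client has to inspect the payload and not only the HTTP status code.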

protocol/rtmp/rtmprelay/rtmprelay.go

    // Excerpts; "// ..." marks omitted lines.

    type RtmpRelay struct {
        PlayUrl              string
        PublishUrl           string
        cs_chan              chan core.ChunkStream
        sndctrl_chan         chan string
        connectPlayClient    *core.ConnClient
        connectPublishClient *core.ConnClient
        startflag            bool
    }

    func (self *RtmpRelay) Start() error {
        // ...
        self.connectPlayClient = core.NewConnClient()
        self.connectPublishClient = core.NewConnClient()
        err := self.connectPlayClient.Start(self.PlayUrl, av.PLAY)
        // ...
        err = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH)
        // ...
        go self.rcvPlayChunkStream()
        go self.sendPublishChunkStream()
        // ...
    }

    func (self *RtmpRelay) rcvPlayChunkStream() {
        // ...
        err := self.connectPlayClient.Read(&rc)
        // ...
        r := bytes.NewReader(rc.Data)
        vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)
        // ...
        self.cs_chan <- rc
        // ...
    }

    func (self *RtmpRelay) sendPublishChunkStream() {
        // ...
        self.connectPublishClient.Write(rc)
        // ...
        self.connectPublishClient.Close(nil)
        // ...
    }
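
RtmpRelay is a two-goroutine pipeline joined by a channel: one side reads chunks from the play connection, the other writes them to the publish connection. Stripped of RTMP, the shape looks like the sketch below; the `relay` function, the string chunks, and the slice standing in for the destination are my own placeholders.

```go
package main

import "fmt"

// relay copies chunks from src to a destination slice through a
// buffered channel, with one goroutine per side.
func relay(src []string) []string {
	chunks := make(chan string, 4)
	done := make(chan []string)

	// Receiving side: read from the source and queue each chunk.
	go func() {
		defer close(chunks)
		for _, c := range src {
			chunks <- c
		}
	}()

	// Sending side: drain the queue and write to the destination.
	go func() {
		var out []string
		for c := range chunks {
			out = append(out, c)
		}
		done <- out
	}()

	return <-done
}

func main() {
	fmt.Println(relay([]string{"chunk-1", "chunk-2", "chunk-3"}))
}
```

The buffered channel decouples the two sides, so a brief stall on the publish connection does not immediately block reads from the play connection.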

configure/channel.go

    // Excerpts; "// ..." marks omitted lines.

    var RoomKeys = &RoomKeysType{
        localCache: cache.New(cache.NoExpiration, 0),
    }

    type RoomKeysType struct {
        redisCli   *redis.Client
        localCache *cache.Cache
    }

    func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
        // ...
        key = uid.RandStringRunes(48)
        if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {
            err = r.redisCli.Set(channel, key, 0).Err()
            // ...
            err = r.redisCli.Set(key, channel, 0).Err()
            // ...
        }
        // ...
    }

    func (r *RoomKeysType) DeleteChannel(channel string) bool {
        // ...
        r.redisCli.Del(channel).Err()
        // ...
        key, ok := r.localCache.Get(channel)
        // ...
        r.localCache.Delete(channel)
        r.localCache.Delete(key.(string))
        // ...
    }
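
The excerpts show that RoomKeys stores the mapping in both directions (channel to key and key to channel) and deletes both entries together. Here is an in-memory sketch of that idea using only the standard library. It is not livego's code: the type and method names are lowercase inventions, two plain maps replace Redis and the local cache, and `randKey` is my stand-in for uid.RandStringRunes.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math/big"
	"sync"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// randKey returns n random characters drawn from letters.
func randKey(n int) (string, error) {
	b := make([]byte, n)
	for i := range b {
		idx, err := rand.Int(rand.Reader, big.NewInt(int64(len(letters))))
		if err != nil {
			return "", err
		}
		b[i] = letters[idx.Int64()]
	}
	return string(b), nil
}

// roomKeys keeps the channel<->key mapping in both directions.
type roomKeys struct {
	mu           sync.RWMutex
	keyByChannel map[string]string
	channelByKey map[string]string
}

func newRoomKeys() *roomKeys {
	return &roomKeys{keyByChannel: map[string]string{}, channelByKey: map[string]string{}}
}

// setKey assigns a fresh 48-character key to channel, replacing any old one.
func (r *roomKeys) setKey(channel string) (string, error) {
	key, err := randKey(48)
	if err != nil {
		return "", err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if old, ok := r.keyByChannel[channel]; ok {
		delete(r.channelByKey, old)
	}
	r.keyByChannel[channel] = key
	r.channelByKey[key] = channel
	return key, nil
}

// getChannel resolves a key back to its channel.
func (r *roomKeys) getChannel(key string) (string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	channel, ok := r.channelByKey[key]
	if !ok {
		return "", errors.New("channel for key not found")
	}
	return channel, nil
}

// deleteChannel removes both directions; it reports whether the channel existed.
func (r *roomKeys) deleteChannel(channel string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	key, ok := r.keyByChannel[channel]
	if !ok {
		return false
	}
	delete(r.keyByChannel, channel)
	delete(r.channelByKey, key)
	return true
}

func main() {
	rk := newRoomKeys()
	key, _ := rk.setKey("xiazemin")
	channel, _ := rk.getChannel(key)
	fmt.Println(len(key), channel, rk.deleteChannel("xiazemin"))
}
```

Keeping the reverse map is what lets the RTMP server translate a pushed key back into a channel name with a single lookup.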


This article is reposted from golang算法架构leetcode技术php.
