暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang 源码阅读之会议系统ion part IV

        在介绍了后端代码和前端的webapp之后,我们开始介绍前后端结合的部分:ion-sdk-js(https://github.com/pion/ion-sdk-js )。它主要在proto生成的ts代码基础上做了一些封装,提供完整的接口供webapp使用。

proto
首先看下proto是如何生成的,在ion-sdk-js/Makefile文件中,我们可以看到相关代码:

    # Generates the JS/TS gRPC-web bindings from the three ion .proto files.
    # Each protoc call emits CommonJS binary JS (--js_out) and TypeScript
    # grpc-web service stubs (--ts_out) into ./src/_library, using the
    # protoc-gen-ts plugin expected at /usr/local/bin/protoc-gen-ts.
    proto:
    mkdir -p src/_library
    #sudo npm i -g ts-protoc-gen@0.15.0
    protoc ./ion/proto/ion/ion.proto -I./ion --plugin=protoc-gen-ts=/usr/local/bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/_library --ts_out=service=grpc-web:./src/_library
    protoc ./ion/proto/rtc/rtc.proto -I./ion --plugin=protoc-gen-ts=/usr/local/bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/_library --ts_out=service=grpc-web:./src/_library
    protoc ./ion/apps/room/proto/room.proto -I./ion --plugin=protoc-gen-ts=/usr/local/bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/_library --ts_out=service=grpc-web:./src/_library
    # Copy the generated sources into lib/ as well.
    mkdir -p lib
    cp -rf ./src/_library lib

    可以看到它主要是用了3个proto文件

    ./ion/proto/ion/ion.proto

    ./ion/proto/rtc/rtc.proto

    ./ion/apps/room/proto/room.proto

    生成的代码位于ion-sdk-js/src/_library/proto/和ion-sdk-js/src/_library/apps/room/proto/这两个路径下面。

    我们先看下room对应的proto文件ion/apps/room/proto/room.proto

      syntax = "proto3";


      option go_package = "github.com/pion/ion/apps/room/proto";


      package room;


      // RoomService exposes unary RPCs for managing rooms and the peers in them.
      // Every RPC takes a dedicated *Request message and returns the matching
      // *Reply message.
      service RoomService {
      // Manager API
      // Room API: create, update, end and list rooms.
      rpc CreateRoom(CreateRoomRequest) returns (CreateRoomReply) {}
      rpc UpdateRoom(UpdateRoomRequest) returns (UpdateRoomReply) {}
      rpc EndRoom(EndRoomRequest) returns (EndRoomReply) {}
      rpc GetRooms(GetRoomsRequest) returns (GetRoomsReply) {}


      // Peer API: add, update, remove and list peers.
      rpc AddPeer(AddPeerRequest) returns (AddPeerReply) {}
      rpc UpdatePeer(UpdatePeerRequest) returns (UpdatePeerReply) {}
      rpc RemovePeer(RemovePeerRequest) returns (RemovePeerReply) {}
      rpc GetPeers(GetPeersRequest) returns (GetPeersReply) {}
      }


      // RoomSignal carries room signalling over one bidirectional stream:
      // the client streams Request messages (join / leave / sendMessage) and the
      // server streams Reply messages (replies plus peer, message, disconnect and
      // room payloads).
      service RoomSignal {
      // Signal
      rpc Signal(stream Request) returns (stream Reply) {}
      }


      enum ErrorType {
      None = 0;
      UnkownError = 1;
      PermissionDenied = 2;
      ServiceUnavailable = 3;
      RoomLocked = 4;
      PasswordRequired = 5;
      RoomAlreadyExist = 6;
      RoomNotExist = 7;
      InvalidParams = 8;
      PeerAlreadyExist = 9;
      PeerNotExist = 10;
      }


      message Error {
      ErrorType code = 1;
      string reason = 2;
      }


      message Request {
      oneof payload {
      JoinRequest join = 1;
      LeaveRequest leave = 2;
      SendMessageRequest sendMessage = 3;
      }
      }


      message Reply {
      oneof payload {
      JoinReply join = 1;
      LeaveReply leave = 2;
      SendMessageReply sendMessage = 3;
      PeerEvent Peer = 4;
      Message message = 5;
      Disconnect disconnect = 6;
      Room room = 7;
      }
      }


      message CreateRoomRequest {
      Room room = 1;
      }


      message CreateRoomReply {
      bool success = 1;
      Error error = 2;
      }


      message DeleteRoomRequest {
      string sid = 1;
      }


      message DeleteRoomReply {
      bool success = 1;
      Error error = 2;
      }


      message JoinRequest {
      Peer peer = 1;
      string password = 2;
      }


      message Room {
      string sid = 1;
      string name = 2;
      bool lock = 3;
      string password = 4;
      string description = 5;
      uint32 maxPeers = 6;
      }


      message JoinReply {
      bool success = 1;
      Error error = 2;
      Role role = 3;
      Room room = 4;
      }


      message LeaveRequest {
      string sid = 1;
      string uid = 2;
      }


      message LeaveReply {
      bool success = 1;
      Error error = 2;
      }


      enum Role {
      Host = 0;
      Guest = 1;
      }


      enum Protocol {
      ProtocolUnknown = 0;
      WebRTC = 1;
      SIP = 2;
      RTMP = 3;
      RTSP = 4;
      }


      // Peer is the peer record carried by JoinRequest, AddPeerRequest,
      // UpdatePeerRequest, GetPeersReply and PeerEvent.
      message Peer {
        // Direction values a peer can be tagged with.
        enum Direction {
          INCOMING = 0;
          OUTGOING = 1;
          BILATERAL = 2;
        }
        string sid = 1;
        string uid = 2;
        string displayName = 3;
        bytes extraInfo = 4;
        string destination = 5; // rtsp/rtmp/sip url
        Role role = 6;
        Protocol protocol = 7;
        string avatar = 8;
        Direction direction = 9;
        string vendor = 10;
      }


      message AddPeerRequest {
      Peer peer = 1;
      }


      message AddPeerReply {
      bool success = 1;
      Error error = 2;
      }


      message GetPeersRequest { string sid = 1; }


      message GetPeersReply {
      bool success = 1;
      Error error = 2;
      repeated Peer Peers = 3;
      }


      // Message is the message body carried by SendMessageRequest and by the
      // `message` payload of Reply.
      message Message {
        string from = 1;   // UUID of the sending Peer.
        string to = 2;     // UUID of the receiving Peer.
        string type = 3;   // MIME content-type of the message, usually text/plain.
        bytes payload = 4; // Payload message contents.
      }


      message SendMessageRequest {
      string sid = 1;
      Message message = 2;
      }


      message SendMessageReply {
      bool success = 1;
      Error error = 2;
      }


      message Disconnect {
      string sid = 1;
      string reason = 2;
      }


      enum PeerState {
      JOIN = 0;
      UPDATE = 1;
      LEAVE = 2;
      }


      message PeerEvent {
      Peer Peer = 1;
      PeerState state = 2;
      }


      message UpdateRoomRequest {
      Room room = 1;
      }


      message UpdateRoomReply {
      bool success = 1;
      Error error = 2;
      }


      message EndRoomRequest {
      string sid = 1;
      string reason = 2;
      bool delete = 3;
      }


      message EndRoomReply {
      bool success = 1;
      Error error = 2;
      }


      message GetRoomsRequest {


      }


      message GetRoomsReply {
      bool success = 1;
      Error error = 2;
      repeated Room rooms = 3;
      }


      message UpdatePeerRequest {
      Peer peer = 1;
      }


      message UpdatePeerReply {
      bool success = 1;
      Error error = 2;
      }


      message RemovePeerRequest {
      string sid = 1;
      string uid = 2;
      }


      message RemovePeerReply {
      bool success = 1;
      Error error = 2;
      }


      可以看到主要包含了房间的增删改查和peer的增删改查。其他两个类似。

      client封装

              查看client封装了哪些接口,我们从webpack.config.js这个文件开始,它包含了4个入口

        ./src/index.ts
        ./src/connector/index.ts
        ./src/signal/grpc-web-impl.ts
        ./src/signal/json-rpc-impl.ts

        其中信号处理封装了grpc和json-rpc两个版本的实现。

        ./src/index.ts文件很简单,它引入了client、stream和signal,并将它们重新导出。

          import Client from './client';
          import { LocalStream, RemoteStream, Constraints, Layer } from './stream';
          import { Signal, Trickle } from './signal';
          export { Client, LocalStream, RemoteStream, Constraints, Signal, Trickle, Layer };

          在ion-sdk-js/src/client.ts文件里定义了Client类,它包含了我们需要的大部分函数

            /**
             * Session client built on top of a `Signal`.
             *
             * `join()` creates one `Transport` per `Role`: the pub transport sends
             * offers through the signal, and the sub transport answers the
             * descriptions delivered through `signal.onnegotiate`. ICE candidates
             * arriving through `signal.ontrickle` are applied to the targeted
             * transport, or buffered until that transport has a remote description.
             */
            export default class Client {
              /** Pub/sub transports; assigned by `join()` and removed by `leave()`. */
              transports?: Transports<Role, Transport>;
              private config: Configuration;
              private signal: Signal;

              /** Called for every track received on the sub transport. */
              ontrack?: (track: MediaStreamTrack, stream: RemoteStream) => void;
              /** Called for sub-transport data channels other than the API channel. */
              ondatachannel?: (ev: RTCDataChannelEvent) => void;
              /** Called with the params of `audioLevels` (and legacy) API-channel messages. */
              onspeaker?: (ev: string[]) => void;
              /** Called when negotiation of the given role fails. */
              onerrnegotiate?: (
                role: Role,
                err: Error,
                offer?: RTCSessionDescriptionInit,
                answer?: RTCSessionDescriptionInit,
              ) => void;
              /** Called with the params of `activeLayer` API-channel messages. */
              onactivelayer?: (al: ActiveLayer) => void;

              /**
               * @param signal Signalling channel; its `onnegotiate` and `ontrickle`
               *   handlers are taken over by this client.
               * @param config Transport configuration; defaults to the vp8 codec and
               *   two public Google STUN servers.
               */
              constructor(
                signal: Signal,
                config: Configuration = {
                  codec: 'vp8',
                  iceServers: [
                    {
                      urls: ['stun:stun.l.google.com:19302', 'stun:stun1.l.google.com:19302'],
                    },
                  ],
                },
              ) {
                this.signal = signal;
                this.config = config;

                signal.onnegotiate = this.negotiate.bind(this);
                signal.ontrickle = this.trickle.bind(this);
              }

              /**
               * Creates the pub/sub transports and performs the initial pub offer/answer
               * exchange through `signal.join`.
               *
               * @returns A promise that resolves once the data channel labelled
               *   `API_CHANNEL` has been announced on the sub transport.
               */
              async join(sid: string, uid: string) {
                this.transports = {
                  [Role.pub]: new Transport(Role.pub, this.signal, this.config),
                  [Role.sub]: new Transport(Role.sub, this.signal, this.config),
                };

                this.transports[Role.sub].pc.ontrack = (ev: RTCTrackEvent) => {
                  const stream = ev.streams[0];
                  const remote = makeRemote(stream, this.transports![Role.sub]);

                  if (this.ontrack) {
                    this.ontrack(ev.track, remote);
                  }
                };

                const apiReady = new Promise<void>((resolve) => {
                  this.transports![Role.sub].pc.ondatachannel = (ev: RTCDataChannelEvent) => {
                    if (ev.channel.label === API_CHANNEL) {
                      // The API channel is shared by both transports and is not
                      // forwarded to the user's `ondatachannel` handler.
                      this.transports![Role.sub].api = ev.channel;
                      this.transports![Role.pub].api = ev.channel;
                      ev.channel.onmessage = (e) => {
                        try {
                          const msg = JSON.parse(e.data);
                          this.processChannelMessage(msg);
                        } catch (err) {
                          /* tslint:disable-next-line:no-console */
                          console.error(err);
                        }
                      };
                      resolve();
                      return;
                    }

                    if (this.ondatachannel) {
                      this.ondatachannel(ev);
                    }
                  };
                });

                const offer = await this.transports[Role.pub].pc.createOffer();
                await this.transports[Role.pub].pc.setLocalDescription(offer);
                const answer = await this.signal.join(sid, uid, offer);
                await this.transports[Role.pub].pc.setRemoteDescription(answer);
                // Apply candidates that arrived before the remote description was set,
                // then drop them from the buffer (same handling as in `negotiate`).
                this.flushCandidates(Role.pub);
                this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);

                return apiReady;
              }

              /** Closes both peer connections and forgets the transports. */
              leave() {
                if (this.transports) {
                  Object.values(this.transports).forEach((t) => t.pc.close());
                  delete this.transports;
                }
              }

              /**
               * Returns the stats of the pub peer connection.
               * @throws Error(ERR_NO_SESSION) when there are no transports.
               */
              getPubStats(selector?: MediaStreamTrack) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }
                return this.transports[Role.pub].pc.getStats(selector);
              }

              /**
               * Returns the stats of the sub peer connection.
               * @throws Error(ERR_NO_SESSION) when there are no transports.
               */
              getSubStats(selector?: MediaStreamTrack) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }
                return this.transports[Role.sub].pc.getStats(selector);
              }

              /**
               * Publishes a local stream on the pub transport.
               * @throws Error(ERR_NO_SESSION) when there are no transports.
               */
              publish(stream: LocalStream, encodingParams?: RTCRtpEncodingParameters[]) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }
                stream.publish(this.transports[Role.pub], encodingParams);
              }

              /**
               * Renegotiates the pub transport with `iceRestart: true`.
               *
               * @returns The renegotiation promise, so callers can await completion and
               *   observe the `ERR_NO_SESSION` rejection instead of it going unhandled.
               */
              restartIce(): Promise<void> {
                return this.renegotiate(true);
              }

              /**
               * Creates a data channel on the pub peer connection.
               * @throws Error(ERR_NO_SESSION) when there are no transports.
               */
              createDataChannel(label: string) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }
                return this.transports[Role.pub].pc.createDataChannel(label);
              }

              /** Closes both peer connections (if any) and then the signal. */
              close() {
                if (this.transports) {
                  Object.values(this.transports).forEach((t) => t.pc.close());
                }
                this.signal.close();
              }

              /** Normalizes a caught value to an `Error` for `onerrnegotiate`. */
              private static toError(err: unknown): Error {
                return err instanceof Error ? err : new Error(String(err));
              }

              /** Adds one ICE candidate and logs (rather than drops) a rejection. */
              private addCandidate(pc: RTCPeerConnection, candidate: RTCIceCandidateInit) {
                pc.addIceCandidate(candidate).catch((err) => {
                  /* tslint:disable-next-line:no-console */
                  console.error(err);
                });
              }

              /** Applies and then clears the candidates buffered for the given role. */
              private flushCandidates(role: Role) {
                if (!this.transports) {
                  return;
                }
                const transport = this.transports[role];
                transport.candidates.forEach((c) => this.addCandidate(transport.pc, c));
                transport.candidates = [];
              }

              /**
               * Signal handler for remote ICE candidates: applies the candidate if the
               * targeted transport already has a remote description, otherwise buffers it.
               */
              private trickle({ candidate, target }: Trickle) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }
                const transport = this.transports[target];
                if (transport.pc.remoteDescription) {
                  this.addCandidate(transport.pc, candidate);
                } else {
                  transport.candidates.push(candidate);
                }
              }

              /**
               * Signal handler for remote descriptions: applies the description to the
               * sub transport, flushes its buffered candidates and sends back the answer.
               * Failures are logged and reported through `onerrnegotiate`.
               */
              private async negotiate(description: RTCSessionDescriptionInit) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }

                let answer: RTCSessionDescriptionInit | undefined;
                try {
                  await this.transports[Role.sub].pc.setRemoteDescription(description);
                  this.flushCandidates(Role.sub);
                  answer = await this.transports[Role.sub].pc.createAnswer();
                  await this.transports[Role.sub].pc.setLocalDescription(answer);
                  this.signal.answer(answer);
                } catch (err) {
                  /* tslint:disable-next-line:no-console */
                  console.error(err);
                  if (this.onerrnegotiate) {
                    this.onerrnegotiate(Role.sub, Client.toError(err), description, answer);
                  }
                }
              }

              /** `onnegotiationneeded` handler of the pub peer connection. */
              private onNegotiationNeeded() {
                // Event handlers cannot await, so log the rejection explicitly.
                this.renegotiate(false).catch((err) => {
                  /* tslint:disable-next-line:no-console */
                  console.error(err);
                });
              }

              /**
               * Runs an offer/answer exchange for the pub transport through
               * `signal.offer`. Failures inside the exchange are logged and reported
               * through `onerrnegotiate`; a missing session rejects with ERR_NO_SESSION.
               */
              private async renegotiate(iceRestart: boolean) {
                if (!this.transports) {
                  throw Error(ERR_NO_SESSION);
                }

                let offer: RTCSessionDescriptionInit | undefined;
                let answer: RTCSessionDescriptionInit | undefined;
                try {
                  offer = await this.transports[Role.pub].pc.createOffer({ iceRestart });
                  await this.transports[Role.pub].pc.setLocalDescription(offer);
                  answer = await this.signal.offer(offer);
                  await this.transports[Role.pub].pc.setRemoteDescription(answer);
                } catch (err) {
                  /* tslint:disable-next-line:no-console */
                  console.error(err);
                  if (this.onerrnegotiate) {
                    this.onerrnegotiate(Role.pub, Client.toError(err), offer, answer);
                  }
                }
              }

              /**
               * Dispatches a parsed API-channel message. Messages with `method` and
               * `params` are routed by method name; anything else is treated as a
               * legacy audio-levels payload.
               */
              private processChannelMessage(msg: any) {
                if (msg.method !== undefined && msg.params !== undefined) {
                  switch (msg.method) {
                    case 'audioLevels':
                      if (this.onspeaker) {
                        this.onspeaker(msg.params);
                      }
                      break;
                    case 'activeLayer':
                      if (this.onactivelayer) {
                        this.onactivelayer(msg.params);
                      }
                      break;
                    default:
                    // do nothing
                  }
                } else {
                  // legacy channel message - payload contains audio levels
                  if (this.onspeaker) {
                    this.onspeaker(msg);
                  }
                }
              }
            }

            在ion-sdk-js/src/connector/index.ts里面包装了后端的各种连接方式,包括ion、room和rtc。

              import { Configuration } from '../client';
              import { LocalStream, RemoteStream, Constraints } from '../stream';
              import { Connector, Service } from './ion';
              import { RTC, Subscription } from './rtc';
              import { Room, JoinResult, Peer, PeerState, PeerEvent, Role, Protocol, Direction, Message, RoomInfo, Disconnect } from './room';


              export {
              Configuration,
              LocalStream,
              RemoteStream,
              Constraints,
              Connector,
              Service,
              RTC,
              Room,
              JoinResult,
              Peer,
              PeerState,
              PeerEvent,
              Role,
              Protocol,
              Direction,
              Message,
              RoomInfo,
              Disconnect,
              Subscription
              };


              在ion-sdk-js/src/signal/index.ts里定义了Signal接口,同一文件夹下提供了grpc和json-rpc两个实现

                import { Trickle } from '../client';
                export { Trickle };


                /**
                 * Signalling channel contract used by `Client`; per the accompanying
                 * text, gRPC and JSON-RPC implementations live in the same folder.
                 */
                export interface Signal {
                /** Handler for remote session descriptions; `Client` installs its `negotiate` method here. */
                onnegotiate?: (jsep: RTCSessionDescriptionInit) => void;
                /** Handler for remote ICE candidates; `Client` installs its `trickle` method here. */
                ontrickle?: (trickle: Trickle) => void;


                /**
                 * Sends the initial offer for a join; `Client.join` applies the resolved
                 * description as the pub transport's remote description.
                 * NOTE(review): `sid`/`uid` are presumably session and user ids — confirm.
                 */
                join(sid: string, uid: null | string, offer: RTCSessionDescriptionInit): Promise<RTCSessionDescriptionInit>;
                /** Sends a renegotiation offer; the resolved description is applied as the pub remote description. */
                offer(offer: RTCSessionDescriptionInit): Promise<RTCSessionDescriptionInit>;
                /** Sends the sub transport's answer to a description received via `onnegotiate`. */
                answer(answer: RTCSessionDescriptionInit): void;
                /** NOTE(review): presumably sends a local ICE candidate; not called in the code shown here. */
                trickle(trickle: Trickle): void;
                /** Closes the channel; called from `Client.close()`. */
                close(): void;
                }


                总的来说,整个sdk还是比较轻量的,仅仅包含了对proto文件生成的ts的一个简单的包装。


                文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论