From 6b63cd5b311876784db45904ec4c2a51064eeb52 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 27 Apr 2021 22:02:59 -0700 Subject: [PATCH] Revamp data channels (#32) * support for built-in data channels. active speaker updates via data channel * fixed tests * update to Pion 3.0.27 --- go.mod | 12 +- go.sum | 12 + magefile.go | 7 +- pkg/rtc/errors.go | 2 +- pkg/rtc/helper_test.go | 3 +- pkg/rtc/participant.go | 121 ++++-- pkg/rtc/room.go | 32 +- pkg/rtc/room_test.go | 83 +++-- pkg/rtc/types/interfaces.go | 2 + pkg/rtc/types/protocol_version.go | 6 + pkg/rtc/types/typesfakes/fake_participant.go | 113 ++++++ pkg/rtc/utils.go | 2 +- proto/livekit_models.pb.go | 1 + proto/livekit_rtc.pb.go | 371 +++++++++++++++---- version/version.go | 2 +- 15 files changed, 634 insertions(+), 135 deletions(-) diff --git a/go.mod b/go.mod index d4a02bab0..429d27c7a 100644 --- a/go.mod +++ b/go.mod @@ -11,19 +11,19 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.4.0 + github.com/livekit/protocol v0.5.0 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/pion/ice/v2 v2.1.6 + github.com/pion/ice/v2 v2.1.7 github.com/pion/ion-sfu v1.9.7 github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.6 - github.com/pion/rtp v1.6.2 + github.com/pion/rtp v1.6.5 github.com/pion/sdp/v3 v3.0.4 github.com/pion/stun v0.3.5 github.com/pion/turn/v2 v2.0.5 - github.com/pion/webrtc/v3 v3.0.23 + github.com/pion/webrtc/v3 v3.0.27 github.com/pkg/errors v0.9.1 github.com/prometheus/common v0.19.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect @@ -37,8 +37,8 @@ require ( go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b // indirect golang.org/x/mod v0.4.2 // indirect - golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 // indirect - golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect + golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a // indirect + golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 // indirect golang.org/x/tools v0.1.0 // indirect google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b diff --git a/go.sum b/go.sum index 92c280dda..00984d788 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/protocol v0.4.0 h1:s+dkTG2gVTD8bsM3QEqPn9JZgO/yukcMWDAA0xkZkDU= github.com/livekit/protocol v0.4.0/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= +github.com/livekit/protocol v0.5.0 h1:dcehG+icwEkHCutjb3erSpaElNa86OLVL0KKZbiajKo= +github.com/livekit/protocol v0.5.0/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -322,6 +324,8 @@ github.com/pion/ice/v2 v2.0.15/go.mod h1:ZIiVGevpgAxF/cXiIVmuIUtCb3Xs4gCzCbXB6+n github.com/pion/ice/v2 v2.1.4/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0= github.com/pion/ice/v2 v2.1.6 h1:PyqyUY9dTAjTrUmCq9qY6Yy3FagZlh5HQIVr27cwF6g= github.com/pion/ice/v2 v2.1.6/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0= +github.com/pion/ice/v2 v2.1.7 h1:FjgDfUNrVYTxQabJrkBX6ld12tvYbgzHenqPh3PJF6E= +github.com/pion/ice/v2 v2.1.7/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0= github.com/pion/interceptor v0.0.9/go.mod h1:dHgEP5dtxOTf21MObuBAjJeAayPxLUAZjerGH8Xr07c= github.com/pion/interceptor v0.0.12 h1:eC1iVneBIAQJEfaNAfDqAncJWhMDAnaXPRCJsltdokE= github.com/pion/interceptor v0.0.12/go.mod h1:qzeuWuD/ZXvPqOnxNcnhWfkCZ2e1kwwslicyyPnhoK4= @@ -340,6 +344,8 @@ github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo= github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI= +github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.11/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.12 h1:GsatLufywVruXbZZT1CKg+Jr8ZTkwiPnmUC/oO9+uuY= @@ -366,6 +372,8 @@ github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M github.com/pion/webrtc/v3 v3.0.10/go.mod h1:KdEZWLmBnxB2Qj4FtUb9vi1sIpqsHOisI7L6ggQBD0A= github.com/pion/webrtc/v3 v3.0.23 h1:Kp9uSekT03j9tdug3Zj3h5CYXwkvksLomiFM4yihyk4= github.com/pion/webrtc/v3 v3.0.23/go.mod h1:3rjilTvGQsmLzWgIVuHm8F3YbmV1Nff9EP6bwFxGwoA= +github.com/pion/webrtc/v3 v3.0.27 h1:cPQEFNFrRSMT11j9c9aTmXzL3ikKAFPE2kR0ZrQcviw= +github.com/pion/webrtc/v3 v3.0.27/go.mod h1:QpLDmsU5a/a05n230gRtxZRvfHhFzn9ukGUL2x4G5ic= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -574,6 +582,8 @@ golang.org/x/net v0.0.0-20210421230115-4e50805a0758 h1:aEpZnXcAmXkd6AvLb2OPt+EN1 golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 h1:0PC75Fz/kyMGhL0e1QnypqK2kQMqKt9csD1GnMJR+Zk= golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a h1:njMmldwFTyDLqonHMagNXKBWptTBeDZOdblgaDsNEGQ= +golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -628,6 +638,8 @@ golang.org/x/sys v0.0.0-20210421221651-33663a62ff08/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 h1:dXfMednGJh/SUUFjTLsWJz3P+TQt9qnR11GgeI3vWKs= +golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/magefile.go b/magefile.go index 56600ff1e..59e365b20 100644 --- a/magefile.go +++ b/magefile.go @@ -161,14 +161,15 @@ func PublishDocker() error { return err } - latestImg := imageName + ":latest" - cmd = exec.Command("docker", "tag", versionImg, latestImg) + idx := strings.LastIndex(version.Version, ".") + minorImg := fmt.Sprintf("%s:v%s", imageName, version.Version[:idx]) + cmd = exec.Command("docker", "tag", versionImg, minorImg) connectStd(cmd) if err := cmd.Run(); err != nil { return err } - cmd = exec.Command("docker", "push", latestImg) + cmd = exec.Command("docker", "push", minorImg) connectStd(cmd) if err := cmd.Run(); err != nil { return err diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index b1e704568..c6092a436 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -8,5 +8,5 @@ var ( ErrMaxParticipantsExceeded = errors.New("room has exceeded its max participants") ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room") ErrUnexpectedOffer = errors.New("expected answer SDP, received offer") - ErrUnexpectedNegotiation = errors.New("client negotiation has not been granted") + ErrDataChannelUnavailable = errors.New("data channel is not available") ) diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index 15e89542a..802e025ab 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -7,11 +7,12 @@ import ( "github.com/livekit/protocol/utils" ) -func newMockParticipant(identity string) *typesfakes.FakeParticipant { +func newMockParticipant(identity string, protocol types.ProtocolVersion) *typesfakes.FakeParticipant { p := &typesfakes.FakeParticipant{} p.IDReturns(utils.NewGuid(utils.ParticipantPrefix)) p.IdentityReturns(identity) p.StateReturns(livekit.ParticipantInfo_JOINED) + p.ProtocolVersionReturns(protocol) p.SetMetadataStub = func(m string) { var f func(participant types.Participant) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7ab1d9741..996fc16fa 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -11,6 +11,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pkg/errors" + "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" @@ -22,8 +23,10 @@ import ( ) const ( - placeholderDataChannel = "_private" - sdBatchSize = 15 + lossyDataChannel = "_lossy" + reliableDataChannel = "_reliable" + privateDataChannel = "_private" + sdBatchSize = 15 ) type ParticipantImpl struct { @@ -39,6 +42,9 @@ type ParticipantImpl struct { state atomic.Value // livekit.ParticipantInfo_State rtcpCh chan []rtcp.Packet protocolVersion types.ProtocolVersion + // reliable and unreliable data channels + reliableDC *webrtc.DataChannel + lossyDC *webrtc.DataChannel // when first connected connectedAt time.Time @@ -64,6 +70,7 @@ type ParticipantImpl struct { onTrackUpdated func(types.Participant, types.PublishedTrack) onStateChange func(p types.Participant, oldState livekit.ParticipantInfo_State) onMetadataUpdate func(types.Participant) + onDataPacket func(types.Participant, *livekit.DataPacket) onClose func(types.Participant) } @@ -110,9 +117,9 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, }) p.publisher.pc.OnICEConnectionStateChange(p.handlePublisherICEStateChange) - p.publisher.pc.OnTrack(p.onMediaTrack) - p.subscriber.pc.OnDataChannel(p.onDataChannel) + p.publisher.pc.OnDataChannel(p.onDataChannel) + p.subscriber.OnOffer(p.onOffer) return p, nil @@ -206,6 +213,10 @@ func (p *ParticipantImpl) OnMetadataUpdate(callback func(types.Participant)) { p.onMetadataUpdate = callback } +func (p *ParticipantImpl) OnDataPacket(callback func(types.Participant, *livekit.DataPacket)) { + p.onDataPacket = callback +} + func (p *ParticipantImpl) OnClose(callback func(types.Participant)) { p.onClose = callback } @@ -285,8 +296,8 @@ func (p *ParticipantImpl) GetPublishedTracks() []types.PublishedTrack { return tracks } -// handles a client answer response, with subscriber PC, server initiates the offer -// and client answers +// HandleAnswer handles a client answer response, with subscriber PC, server initiates the +// offer and client answers func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { if sdp.Type != webrtc.SDPTypeAnswer { return ErrUnexpectedOffer @@ -454,6 +465,30 @@ func (p *ParticipantImpl) SendActiveSpeakers(speakers []*livekit.SpeakerInfo) er }) } +func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket) error { + if p.State() != livekit.ParticipantInfo_ACTIVE { + return ErrDataChannelUnavailable + } + + data, err := proto.Marshal(dp) + if err != nil { + return err + } + if dp.Kind == livekit.DataPacket_RELIABLE { + if p.reliableDC == nil { + return ErrDataChannelUnavailable + } + logger.Debugw("sending reliable packet", "participant", p.Identity()) + return p.reliableDC.Send(data) + } else { + if p.lossyDC == nil { + return ErrDataChannelUnavailable + } + logger.Debugw("sending lossy packet", "participant", p.Identity()) + return p.lossyDC.Send(data) + } +} + func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) { p.lock.RLock() defer p.lock.RUnlock() @@ -659,27 +694,39 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w } func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) { - if dc.Label() == placeholderDataChannel { - return + switch dc.Label() { + case reliableDataChannel: + p.reliableDC = dc + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + p.handleDataMessage(livekit.DataPacket_RELIABLE, msg.Data) + }) + case lossyDataChannel: + p.lossyDC = dc + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + p.handleDataMessage(livekit.DataPacket_LOSSY, msg.Data) + }) + case privateDataChannel: + // ignore + default: + logger.Debugw("dataChannel added", "participant", p.Identity(), "label", dc.Label()) + + if !p.CanPublish() { + logger.Warnw("no permission to publish dataTrack", + "participant", p.Identity()) + return + } + + // data channels have numeric ids, so we use its label to identify + ti := p.getPendingTrack(dc.Label(), livekit.TrackType_DATA, true) + if ti == nil { + return + } + + dt := NewDataTrack(ti.Sid, p.id, dc) + dt.name = ti.Name + + p.handleTrackPublished(dt) } - logger.Debugw("dataChannel added", "participant", p.Identity(), "label", dc.Label()) - - if !p.CanPublish() { - logger.Warnw("no permission to publish dataTrack", - "participant", p.Identity()) - return - } - - // data channels have numeric ids, so we use its label to identify - ti := p.getPendingTrack(dc.Label(), livekit.TrackType_DATA, true) - if ti == nil { - return - } - - dt := NewDataTrack(ti.Sid, p.id, dc) - dt.name = ti.Name - - p.handleTrackPublished(dt) } func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackType, deleteAfter bool) *livekit.TrackInfo { @@ -708,6 +755,28 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp return ti } +func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data []byte) { + dp := livekit.DataPacket{} + if err := proto.Unmarshal(data, &dp); err != nil { + logger.Warnw("could not parse data packet", "error", err) + return + } + + // trust the channel that it came in as the source of truth + dp.Kind = kind + + // only forward on user payloads + switch payload := dp.Value.(type) { + case *livekit.DataPacket_User: + if p.onDataPacket != nil { + payload.User.ParticipantSid = p.id + p.onDataPacket(p, &dp) + } + default: + logger.Warnw("received unsupported data packet", "payload", payload) + } +} + func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) { // fill in p.lock.Lock() diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 47d3e46f8..5f1fb02fc 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -153,6 +153,7 @@ func (r *Room) Join(participant types.Participant) error { }) participant.OnTrackUpdated(r.onTrackUpdated) participant.OnMetadataUpdate(r.onParticipantMetadataUpdate) + participant.OnDataPacket(r.onDataPacket) logger.Infow("new participant joined", "id", participant.ID(), "identity", participant.Identity(), @@ -192,9 +193,11 @@ func (r *Room) RemoveParticipant(identity string) { p.OnTrackUpdated(nil) p.OnTrackPublished(nil) p.OnStateChange(nil) + p.OnMetadataUpdate(nil) + p.OnDataPacket(nil) // close participant as well - p.Close() + _ = p.Close() if len(r.participants) == 0 { r.leftAt.Store(time.Now().Unix()) @@ -328,6 +331,18 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) { } } +func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) { + for _, op := range r.GetParticipants() { + if op.State() != livekit.ParticipantInfo_ACTIVE { + continue + } + if op.ID() == source.ID() { + continue + } + _ = op.SendDataPacket(dp) + } +} + func (r *Room) subscribeToExistingTracks(p types.Participant) { tracksAdded := 0 for _, op := range r.GetParticipants() { @@ -369,8 +384,21 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) { } func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { + dp := &livekit.DataPacket{ + Kind: livekit.DataPacket_LOSSY, + Value: &livekit.DataPacket_Speaker{ + Speaker: &livekit.ActiveSpeakerUpdate{ + Speakers: speakers, + }, + }, + } + for _, p := range r.GetParticipants() { - _ = p.SendActiveSpeakers(speakers) + if p.ProtocolVersion().HandlesDataPackets() { + _ = p.SendDataPacket(dp) + } else { + _ = p.SendActiveSpeakers(speakers) + } } } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index bd35652ec..d4b471d92 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -27,20 +27,20 @@ func init() { func TestJoinedState(t *testing.T) { t.Run("new room should return joinedAt 0", func(t *testing.T) { - rm := newRoomWithParticipants(t, 0) + rm := newRoomWithParticipants(t, testRoomOpts{num: 0}) assert.Equal(t, int64(0), rm.FirstJoinedAt()) assert.Equal(t, int64(0), rm.LastLeftAt()) }) t.Run("should be current time when a participant joins", func(t *testing.T) { s := time.Now().Unix() - rm := newRoomWithParticipants(t, 1) + rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) assert.Equal(t, s, rm.FirstJoinedAt()) assert.Equal(t, int64(0), rm.LastLeftAt()) }) t.Run("should be set when a participant leaves", func(t *testing.T) { - rm := newRoomWithParticipants(t, 1) + rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) p0 := rm.GetParticipants()[0] s := time.Now().Unix() rm.RemoveParticipant(p0.Identity()) @@ -48,7 +48,7 @@ func TestJoinedState(t *testing.T) { }) t.Run("LastLeftAt should not be set when there are still participants in the room", func(t *testing.T) { - rm := newRoomWithParticipants(t, 2) + rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) p0 := rm.GetParticipants()[0] rm.RemoveParticipant(p0.Identity()) assert.EqualValues(t, 0, rm.LastLeftAt()) @@ -57,8 +57,8 @@ func TestJoinedState(t *testing.T) { func TestRoomJoin(t *testing.T) { t.Run("joining returns existing participant data", func(t *testing.T) { - rm := newRoomWithParticipants(t, numParticipants) - pNew := newMockParticipant("new") + rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) + pNew := newMockParticipant("new", types.DefaultProtocol) rm.Join(pNew) @@ -72,8 +72,8 @@ func TestRoomJoin(t *testing.T) { t.Run("subscribe to existing channels upon join", func(t *testing.T) { numExisting := 3 - rm := newRoomWithParticipants(t, numExisting) - p := newMockParticipant("new") + rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting}) + p := newMockParticipant("new", types.DefaultProtocol) err := rm.Join(p) assert.NoError(t, err) @@ -96,7 +96,7 @@ func TestRoomJoin(t *testing.T) { }) t.Run("participant state change is broadcasted to others", func(t *testing.T) { - rm := newRoomWithParticipants(t, numParticipants) + rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) var changedParticipant types.Participant rm.OnParticipantChanged(func(participant types.Participant) { changedParticipant = participant @@ -125,9 +125,9 @@ func TestRoomJoin(t *testing.T) { }) t.Run("cannot exceed max participants", func(t *testing.T) { - rm := newRoomWithParticipants(t, 1) + rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) rm.MaxParticipants = 1 - p := newMockParticipant("second") + p := newMockParticipant("second", types.ProtocolVersion(0)) err := rm.Join(p) assert.Equal(t, rtc.ErrMaxParticipantsExceeded, err) @@ -166,7 +166,7 @@ func TestParticipantUpdate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - rm := newRoomWithParticipants(t, 3) + rm := newRoomWithParticipants(t, testRoomOpts{num: 3}) // remember how many times send has been called for each callCounts := make(map[string]int) for _, p := range rm.GetParticipants() { @@ -192,7 +192,7 @@ func TestParticipantUpdate(t *testing.T) { func TestRoomClosure(t *testing.T) { t.Run("room closes after participant leaves", func(t *testing.T) { - rm := newRoomWithParticipants(t, 1) + rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) isClosed := false rm.OnClose(func() { isClosed = true @@ -212,7 +212,7 @@ func TestRoomClosure(t *testing.T) { }) t.Run("room does not close before empty timeout", func(t *testing.T) { - rm := newRoomWithParticipants(t, 0) + rm := newRoomWithParticipants(t, testRoomOpts{num: 0}) isClosed := false rm.OnClose(func() { isClosed = true @@ -223,7 +223,7 @@ func TestRoomClosure(t *testing.T) { }) t.Run("room closes after empty timeout", func(t *testing.T) { - rm := newRoomWithParticipants(t, 0) + rm := newRoomWithParticipants(t, testRoomOpts{num: 0}) isClosed := false rm.OnClose(func() { isClosed = true @@ -238,7 +238,7 @@ func TestRoomClosure(t *testing.T) { func TestNewTrack(t *testing.T) { t.Run("new track should be added to ready participants", func(t *testing.T) { - rm := newRoomWithParticipants(t, 3) + rm := newRoomWithParticipants(t, testRoomOpts{num: 3}) participants := rm.GetParticipants() p0 := participants[0].(*typesfakes.FakeParticipant) p0.StateReturns(livekit.ParticipantInfo_JOINED) @@ -260,19 +260,33 @@ func TestNewTrack(t *testing.T) { func TestActiveSpeakers(t *testing.T) { t.Parallel() + getActiveSpeakerUpdates := func(p *typesfakes.FakeParticipant) []*livekit.ActiveSpeakerUpdate { + var updates []*livekit.ActiveSpeakerUpdate + numCalls := p.SendDataPacketCallCount() + for i := 0; i < numCalls; i++ { + dp := p.SendDataPacketArgsForCall(i) + switch val := dp.Value.(type) { + case *livekit.DataPacket_Speaker: + updates = append(updates, val.Speaker) + } + } + return updates + } + audioUpdateDuration := (audioUpdateInterval + 2) * time.Millisecond - t.Run("participant should not be getting audio updates", func(t *testing.T) { - rm := newRoomWithParticipants(t, 1) + t.Run("participant should not be getting audio updates (protocol 2)", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 1, protocol: types.DefaultProtocol}) p := rm.GetParticipants()[0].(*typesfakes.FakeParticipant) assert.Empty(t, rm.GetActiveSpeakers()) time.Sleep(audioUpdateDuration) - assert.Zero(t, p.SendActiveSpeakersCallCount()) + updates := getActiveSpeakerUpdates(p) + assert.Empty(t, updates) }) - t.Run("speakers should be sorted by loudness", func(t *testing.T) { - rm := newRoomWithParticipants(t, 2) + t.Run("speakers should be sorted by loudness (protocol 0)", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeParticipant) p2 := participants[1].(*typesfakes.FakeParticipant) @@ -285,8 +299,8 @@ func TestActiveSpeakers(t *testing.T) { assert.Equal(t, p2.ID(), speakers[1].Sid) }) - t.Run("participants are getting audio updates", func(t *testing.T) { - rm := newRoomWithParticipants(t, 2) + t.Run("participants are getting audio updates (protocol 2)", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol}) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeParticipant) time.Sleep(time.Millisecond) // let the first update cycle run @@ -301,25 +315,33 @@ func TestActiveSpeakers(t *testing.T) { // everyone should've received updates for _, op := range participants { op := op.(*typesfakes.FakeParticipant) - require.Equal(t, 1, op.SendActiveSpeakersCallCount()) + updates := getActiveSpeakerUpdates(op) + require.Len(t, updates, 1) } // after another cycle, we are not getting any new updates since unchanged time.Sleep(audioUpdateDuration) for _, op := range participants { op := op.(*typesfakes.FakeParticipant) - require.Equal(t, 1, op.SendActiveSpeakersCallCount()) + updates := getActiveSpeakerUpdates(op) + require.Len(t, updates, 1) } // no longer speaking, send update with empty items p.GetAudioLevelReturns(127, false) time.Sleep(audioUpdateDuration) - require.Equal(t, 2, p.SendActiveSpeakersCallCount()) - require.Empty(t, p.SendActiveSpeakersArgsForCall(1)) + updates := getActiveSpeakerUpdates(p) + require.Len(t, updates, 2) + require.Empty(t, updates[1].Speakers) }) } -func newRoomWithParticipants(t *testing.T, num int) *rtc.Room { +type testRoomOpts struct { + num int + protocol types.ProtocolVersion +} + +func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { rm := rtc.NewRoom( &livekit.Room{Name: "room"}, rtc.WebRTCConfig{}, @@ -332,10 +354,11 @@ func newRoomWithParticipants(t *testing.T, num int) *rtc.Room { }, audioUpdateInterval, ) - for i := 0; i < num; i++ { + for i := 0; i < opts.num; i++ { identity := fmt.Sprintf("p%d", i) - participant := newMockParticipant(identity) + participant := newMockParticipant(identity, opts.protocol) err := rm.Join(participant) + participant.StateReturns(livekit.ParticipantInfo_ACTIVE) assert.NoError(t, err) } return rm diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 15feb6d74..204e6c425 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -49,6 +49,7 @@ type Participant interface { SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error SendParticipantUpdate(participants []*livekit.ParticipantInfo) error SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error + SendDataPacket(packet *livekit.DataPacket) error SetTrackMuted(trackId string, muted bool) GetAudioLevel() (level uint8, noisy bool) @@ -66,6 +67,7 @@ type Participant interface { // OnTrackUpdated - one of its publishedTracks changed in status OnTrackUpdated(callback func(Participant, PublishedTrack)) OnMetadataUpdate(callback func(Participant)) + OnDataPacket(callback func(Participant, *livekit.DataPacket)) OnClose(func(Participant)) // package methods diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index 5798c480a..6611a4c55 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -2,6 +2,12 @@ package types type ProtocolVersion int +const DefaultProtocol = 2 + func (v ProtocolVersion) SupportsPackedStreamId() bool { return v > 0 } + +func (v ProtocolVersion) HandlesDataPackets() bool { + return v > 1 +} diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index cb6f69ec6..363bc5f10 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -206,6 +206,11 @@ type FakeParticipant struct { onCloseArgsForCall []struct { arg1 func(types.Participant) } + OnDataPacketStub func(func(types.Participant, *livekit.DataPacket)) + onDataPacketMutex sync.RWMutex + onDataPacketArgsForCall []struct { + arg1 func(types.Participant, *livekit.DataPacket) + } OnMetadataUpdateStub func(func(types.Participant)) onMetadataUpdateMutex sync.RWMutex onMetadataUpdateArgsForCall []struct { @@ -268,6 +273,17 @@ type FakeParticipant struct { sendActiveSpeakersReturnsOnCall map[int]struct { result1 error } + SendDataPacketStub func(*livekit.DataPacket) error + sendDataPacketMutex sync.RWMutex + sendDataPacketArgsForCall []struct { + arg1 *livekit.DataPacket + } + sendDataPacketReturns struct { + result1 error + } + sendDataPacketReturnsOnCall map[int]struct { + result1 error + } SendJoinResponseStub func(*livekit.Room, []types.Participant, []*livekit.ICEServer) error sendJoinResponseMutex sync.RWMutex sendJoinResponseArgsForCall []struct { @@ -1374,6 +1390,38 @@ func (fake *FakeParticipant) OnCloseArgsForCall(i int) func(types.Participant) { return argsForCall.arg1 } +func (fake *FakeParticipant) OnDataPacket(arg1 func(types.Participant, *livekit.DataPacket)) { + fake.onDataPacketMutex.Lock() + fake.onDataPacketArgsForCall = append(fake.onDataPacketArgsForCall, struct { + arg1 func(types.Participant, *livekit.DataPacket) + }{arg1}) + stub := fake.OnDataPacketStub + fake.recordInvocation("OnDataPacket", []interface{}{arg1}) + fake.onDataPacketMutex.Unlock() + if stub != nil { + fake.OnDataPacketStub(arg1) + } +} + +func (fake *FakeParticipant) OnDataPacketCallCount() int { + fake.onDataPacketMutex.RLock() + defer fake.onDataPacketMutex.RUnlock() + return len(fake.onDataPacketArgsForCall) +} + +func (fake *FakeParticipant) OnDataPacketCalls(stub func(func(types.Participant, *livekit.DataPacket))) { + fake.onDataPacketMutex.Lock() + defer fake.onDataPacketMutex.Unlock() + fake.OnDataPacketStub = stub +} + +func (fake *FakeParticipant) OnDataPacketArgsForCall(i int) func(types.Participant, *livekit.DataPacket) { + fake.onDataPacketMutex.RLock() + defer fake.onDataPacketMutex.RUnlock() + argsForCall := fake.onDataPacketArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeParticipant) OnMetadataUpdate(arg1 func(types.Participant)) { fake.onMetadataUpdateMutex.Lock() fake.onMetadataUpdateArgsForCall = append(fake.onMetadataUpdateArgsForCall, struct { @@ -1739,6 +1787,67 @@ func (fake *FakeParticipant) SendActiveSpeakersReturnsOnCall(i int, result1 erro }{result1} } +func (fake *FakeParticipant) SendDataPacket(arg1 *livekit.DataPacket) error { + fake.sendDataPacketMutex.Lock() + ret, specificReturn := fake.sendDataPacketReturnsOnCall[len(fake.sendDataPacketArgsForCall)] + fake.sendDataPacketArgsForCall = append(fake.sendDataPacketArgsForCall, struct { + arg1 *livekit.DataPacket + }{arg1}) + stub := fake.SendDataPacketStub + fakeReturns := fake.sendDataPacketReturns + fake.recordInvocation("SendDataPacket", []interface{}{arg1}) + fake.sendDataPacketMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) SendDataPacketCallCount() int { + fake.sendDataPacketMutex.RLock() + defer fake.sendDataPacketMutex.RUnlock() + return len(fake.sendDataPacketArgsForCall) +} + +func (fake *FakeParticipant) SendDataPacketCalls(stub func(*livekit.DataPacket) error) { + fake.sendDataPacketMutex.Lock() + defer fake.sendDataPacketMutex.Unlock() + fake.SendDataPacketStub = stub +} + +func (fake *FakeParticipant) SendDataPacketArgsForCall(i int) *livekit.DataPacket { + fake.sendDataPacketMutex.RLock() + defer fake.sendDataPacketMutex.RUnlock() + argsForCall := fake.sendDataPacketArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeParticipant) SendDataPacketReturns(result1 error) { + fake.sendDataPacketMutex.Lock() + defer fake.sendDataPacketMutex.Unlock() + fake.SendDataPacketStub = nil + fake.sendDataPacketReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeParticipant) SendDataPacketReturnsOnCall(i int, result1 error) { + fake.sendDataPacketMutex.Lock() + defer fake.sendDataPacketMutex.Unlock() + fake.SendDataPacketStub = nil + if fake.sendDataPacketReturnsOnCall == nil { + fake.sendDataPacketReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendDataPacketReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []types.Participant, arg3 []*livekit.ICEServer) error { var arg2Copy []types.Participant if arg2 != nil { @@ -2286,6 +2395,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.negotiateMutex.RUnlock() fake.onCloseMutex.RLock() defer fake.onCloseMutex.RUnlock() + fake.onDataPacketMutex.RLock() + defer fake.onDataPacketMutex.RUnlock() fake.onMetadataUpdateMutex.RLock() defer fake.onMetadataUpdateMutex.RUnlock() fake.onStateChangeMutex.RLock() @@ -2304,6 +2415,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.removeSubscriberMutex.RUnlock() fake.sendActiveSpeakersMutex.RLock() defer fake.sendActiveSpeakersMutex.RUnlock() + fake.sendDataPacketMutex.RLock() + defer fake.sendDataPacketMutex.RUnlock() fake.sendJoinResponseMutex.RLock() defer fake.sendJoinResponseMutex.RUnlock() fake.sendParticipantUpdateMutex.RLock() diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index 798ba9ce8..b5d195e2b 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -17,7 +17,7 @@ const ( trackIdSeparator = "|" ) -func UnpackStreamID(packed string) (peerId string, trackId string) { +func UnpackStreamID(packed string) (participantId string, trackId string) { parts := strings.Split(packed, trackIdSeparator) if len(parts) > 1 { return parts[0], packed[len(parts[0])+1:] diff --git a/proto/livekit_models.pb.go b/proto/livekit_models.pb.go index b9172b0bf..5cbd3c246 100644 --- a/proto/livekit_models.pb.go +++ b/proto/livekit_models.pb.go @@ -371,6 +371,7 @@ func (x *TrackInfo) GetMuted() bool { return false } +// old DataTrack message type DataMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/proto/livekit_rtc.pb.go b/proto/livekit_rtc.pb.go index ee1c4e265..827e0310b 100644 --- a/proto/livekit_rtc.pb.go +++ b/proto/livekit_rtc.pb.go @@ -115,6 +115,52 @@ func (VideoQuality) EnumDescriptor() ([]byte, []int) { return file_livekit_rtc_proto_rawDescGZIP(), []int{1} } +type DataPacket_Kind int32 + +const ( + DataPacket_RELIABLE DataPacket_Kind = 0 + DataPacket_LOSSY DataPacket_Kind = 1 +) + +// Enum value maps for DataPacket_Kind. +var ( + DataPacket_Kind_name = map[int32]string{ + 0: "RELIABLE", + 1: "LOSSY", + } + DataPacket_Kind_value = map[string]int32{ + "RELIABLE": 0, + "LOSSY": 1, + } +) + +func (x DataPacket_Kind) Enum() *DataPacket_Kind { + p := new(DataPacket_Kind) + *p = x + return p +} + +func (x DataPacket_Kind) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (DataPacket_Kind) Descriptor() protoreflect.EnumDescriptor { + return file_livekit_rtc_proto_enumTypes[2].Descriptor() +} + +func (DataPacket_Kind) Type() protoreflect.EnumType { + return &file_livekit_rtc_proto_enumTypes[2] +} + +func (x DataPacket_Kind) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use DataPacket_Kind.Descriptor instead. +func (DataPacket_Kind) EnumDescriptor() ([]byte, []int) { + return file_livekit_rtc_proto_rawDescGZIP(), []int{16, 0} +} + type SignalRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1244,6 +1290,152 @@ func (x *ICEServer) GetCredential() string { return "" } +// new DataPacket API +type DataPacket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Kind DataPacket_Kind `protobuf:"varint,1,opt,name=kind,proto3,enum=livekit.DataPacket_Kind" json:"kind,omitempty"` + // Types that are assignable to Value: + // *DataPacket_User + // *DataPacket_Speaker + Value isDataPacket_Value `protobuf_oneof:"value"` +} + +func (x *DataPacket) Reset() { + *x = DataPacket{} + if protoimpl.UnsafeEnabled { + mi := &file_livekit_rtc_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DataPacket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DataPacket) ProtoMessage() {} + +func (x *DataPacket) ProtoReflect() protoreflect.Message { + mi := &file_livekit_rtc_proto_msgTypes[16] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DataPacket.ProtoReflect.Descriptor instead. +func (*DataPacket) Descriptor() ([]byte, []int) { + return file_livekit_rtc_proto_rawDescGZIP(), []int{16} +} + +func (x *DataPacket) GetKind() DataPacket_Kind { + if x != nil { + return x.Kind + } + return DataPacket_RELIABLE +} + +func (m *DataPacket) GetValue() isDataPacket_Value { + if m != nil { + return m.Value + } + return nil +} + +func (x *DataPacket) GetUser() *UserPacket { + if x, ok := x.GetValue().(*DataPacket_User); ok { + return x.User + } + return nil +} + +func (x *DataPacket) GetSpeaker() *ActiveSpeakerUpdate { + if x, ok := x.GetValue().(*DataPacket_Speaker); ok { + return x.Speaker + } + return nil +} + +type isDataPacket_Value interface { + isDataPacket_Value() +} + +type DataPacket_User struct { + User *UserPacket `protobuf:"bytes,2,opt,name=user,proto3,oneof"` +} + +type DataPacket_Speaker struct { + Speaker *ActiveSpeakerUpdate `protobuf:"bytes,3,opt,name=speaker,proto3,oneof"` +} + +func (*DataPacket_User) isDataPacket_Value() {} + +func (*DataPacket_Speaker) isDataPacket_Value() {} + +type UserPacket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // participant ID of user that sent the message + ParticipantSid string `protobuf:"bytes,1,opt,name=participant_sid,json=participantSid,proto3" json:"participant_sid,omitempty"` + // user defined payload + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *UserPacket) Reset() { + *x = UserPacket{} + if protoimpl.UnsafeEnabled { + mi := &file_livekit_rtc_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UserPacket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UserPacket) ProtoMessage() {} + +func (x *UserPacket) ProtoReflect() protoreflect.Message { + mi := &file_livekit_rtc_proto_msgTypes[17] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UserPacket.ProtoReflect.Descriptor instead. +func (*UserPacket) Descriptor() ([]byte, []int) { + return file_livekit_rtc_proto_rawDescGZIP(), []int{17} +} + +func (x *UserPacket) GetParticipantSid() string { + if x != nil { + return x.ParticipantSid + } + return "" +} + +func (x *UserPacket) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + var File_livekit_rtc_proto protoreflect.FileDescriptor var file_livekit_rtc_proto_rawDesc = []byte{ @@ -1391,17 +1583,34 @@ var file_livekit_rtc_proto_rawDesc = []byte{ 0x0a, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, - 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x2a, 0x2d, 0x0a, 0x0c, 0x53, 0x69, - 0x67, 0x6e, 0x61, 0x6c, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x0d, 0x0a, 0x09, 0x50, 0x55, - 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x53, 0x55, 0x42, - 0x53, 0x43, 0x52, 0x49, 0x42, 0x45, 0x52, 0x10, 0x01, 0x2a, 0x2d, 0x0a, 0x0c, 0x56, 0x69, 0x64, - 0x65, 0x6f, 0x51, 0x75, 0x61, 0x6c, 0x69, 0x74, 0x79, 0x12, 0x07, 0x0a, 0x03, 0x4c, 0x4f, 0x57, - 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4d, 0x45, 0x44, 0x49, 0x55, 0x4d, 0x10, 0x01, 0x12, 0x08, - 0x0a, 0x04, 0x48, 0x49, 0x47, 0x48, 0x10, 0x02, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, - 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x22, 0xc9, 0x01, 0x0a, 0x0a, 0x44, + 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x2c, 0x0a, 0x04, 0x6b, 0x69, 0x6e, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x18, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, + 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x4b, 0x69, 0x6e, + 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, + 0x55, 0x73, 0x65, 0x72, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x48, 0x00, 0x52, 0x04, 0x75, 0x73, + 0x65, 0x72, 0x12, 0x38, 0x0a, 0x07, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x65, 0x53, 0x70, 0x65, 0x61, 0x6b, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x48, 0x00, 0x52, 0x07, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x65, 0x72, 0x22, 0x1f, 0x0a, 0x04, + 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x4c, 0x49, 0x41, 0x42, 0x4c, 0x45, + 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x4c, 0x4f, 0x53, 0x53, 0x59, 0x10, 0x01, 0x42, 0x07, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x4f, 0x0a, 0x0a, 0x55, 0x73, 0x65, 0x72, 0x50, 0x61, + 0x63, 0x6b, 0x65, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, + 0x61, 0x6e, 0x74, 0x5f, 0x73, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x53, 0x69, 0x64, 0x12, 0x18, 0x0a, + 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2a, 0x2d, 0x0a, 0x0c, 0x53, 0x69, 0x67, 0x6e, 0x61, + 0x6c, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x0d, 0x0a, 0x09, 0x50, 0x55, 0x42, 0x4c, 0x49, + 0x53, 0x48, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x53, 0x55, 0x42, 0x53, 0x43, 0x52, + 0x49, 0x42, 0x45, 0x52, 0x10, 0x01, 0x2a, 0x2d, 0x0a, 0x0c, 0x56, 0x69, 0x64, 0x65, 0x6f, 0x51, + 0x75, 0x61, 0x6c, 0x69, 0x74, 0x79, 0x12, 0x07, 0x0a, 0x03, 0x4c, 0x4f, 0x57, 0x10, 0x00, 0x12, + 0x0a, 0x0a, 0x06, 0x4d, 0x45, 0x44, 0x49, 0x55, 0x4d, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x48, + 0x49, 0x47, 0x48, 0x10, 0x02, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, + 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1416,65 +1625,71 @@ func file_livekit_rtc_proto_rawDescGZIP() []byte { return file_livekit_rtc_proto_rawDescData } -var file_livekit_rtc_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_livekit_rtc_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_livekit_rtc_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_livekit_rtc_proto_msgTypes = make([]protoimpl.MessageInfo, 18) var file_livekit_rtc_proto_goTypes = []interface{}{ (SignalTarget)(0), // 0: livekit.SignalTarget (VideoQuality)(0), // 1: livekit.VideoQuality - (*SignalRequest)(nil), // 2: livekit.SignalRequest - (*SignalResponse)(nil), // 3: livekit.SignalResponse - (*AddTrackRequest)(nil), // 4: livekit.AddTrackRequest - (*TrickleRequest)(nil), // 5: livekit.TrickleRequest - (*MuteTrackRequest)(nil), // 6: livekit.MuteTrackRequest - (*NegotiationRequest)(nil), // 7: livekit.NegotiationRequest - (*JoinResponse)(nil), // 8: livekit.JoinResponse - (*TrackPublishedResponse)(nil), // 9: livekit.TrackPublishedResponse - (*SessionDescription)(nil), // 10: livekit.SessionDescription - (*ParticipantUpdate)(nil), // 11: livekit.ParticipantUpdate - (*ActiveSpeakerUpdate)(nil), // 12: livekit.ActiveSpeakerUpdate - (*SpeakerInfo)(nil), // 13: livekit.SpeakerInfo - (*UpdateSubscription)(nil), // 14: livekit.UpdateSubscription - (*UpdateTrackSettings)(nil), // 15: livekit.UpdateTrackSettings - (*LeaveRequest)(nil), // 16: livekit.LeaveRequest - (*ICEServer)(nil), // 17: livekit.ICEServer - (TrackType)(0), // 18: livekit.TrackType - (*Room)(nil), // 19: livekit.Room - (*ParticipantInfo)(nil), // 20: livekit.ParticipantInfo - (*TrackInfo)(nil), // 21: livekit.TrackInfo + (DataPacket_Kind)(0), // 2: livekit.DataPacket.Kind + (*SignalRequest)(nil), // 3: livekit.SignalRequest + (*SignalResponse)(nil), // 4: livekit.SignalResponse + (*AddTrackRequest)(nil), // 5: livekit.AddTrackRequest + (*TrickleRequest)(nil), // 6: livekit.TrickleRequest + (*MuteTrackRequest)(nil), // 7: livekit.MuteTrackRequest + (*NegotiationRequest)(nil), // 8: livekit.NegotiationRequest + (*JoinResponse)(nil), // 9: livekit.JoinResponse + (*TrackPublishedResponse)(nil), // 10: livekit.TrackPublishedResponse + (*SessionDescription)(nil), // 11: livekit.SessionDescription + (*ParticipantUpdate)(nil), // 12: livekit.ParticipantUpdate + (*ActiveSpeakerUpdate)(nil), // 13: livekit.ActiveSpeakerUpdate + (*SpeakerInfo)(nil), // 14: livekit.SpeakerInfo + (*UpdateSubscription)(nil), // 15: livekit.UpdateSubscription + (*UpdateTrackSettings)(nil), // 16: livekit.UpdateTrackSettings + (*LeaveRequest)(nil), // 17: livekit.LeaveRequest + (*ICEServer)(nil), // 18: livekit.ICEServer + (*DataPacket)(nil), // 19: livekit.DataPacket + (*UserPacket)(nil), // 20: livekit.UserPacket + (TrackType)(0), // 21: livekit.TrackType + (*Room)(nil), // 22: livekit.Room + (*ParticipantInfo)(nil), // 23: livekit.ParticipantInfo + (*TrackInfo)(nil), // 24: livekit.TrackInfo } var file_livekit_rtc_proto_depIdxs = []int32{ - 10, // 0: livekit.SignalRequest.offer:type_name -> livekit.SessionDescription - 10, // 1: livekit.SignalRequest.answer:type_name -> livekit.SessionDescription - 5, // 2: livekit.SignalRequest.trickle:type_name -> livekit.TrickleRequest - 4, // 3: livekit.SignalRequest.add_track:type_name -> livekit.AddTrackRequest - 6, // 4: livekit.SignalRequest.mute:type_name -> livekit.MuteTrackRequest - 14, // 5: livekit.SignalRequest.subscription:type_name -> livekit.UpdateSubscription - 15, // 6: livekit.SignalRequest.track_setting:type_name -> livekit.UpdateTrackSettings - 16, // 7: livekit.SignalRequest.leave:type_name -> livekit.LeaveRequest - 8, // 8: livekit.SignalResponse.join:type_name -> livekit.JoinResponse - 10, // 9: livekit.SignalResponse.answer:type_name -> livekit.SessionDescription - 10, // 10: livekit.SignalResponse.offer:type_name -> livekit.SessionDescription - 5, // 11: livekit.SignalResponse.trickle:type_name -> livekit.TrickleRequest - 11, // 12: livekit.SignalResponse.update:type_name -> livekit.ParticipantUpdate - 9, // 13: livekit.SignalResponse.track_published:type_name -> livekit.TrackPublishedResponse - 12, // 14: livekit.SignalResponse.speaker:type_name -> livekit.ActiveSpeakerUpdate - 16, // 15: livekit.SignalResponse.leave:type_name -> livekit.LeaveRequest - 18, // 16: livekit.AddTrackRequest.type:type_name -> livekit.TrackType + 11, // 0: livekit.SignalRequest.offer:type_name -> livekit.SessionDescription + 11, // 1: livekit.SignalRequest.answer:type_name -> livekit.SessionDescription + 6, // 2: livekit.SignalRequest.trickle:type_name -> livekit.TrickleRequest + 5, // 3: livekit.SignalRequest.add_track:type_name -> livekit.AddTrackRequest + 7, // 4: livekit.SignalRequest.mute:type_name -> livekit.MuteTrackRequest + 15, // 5: livekit.SignalRequest.subscription:type_name -> livekit.UpdateSubscription + 16, // 6: livekit.SignalRequest.track_setting:type_name -> livekit.UpdateTrackSettings + 17, // 7: livekit.SignalRequest.leave:type_name -> livekit.LeaveRequest + 9, // 8: livekit.SignalResponse.join:type_name -> livekit.JoinResponse + 11, // 9: livekit.SignalResponse.answer:type_name -> livekit.SessionDescription + 11, // 10: livekit.SignalResponse.offer:type_name -> livekit.SessionDescription + 6, // 11: livekit.SignalResponse.trickle:type_name -> livekit.TrickleRequest + 12, // 12: livekit.SignalResponse.update:type_name -> livekit.ParticipantUpdate + 10, // 13: livekit.SignalResponse.track_published:type_name -> livekit.TrackPublishedResponse + 13, // 14: livekit.SignalResponse.speaker:type_name -> livekit.ActiveSpeakerUpdate + 17, // 15: livekit.SignalResponse.leave:type_name -> livekit.LeaveRequest + 21, // 16: livekit.AddTrackRequest.type:type_name -> livekit.TrackType 0, // 17: livekit.TrickleRequest.target:type_name -> livekit.SignalTarget - 19, // 18: livekit.JoinResponse.room:type_name -> livekit.Room - 20, // 19: livekit.JoinResponse.participant:type_name -> livekit.ParticipantInfo - 20, // 20: livekit.JoinResponse.other_participants:type_name -> livekit.ParticipantInfo - 17, // 21: livekit.JoinResponse.ice_servers:type_name -> livekit.ICEServer - 21, // 22: livekit.TrackPublishedResponse.track:type_name -> livekit.TrackInfo - 20, // 23: livekit.ParticipantUpdate.participants:type_name -> livekit.ParticipantInfo - 13, // 24: livekit.ActiveSpeakerUpdate.speakers:type_name -> livekit.SpeakerInfo + 22, // 18: livekit.JoinResponse.room:type_name -> livekit.Room + 23, // 19: livekit.JoinResponse.participant:type_name -> livekit.ParticipantInfo + 23, // 20: livekit.JoinResponse.other_participants:type_name -> livekit.ParticipantInfo + 18, // 21: livekit.JoinResponse.ice_servers:type_name -> livekit.ICEServer + 24, // 22: livekit.TrackPublishedResponse.track:type_name -> livekit.TrackInfo + 23, // 23: livekit.ParticipantUpdate.participants:type_name -> livekit.ParticipantInfo + 14, // 24: livekit.ActiveSpeakerUpdate.speakers:type_name -> livekit.SpeakerInfo 1, // 25: livekit.UpdateSubscription.quality:type_name -> livekit.VideoQuality 1, // 26: livekit.UpdateTrackSettings.quality:type_name -> livekit.VideoQuality - 27, // [27:27] is the sub-list for method output_type - 27, // [27:27] is the sub-list for method input_type - 27, // [27:27] is the sub-list for extension type_name - 27, // [27:27] is the sub-list for extension extendee - 0, // [0:27] is the sub-list for field type_name + 2, // 27: livekit.DataPacket.kind:type_name -> livekit.DataPacket.Kind + 20, // 28: livekit.DataPacket.user:type_name -> livekit.UserPacket + 13, // 29: livekit.DataPacket.speaker:type_name -> livekit.ActiveSpeakerUpdate + 30, // [30:30] is the sub-list for method output_type + 30, // [30:30] is the sub-list for method input_type + 30, // [30:30] is the sub-list for extension type_name + 30, // [30:30] is the sub-list for extension extendee + 0, // [0:30] is the sub-list for field type_name } func init() { file_livekit_rtc_proto_init() } @@ -1676,6 +1891,30 @@ func file_livekit_rtc_proto_init() { return nil } } + file_livekit_rtc_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DataPacket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_livekit_rtc_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UserPacket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_livekit_rtc_proto_msgTypes[0].OneofWrappers = []interface{}{ (*SignalRequest_Offer)(nil), @@ -1697,13 +1936,17 @@ func file_livekit_rtc_proto_init() { (*SignalResponse_Speaker)(nil), (*SignalResponse_Leave)(nil), } + file_livekit_rtc_proto_msgTypes[16].OneofWrappers = []interface{}{ + (*DataPacket_User)(nil), + (*DataPacket_Speaker)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_livekit_rtc_proto_rawDesc, - NumEnums: 2, - NumMessages: 16, + NumEnums: 3, + NumMessages: 18, NumExtensions: 0, NumServices: 0, }, diff --git a/version/version.go b/version/version.go index 9e158c506..6e73a937d 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -const Version = "0.8.5" +const Version = "0.9.0"