From ff473018204e1f51d8d419eef72efefc657bad74 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 17 Sep 2021 11:47:13 -0700 Subject: [PATCH] Implements protocol 3 speaker updates (#120) * Disallow AddTrack from participants that don't have the permission * Support protocol 3 speaker updates, client info * update protocol * Disallow AddTrack from participants that don't have the permission * increase wait time for GH to pass --- go.mod | 2 +- go.sum | 4 +- pkg/routing/interfaces.go | 14 +- pkg/routing/redisrouter.go | 26 +-- pkg/rtc/participant.go | 14 +- pkg/rtc/room.go | 64 +++++--- pkg/rtc/types/interfaces.go | 2 +- pkg/rtc/types/protocol_version.go | 5 + pkg/rtc/types/typesfakes/fake_participant.go | 158 +++++++++---------- pkg/service/roommanager.go | 6 +- pkg/service/rtcservice.go | 28 +++- test/client/client.go | 8 +- 12 files changed, 192 insertions(+), 139 deletions(-) diff --git a/go.mod b/go.mod index d8bce4dc5..5e81cb47f 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.8.9 + github.com/livekit/protocol v0.9.0 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 25b27a1d0..bfdae964c 100644 --- a/go.sum +++ b/go.sum @@ -246,8 +246,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.9 h1:Ih+tE5CXqdL1o6iXfZn+uBoddBXWceaOKVIun6aVuZM= github.com/livekit/ion-sfu v1.20.9/go.mod h1:g8hwobZI5fvX1RXvayf4ZXkgP7spV5YGE4yTSsumpB4= -github.com/livekit/protocol v0.8.9 h1:oZNXehaeuV906sbnKtc4SvOld0Bj7lTF0AerenwvP70= -github.com/livekit/protocol v0.8.9/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo= +github.com/livekit/protocol v0.9.0 h1:cxMmJAiFRvBVYa0Q0Rfd86BHSLG1zLyD+AE0paHoGAo= +github.com/livekit/protocol v0.9.0/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 7ff580cfe..b8058a28d 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -25,13 +25,13 @@ type MessageSource interface { } type ParticipantInit struct { - Identity string - Metadata string - Reconnect bool - Permission *livekit.ParticipantPermission - ProtocolVersion int32 - AutoSubscribe bool - Hidden bool + Identity string + Metadata string + Reconnect bool + Permission *livekit.ParticipantPermission + AutoSubscribe bool + Hidden bool + Client *livekit.ClientInfo } type NewParticipantCallback func(ctx context.Context, roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index ed7c74c87..364a20460 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -150,12 +150,12 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin Identity: pi.Identity, Metadata: pi.Metadata, // connection id is to allow the RTC node to identify where to route the message back to - ConnectionId: connectionId, - Reconnect: pi.Reconnect, - Permission: pi.Permission, - ProtocolVersion: pi.ProtocolVersion, - AutoSubscribe: pi.AutoSubscribe, - Hidden: pi.Hidden, + ConnectionId: connectionId, + Reconnect: pi.Reconnect, + Permission: pi.Permission, + AutoSubscribe: pi.AutoSubscribe, + Hidden: pi.Hidden, + Client: pi.Client, }) if err != nil { return @@ -219,13 +219,13 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK } pi := ParticipantInit{ - Identity: ss.Identity, - Metadata: ss.Metadata, - Reconnect: ss.Reconnect, - Permission: ss.Permission, - ProtocolVersion: ss.ProtocolVersion, - AutoSubscribe: ss.AutoSubscribe, - Hidden: ss.Hidden, + Identity: ss.Identity, + Metadata: ss.Metadata, + Reconnect: ss.Reconnect, + Permission: ss.Permission, + Client: ss.Client, + AutoSubscribe: ss.AutoSubscribe, + Hidden: ss.Hidden, } reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 702c2537b..da0c5149a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -321,6 +321,12 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) { return } + if !p.CanPublish() { + logger.Warnw("no permission to publish track", nil, + "participant", p.Identity(), "pID", p.ID()) + return + } + ti := &livekit.TrackInfo{ Type: req.Type, Name: req.Name, @@ -526,14 +532,15 @@ func (p *ParticipantImpl) SendParticipantUpdate(participants []*livekit.Particip }) } -func (p *ParticipantImpl) SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error { +// SendSpeakerUpdate notifies participant changes to speakers. only send members that have changed since last update +func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error { if !p.IsReady() { return nil } return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Speaker{ - Speaker: &livekit.ActiveSpeakerUpdate{ + Message: &livekit.SignalResponse_SpeakersChanged{ + SpeakersChanged: &livekit.SpeakersChanged{ Speakers: speakers, }, }, @@ -769,6 +776,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w } logger.Debugw("mediaTrack added", + "kind", track.Kind().String(), "participant", p.Identity(), "pID", p.ID(), "track", track.ID(), diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index ae1fd8788..d525510e1 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -532,7 +532,8 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) { } } -func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { +// for protocol 2, send all active speakers +func (r *Room) sendActiveSpeakers(speakers []*livekit.SpeakerInfo) { dp := &livekit.DataPacket{ Kind: livekit.DataPacket_LOSSY, Value: &livekit.DataPacket_Speaker{ @@ -543,10 +544,17 @@ func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { } for _, p := range r.GetParticipants() { - if p.ProtocolVersion().HandlesDataPackets() { + if p.ProtocolVersion().HandlesDataPackets() && !p.ProtocolVersion().SupportsSpeakerChanged() { _ = p.SendDataPacket(dp) - } else { - _ = p.SendActiveSpeakers(speakers) + } + } +} + +// for protocol 3, send only changed updates +func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) { + for _, p := range r.GetParticipants() { + if p.ProtocolVersion().SupportsSpeakerChanged() { + _ = p.SendSpeakerUpdate(speakers) } } } @@ -562,15 +570,15 @@ func (r *Room) audioUpdateWorker() { activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel) } - var lastActiveSpeakers []*livekit.SpeakerInfo + lastActiveMap := make(map[string]*livekit.SpeakerInfo) for { if r.isClosed.Get() { return } - speakers := r.GetActiveSpeakers() + activeSpeakers := r.GetActiveSpeakers() if smoothValues != nil { - for _, speaker := range speakers { + for _, speaker := range activeSpeakers { sid := speaker.Sid level := smoothValues[sid] delete(smoothValues, sid) @@ -584,7 +592,7 @@ func (r *Room) audioUpdateWorker() { delete(smoothValues, sid) level += -level * smoothFactor if level > activeThreshold { - speakers = append(speakers, &livekit.SpeakerInfo{ + activeSpeakers = append(activeSpeakers, &livekit.SpeakerInfo{ Sid: sid, Level: level, Active: true, @@ -593,33 +601,45 @@ func (r *Room) audioUpdateWorker() { } // smoothValues map is drained, now repopulate it back - for _, speaker := range speakers { + for _, speaker := range activeSpeakers { smoothValues[speaker.Sid] = speaker.Level } - sort.Slice(speakers, func(i, j int) bool { - return speakers[i].Level > speakers[j].Level + sort.Slice(activeSpeakers, func(i, j int) bool { + return activeSpeakers[i].Level > activeSpeakers[j].Level }) } const invAudioLevelQuantization = 1.0 / AudioLevelQuantization - for _, speaker := range speakers { + for _, speaker := range activeSpeakers { speaker.Level = float32(math.Ceil(float64(speaker.Level*AudioLevelQuantization)) * invAudioLevelQuantization) } - // see if an update is needed - if len(speakers) == len(lastActiveSpeakers) { - for i, speaker := range speakers { - if speaker.Level != lastActiveSpeakers[i].Level || speaker.Sid != lastActiveSpeakers[i].Sid { - r.sendSpeakerUpdates(speakers) - break - } + changedSpeakers := make([]*livekit.SpeakerInfo, 0, len(activeSpeakers)) + nextActiveMap := make(map[string]*livekit.SpeakerInfo, len(activeSpeakers)) + for _, speaker := range activeSpeakers { + prev := lastActiveMap[speaker.Sid] + if prev == nil || prev.Level != speaker.Level { + changedSpeakers = append(changedSpeakers, speaker) + } + nextActiveMap[speaker.Sid] = speaker + } + // changedSpeakers need to include previous speakers that are no longer speaking + for sid, speaker := range lastActiveMap { + if nextActiveMap[sid] == nil { + speaker.Level = 0 + speaker.Active = false + changedSpeakers = append(changedSpeakers, speaker) } - } else { - r.sendSpeakerUpdates(speakers) } - lastActiveSpeakers = speakers + // see if an update is needed + if len(changedSpeakers) > 0 { + r.sendActiveSpeakers(activeSpeakers) + r.sendSpeakerChanges(changedSpeakers) + } + + lastActiveMap = nextActiveMap time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond) } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 830dd50f8..ea0bd57d1 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -48,7 +48,7 @@ type Participant interface { RemoveSubscriber(peerId string) SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error SendParticipantUpdate(participants []*livekit.ParticipantInfo) error - SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error + SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error SetTrackMuted(trackId string, muted bool, fromAdmin bool) GetAudioLevel() (level uint8, active bool) diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index df74a1d33..c836e759d 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -20,3 +20,8 @@ func (v ProtocolVersion) HandlesDataPackets() bool { func (v ProtocolVersion) SubscriberAsPrimary() bool { return v > 2 } + +// SupportsSpeakerChanged - if client handles speaker info deltas, instead of a comprehensive list +func (v ProtocolVersion) SupportsSpeakerChanged() bool { + return v > 2 +} diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 0e06d6b1a..2ec93faa9 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -290,17 +290,6 @@ type FakeParticipant struct { removeSubscriberArgsForCall []struct { arg1 string } - SendActiveSpeakersStub func([]*livekit.SpeakerInfo) error - sendActiveSpeakersMutex sync.RWMutex - sendActiveSpeakersArgsForCall []struct { - arg1 []*livekit.SpeakerInfo - } - sendActiveSpeakersReturns struct { - result1 error - } - sendActiveSpeakersReturnsOnCall map[int]struct { - result1 error - } SendDataPacketStub func(*livekit.DataPacket) error sendDataPacketMutex sync.RWMutex sendDataPacketArgsForCall []struct { @@ -336,6 +325,17 @@ type FakeParticipant struct { sendParticipantUpdateReturnsOnCall map[int]struct { result1 error } + SendSpeakerUpdateStub func([]*livekit.SpeakerInfo) error + sendSpeakerUpdateMutex sync.RWMutex + sendSpeakerUpdateArgsForCall []struct { + arg1 []*livekit.SpeakerInfo + } + sendSpeakerUpdateReturns struct { + result1 error + } + sendSpeakerUpdateReturnsOnCall map[int]struct { + result1 error + } SetMetadataStub func(string) setMetadataMutex sync.RWMutex setMetadataArgsForCall []struct { @@ -1917,72 +1917,6 @@ func (fake *FakeParticipant) RemoveSubscriberArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeParticipant) SendActiveSpeakers(arg1 []*livekit.SpeakerInfo) error { - var arg1Copy []*livekit.SpeakerInfo - if arg1 != nil { - arg1Copy = make([]*livekit.SpeakerInfo, len(arg1)) - copy(arg1Copy, arg1) - } - fake.sendActiveSpeakersMutex.Lock() - ret, specificReturn := fake.sendActiveSpeakersReturnsOnCall[len(fake.sendActiveSpeakersArgsForCall)] - fake.sendActiveSpeakersArgsForCall = append(fake.sendActiveSpeakersArgsForCall, struct { - arg1 []*livekit.SpeakerInfo - }{arg1Copy}) - stub := fake.SendActiveSpeakersStub - fakeReturns := fake.sendActiveSpeakersReturns - fake.recordInvocation("SendActiveSpeakers", []interface{}{arg1Copy}) - fake.sendActiveSpeakersMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeParticipant) SendActiveSpeakersCallCount() int { - fake.sendActiveSpeakersMutex.RLock() - defer fake.sendActiveSpeakersMutex.RUnlock() - return len(fake.sendActiveSpeakersArgsForCall) -} - -func (fake *FakeParticipant) SendActiveSpeakersCalls(stub func([]*livekit.SpeakerInfo) error) { - fake.sendActiveSpeakersMutex.Lock() - defer fake.sendActiveSpeakersMutex.Unlock() - fake.SendActiveSpeakersStub = stub -} - -func (fake *FakeParticipant) SendActiveSpeakersArgsForCall(i int) []*livekit.SpeakerInfo { - fake.sendActiveSpeakersMutex.RLock() - defer fake.sendActiveSpeakersMutex.RUnlock() - argsForCall := fake.sendActiveSpeakersArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeParticipant) SendActiveSpeakersReturns(result1 error) { - fake.sendActiveSpeakersMutex.Lock() - defer fake.sendActiveSpeakersMutex.Unlock() - fake.SendActiveSpeakersStub = nil - fake.sendActiveSpeakersReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeParticipant) SendActiveSpeakersReturnsOnCall(i int, result1 error) { - fake.sendActiveSpeakersMutex.Lock() - defer fake.sendActiveSpeakersMutex.Unlock() - fake.SendActiveSpeakersStub = nil - if fake.sendActiveSpeakersReturnsOnCall == nil { - fake.sendActiveSpeakersReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.sendActiveSpeakersReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeParticipant) SendDataPacket(arg1 *livekit.DataPacket) error { fake.sendDataPacketMutex.Lock() ret, specificReturn := fake.sendDataPacketReturnsOnCall[len(fake.sendDataPacketArgsForCall)] @@ -2183,6 +2117,72 @@ func (fake *FakeParticipant) SendParticipantUpdateReturnsOnCall(i int, result1 e }{result1} } +func (fake *FakeParticipant) SendSpeakerUpdate(arg1 []*livekit.SpeakerInfo) error { + var arg1Copy []*livekit.SpeakerInfo + if arg1 != nil { + arg1Copy = make([]*livekit.SpeakerInfo, len(arg1)) + copy(arg1Copy, arg1) + } + fake.sendSpeakerUpdateMutex.Lock() + ret, specificReturn := fake.sendSpeakerUpdateReturnsOnCall[len(fake.sendSpeakerUpdateArgsForCall)] + fake.sendSpeakerUpdateArgsForCall = append(fake.sendSpeakerUpdateArgsForCall, struct { + arg1 []*livekit.SpeakerInfo + }{arg1Copy}) + stub := fake.SendSpeakerUpdateStub + fakeReturns := fake.sendSpeakerUpdateReturns + fake.recordInvocation("SendSpeakerUpdate", []interface{}{arg1Copy}) + fake.sendSpeakerUpdateMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) SendSpeakerUpdateCallCount() int { + fake.sendSpeakerUpdateMutex.RLock() + defer fake.sendSpeakerUpdateMutex.RUnlock() + return len(fake.sendSpeakerUpdateArgsForCall) +} + +func (fake *FakeParticipant) SendSpeakerUpdateCalls(stub func([]*livekit.SpeakerInfo) error) { + fake.sendSpeakerUpdateMutex.Lock() + defer fake.sendSpeakerUpdateMutex.Unlock() + fake.SendSpeakerUpdateStub = stub +} + +func (fake *FakeParticipant) SendSpeakerUpdateArgsForCall(i int) []*livekit.SpeakerInfo { + fake.sendSpeakerUpdateMutex.RLock() + defer fake.sendSpeakerUpdateMutex.RUnlock() + argsForCall := fake.sendSpeakerUpdateArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeParticipant) SendSpeakerUpdateReturns(result1 error) { + fake.sendSpeakerUpdateMutex.Lock() + defer fake.sendSpeakerUpdateMutex.Unlock() + fake.SendSpeakerUpdateStub = nil + fake.sendSpeakerUpdateReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeParticipant) SendSpeakerUpdateReturnsOnCall(i int, result1 error) { + fake.sendSpeakerUpdateMutex.Lock() + defer fake.sendSpeakerUpdateMutex.Unlock() + fake.SendSpeakerUpdateStub = nil + if fake.sendSpeakerUpdateReturnsOnCall == nil { + fake.sendSpeakerUpdateReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendSpeakerUpdateReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeParticipant) SetMetadata(arg1 string) { fake.setMetadataMutex.Lock() fake.setMetadataArgsForCall = append(fake.setMetadataArgsForCall, struct { @@ -2669,14 +2669,14 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.removeSubscribedTrackMutex.RUnlock() fake.removeSubscriberMutex.RLock() defer fake.removeSubscriberMutex.RUnlock() - fake.sendActiveSpeakersMutex.RLock() - defer fake.sendActiveSpeakersMutex.RUnlock() fake.sendDataPacketMutex.RLock() defer fake.sendDataPacketMutex.RUnlock() fake.sendJoinResponseMutex.RLock() defer fake.sendJoinResponseMutex.RUnlock() fake.sendParticipantUpdateMutex.RLock() defer fake.sendParticipantUpdateMutex.RUnlock() + fake.sendSpeakerUpdateMutex.RLock() + defer fake.sendSpeakerUpdateMutex.RUnlock() fake.setMetadataMutex.RLock() defer fake.setMetadataMutex.RUnlock() fake.setPermissionMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 7cce7c0d9..c7d65defc 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -227,10 +227,12 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi "room", roomName, "nodeID", r.currentNode.Id, "participant", pi.Identity, - "protocol", pi.ProtocolVersion, + "sdk", pi.Client.Sdk, + "sdkVersion", pi.Client.Version, + "protocol", pi.Client.Protocol, ) - pv := types.ProtocolVersion(pi.ProtocolVersion) + pv := types.ProtocolVersion(pi.Client.Protocol) rtcConf := *r.rtcConfig rtcConf.SetBufferFactory(room.GetBufferFactor()) participant, err = rtc.NewParticipant(rtc.ParticipantParams{ diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 6b390acec..e79ab7f8d 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "net/http" + "net/url" "strconv" "strings" @@ -66,7 +67,6 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, roomName := r.FormValue("room") reconnectParam := r.FormValue("reconnect") - protocolParam := r.FormValue("protocol") autoSubParam := r.FormValue("auto_subscribe") if onlyName != "" { @@ -79,13 +79,11 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, AutoSubscribe: true, Metadata: claims.Metadata, Hidden: claims.Video.Hidden, + Client: s.parseClientInfo(r.Form), } if autoSubParam != "" { pi.AutoSubscribe = boolValue(autoSubParam) } - if pv, err := strconv.Atoi(protocolParam); err == nil { - pi.ProtocolVersion = int32(pv) - } pi.Permission = permissionFromGrant(claims.Video) return roomName, pi, http.StatusOK, nil @@ -134,7 +132,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } sigConn := NewWSSignalConnection(conn) - if types.ProtocolVersion(pi.ProtocolVersion).SupportsProtobuf() { + if types.ProtocolVersion(pi.Client.Protocol).SupportsProtobuf() { sigConn.useJSON = false } @@ -201,3 +199,23 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } + +func (s *RTCService) parseClientInfo(values url.Values) *livekit.ClientInfo { + ci := &livekit.ClientInfo{} + if pv, err := strconv.Atoi(values.Get("protocol")); err == nil { + ci.Protocol = int32(pv) + } + sdkString := values.Get("sdk") + switch sdkString { + case "js": + ci.Sdk = livekit.ClientInfo_JS + case "ios": + ci.Sdk = livekit.ClientInfo_IOS + case "android": + ci.Sdk = livekit.ClientInfo_ANDROID + case "flutter": + ci.Sdk = livekit.ClientInfo_FLUTTER + } + ci.Version = values.Get("version") + return ci +} diff --git a/test/client/client.go b/test/client/client.go index 23d101f71..a61471221 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -540,15 +540,15 @@ func (c *RTCClient) ensurePublisherConnected() error { } dcOpen := utils.AtomicFlag{} - c.lossyDC.OnOpen(func() { + c.reliableDC.OnOpen(func() { dcOpen.TrySet(true) }) - if c.lossyDC.ReadyState() == webrtc.DataChannelStateOpen { + if c.reliableDC.ReadyState() == webrtc.DataChannelStateOpen { dcOpen.TrySet(true) } - // wait until connected - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // wait until connected, increase wait time since it takes more than 10s sometimes on GH + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for { select {