From f962fef2c8fe9f08e1206901bbf4aa216b496ade Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 28 Jan 2022 08:34:13 +0530 Subject: [PATCH] Apply subscribed track settsings on add (#379) --- pkg/rtc/participant.go | 57 +++-- pkg/rtc/signalhandler.go | 12 +- pkg/rtc/types/interfaces.go | 17 +- .../typesfakes/fake_local_participant.go | 215 +++++++----------- pkg/rtc/uptrackmanager.go | 14 +- 5 files changed, 141 insertions(+), 174 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 85a83d8f1..9ffe35479 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -93,8 +93,10 @@ type ParticipantImpl struct { *UpTrackManager - // tracks the current participant is subscribed to, map of sid => DownTrack + // tracks the current participant is subscribed to, map of trackID => types.SubscribedTrack subscribedTracks map[livekit.TrackID]types.SubscribedTrack + // track settings of tracks the current participant is subscribed to, map of trackID => types.SubscribedTrack + subscribedTracksSettings map[livekit.TrackID]*livekit.UpdateTrackSettings // keeps track of disallowed tracks disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID // keep track of other publishers identities that we are subscribed to @@ -121,13 +123,14 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { // TODO: check to ensure params are valid, id and identity can't be empty p := &ParticipantImpl{ - params: params, - rtcpCh: make(chan []rtcp.Packet, 50), - pliThrottle: newPLIThrottle(params.ThrottleConfig), - pendingTracks: make(map[string]*pendingTrackInfo), - subscribedTracks: make(map[livekit.TrackID]types.SubscribedTrack), - disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), - connectedAt: time.Now(), + params: params, + rtcpCh: make(chan []rtcp.Packet, 50), + pliThrottle: newPLIThrottle(params.ThrottleConfig), + pendingTracks: make(map[string]*pendingTrackInfo), + subscribedTracks: make(map[livekit.TrackID]types.SubscribedTrack), + subscribedTracksSettings: make(map[livekit.TrackID]*livekit.UpdateTrackSettings), + disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), + connectedAt: time.Now(), } p.migrateState.Store(types.MigrateStateInit) p.state.Store(livekit.ParticipantInfo_JOINING) @@ -792,20 +795,20 @@ func (p *ParticipantImpl) SubscriberPC() *webrtc.PeerConnection { return p.subscriber.pc } -func (p *ParticipantImpl) GetSubscribedTrack(sid livekit.TrackID) types.SubscribedTrack { - p.lock.RLock() - defer p.lock.RUnlock() - return p.subscribedTracks[sid] -} +func (p *ParticipantImpl) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error { + p.lock.Lock() + p.subscribedTracksSettings[trackID] = settings -func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { - p.lock.RLock() - defer p.lock.RUnlock() - subscribed := make([]types.SubscribedTrack, 0, len(p.subscribedTracks)) - for _, st := range p.subscribedTracks { - subscribed = append(subscribed, st) + subTrack := p.subscribedTracks[trackID] + if subTrack == nil { + p.lock.Unlock() + p.params.Logger.Warnw("could not find subscribed track", nil, "trackID", trackID) + return errors.New("could not find subscribed track") } - return subscribed + p.lock.Unlock() + + subTrack.UpdateSubscriberSettings(settings) + return nil } // AddSubscribedTrack adds a track to the participant's subscribed list @@ -816,11 +819,17 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { "track", subTrack.ID()) p.lock.Lock() p.subscribedTracks[subTrack.ID()] = subTrack + settings := p.subscribedTracksSettings[subTrack.ID()] p.lock.Unlock() subTrack.OnBind(func() { p.subscriber.AddTrack(subTrack) }) + + if settings != nil { + subTrack.UpdateSubscriberSettings(settings) + } + p.subscribedTo.Store(subTrack.PublisherID(), struct{}{}) } @@ -842,7 +851,15 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) numRemaining++ } } + + // + // NOTE + // subscribedTrackSettings should not deleted on removal as it is needed if corresponding publisher migrated + // LK-TODO: find a way to clean these up + // + p.lock.Unlock() + if numRemaining == 0 { p.subscribedTo.Delete(subTrack.PublisherID()) diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 5ce7a4eee..64d95bf32 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -58,17 +58,13 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant } case *livekit.SignalRequest_TrackSetting: for _, sid := range livekit.StringsAsTrackIDs(msg.TrackSetting.TrackSids) { - subTrack := participant.GetSubscribedTrack(sid) - if subTrack == nil { - pLogger.Warnw("unable to find SubscribedTrack", nil, - "track", sid) + err := participant.UpdateSubscribedTrackSettings(sid, msg.TrackSetting) + if err != nil { + pLogger.Errorw("failed to update subscribed track settings", err, "trackID", sid) continue } - // find quality for published track - pLogger.Debugw("updating track settings", - "settings", msg.TrackSetting) - subTrack.UpdateSubscriberSettings(msg.TrackSetting) + pLogger.Debugw("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting) } case *livekit.SignalRequest_UpdateLayers: err := room.UpdateVideoLayers(participant, msg.UpdateLayers) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 3b8e00fd6..ad30b88b4 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "time" "github.com/livekit/protocol/auth" @@ -33,6 +34,19 @@ const ( MigrateStateComplete ) +func (m MigrateState) String() string { + switch m { + case MigrateStateInit: + return "MIGRATE_STATE_INIT" + case MigrateStateSync: + return "MIGRATE_STATE_SYNC" + case MigrateStateComplete: + return "MIGRATE_STATE_COMPLETE" + default: + return fmt.Sprintf("%d", int(m)) + } +} + //counterfeiter:generate . Participant type Participant interface { ID() livekit.ParticipantID @@ -102,8 +116,7 @@ type LocalParticipant interface { ICERestart() error AddSubscribedTrack(st SubscribedTrack) RemoveSubscribedTrack(st SubscribedTrack) - GetSubscribedTrack(sid livekit.TrackID) SubscribedTrack - GetSubscribedTracks() []SubscribedTrack + UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error // returns list of participant identities that the current participant is subscribed to GetSubscribedParticipants() []livekit.ParticipantID diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index db38a1164..4b629c104 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -189,27 +189,6 @@ type FakeLocalParticipant struct { getSubscribedParticipantsReturnsOnCall map[int]struct { result1 []livekit.ParticipantID } - GetSubscribedTrackStub func(livekit.TrackID) types.SubscribedTrack - getSubscribedTrackMutex sync.RWMutex - getSubscribedTrackArgsForCall []struct { - arg1 livekit.TrackID - } - getSubscribedTrackReturns struct { - result1 types.SubscribedTrack - } - getSubscribedTrackReturnsOnCall map[int]struct { - result1 types.SubscribedTrack - } - GetSubscribedTracksStub func() []types.SubscribedTrack - getSubscribedTracksMutex sync.RWMutex - getSubscribedTracksArgsForCall []struct { - } - getSubscribedTracksReturns struct { - result1 []types.SubscribedTrack - } - getSubscribedTracksReturnsOnCall map[int]struct { - result1 []types.SubscribedTrack - } HandleAnswerStub func(webrtc.SessionDescription) error handleAnswerMutex sync.RWMutex handleAnswerArgsForCall []struct { @@ -574,6 +553,18 @@ type FakeLocalParticipant struct { updateSubscribedQualityReturnsOnCall map[int]struct { result1 error } + UpdateSubscribedTrackSettingsStub func(livekit.TrackID, *livekit.UpdateTrackSettings) error + updateSubscribedTrackSettingsMutex sync.RWMutex + updateSubscribedTrackSettingsArgsForCall []struct { + arg1 livekit.TrackID + arg2 *livekit.UpdateTrackSettings + } + updateSubscribedTrackSettingsReturns struct { + result1 error + } + updateSubscribedTrackSettingsReturnsOnCall map[int]struct { + result1 error + } UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, func(participantID livekit.ParticipantID) types.LocalParticipant) error updateSubscriptionPermissionMutex sync.RWMutex updateSubscriptionPermissionArgsForCall []struct { @@ -1533,120 +1524,6 @@ func (fake *FakeLocalParticipant) GetSubscribedParticipantsReturnsOnCall(i int, }{result1} } -func (fake *FakeLocalParticipant) GetSubscribedTrack(arg1 livekit.TrackID) types.SubscribedTrack { - fake.getSubscribedTrackMutex.Lock() - ret, specificReturn := fake.getSubscribedTrackReturnsOnCall[len(fake.getSubscribedTrackArgsForCall)] - fake.getSubscribedTrackArgsForCall = append(fake.getSubscribedTrackArgsForCall, struct { - arg1 livekit.TrackID - }{arg1}) - stub := fake.GetSubscribedTrackStub - fakeReturns := fake.getSubscribedTrackReturns - fake.recordInvocation("GetSubscribedTrack", []interface{}{arg1}) - fake.getSubscribedTrackMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeLocalParticipant) GetSubscribedTrackCallCount() int { - fake.getSubscribedTrackMutex.RLock() - defer fake.getSubscribedTrackMutex.RUnlock() - return len(fake.getSubscribedTrackArgsForCall) -} - -func (fake *FakeLocalParticipant) GetSubscribedTrackCalls(stub func(livekit.TrackID) types.SubscribedTrack) { - fake.getSubscribedTrackMutex.Lock() - defer fake.getSubscribedTrackMutex.Unlock() - fake.GetSubscribedTrackStub = stub -} - -func (fake *FakeLocalParticipant) GetSubscribedTrackArgsForCall(i int) livekit.TrackID { - fake.getSubscribedTrackMutex.RLock() - defer fake.getSubscribedTrackMutex.RUnlock() - argsForCall := fake.getSubscribedTrackArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeLocalParticipant) GetSubscribedTrackReturns(result1 types.SubscribedTrack) { - fake.getSubscribedTrackMutex.Lock() - defer fake.getSubscribedTrackMutex.Unlock() - fake.GetSubscribedTrackStub = nil - fake.getSubscribedTrackReturns = struct { - result1 types.SubscribedTrack - }{result1} -} - -func (fake *FakeLocalParticipant) GetSubscribedTrackReturnsOnCall(i int, result1 types.SubscribedTrack) { - fake.getSubscribedTrackMutex.Lock() - defer fake.getSubscribedTrackMutex.Unlock() - fake.GetSubscribedTrackStub = nil - if fake.getSubscribedTrackReturnsOnCall == nil { - fake.getSubscribedTrackReturnsOnCall = make(map[int]struct { - result1 types.SubscribedTrack - }) - } - fake.getSubscribedTrackReturnsOnCall[i] = struct { - result1 types.SubscribedTrack - }{result1} -} - -func (fake *FakeLocalParticipant) GetSubscribedTracks() []types.SubscribedTrack { - fake.getSubscribedTracksMutex.Lock() - ret, specificReturn := fake.getSubscribedTracksReturnsOnCall[len(fake.getSubscribedTracksArgsForCall)] - fake.getSubscribedTracksArgsForCall = append(fake.getSubscribedTracksArgsForCall, struct { - }{}) - stub := fake.GetSubscribedTracksStub - fakeReturns := fake.getSubscribedTracksReturns - fake.recordInvocation("GetSubscribedTracks", []interface{}{}) - fake.getSubscribedTracksMutex.Unlock() - if stub != nil { - return stub() - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeLocalParticipant) GetSubscribedTracksCallCount() int { - fake.getSubscribedTracksMutex.RLock() - defer fake.getSubscribedTracksMutex.RUnlock() - return len(fake.getSubscribedTracksArgsForCall) -} - -func (fake *FakeLocalParticipant) GetSubscribedTracksCalls(stub func() []types.SubscribedTrack) { - fake.getSubscribedTracksMutex.Lock() - defer fake.getSubscribedTracksMutex.Unlock() - fake.GetSubscribedTracksStub = stub -} - -func (fake *FakeLocalParticipant) GetSubscribedTracksReturns(result1 []types.SubscribedTrack) { - fake.getSubscribedTracksMutex.Lock() - defer fake.getSubscribedTracksMutex.Unlock() - fake.GetSubscribedTracksStub = nil - fake.getSubscribedTracksReturns = struct { - result1 []types.SubscribedTrack - }{result1} -} - -func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result1 []types.SubscribedTrack) { - fake.getSubscribedTracksMutex.Lock() - defer fake.getSubscribedTracksMutex.Unlock() - fake.GetSubscribedTracksStub = nil - if fake.getSubscribedTracksReturnsOnCall == nil { - fake.getSubscribedTracksReturnsOnCall = make(map[int]struct { - result1 []types.SubscribedTrack - }) - } - fake.getSubscribedTracksReturnsOnCall[i] = struct { - result1 []types.SubscribedTrack - }{result1} -} - func (fake *FakeLocalParticipant) HandleAnswer(arg1 webrtc.SessionDescription) error { fake.handleAnswerMutex.Lock() ret, specificReturn := fake.handleAnswerReturnsOnCall[len(fake.handleAnswerArgsForCall)] @@ -3656,6 +3533,68 @@ func (fake *FakeLocalParticipant) UpdateSubscribedQualityReturnsOnCall(i int, re }{result1} } +func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettings(arg1 livekit.TrackID, arg2 *livekit.UpdateTrackSettings) error { + fake.updateSubscribedTrackSettingsMutex.Lock() + ret, specificReturn := fake.updateSubscribedTrackSettingsReturnsOnCall[len(fake.updateSubscribedTrackSettingsArgsForCall)] + fake.updateSubscribedTrackSettingsArgsForCall = append(fake.updateSubscribedTrackSettingsArgsForCall, struct { + arg1 livekit.TrackID + arg2 *livekit.UpdateTrackSettings + }{arg1, arg2}) + stub := fake.UpdateSubscribedTrackSettingsStub + fakeReturns := fake.updateSubscribedTrackSettingsReturns + fake.recordInvocation("UpdateSubscribedTrackSettings", []interface{}{arg1, arg2}) + fake.updateSubscribedTrackSettingsMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsCallCount() int { + fake.updateSubscribedTrackSettingsMutex.RLock() + defer fake.updateSubscribedTrackSettingsMutex.RUnlock() + return len(fake.updateSubscribedTrackSettingsArgsForCall) +} + +func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsCalls(stub func(livekit.TrackID, *livekit.UpdateTrackSettings) error) { + fake.updateSubscribedTrackSettingsMutex.Lock() + defer fake.updateSubscribedTrackSettingsMutex.Unlock() + fake.UpdateSubscribedTrackSettingsStub = stub +} + +func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsArgsForCall(i int) (livekit.TrackID, *livekit.UpdateTrackSettings) { + fake.updateSubscribedTrackSettingsMutex.RLock() + defer fake.updateSubscribedTrackSettingsMutex.RUnlock() + argsForCall := fake.updateSubscribedTrackSettingsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsReturns(result1 error) { + fake.updateSubscribedTrackSettingsMutex.Lock() + defer fake.updateSubscribedTrackSettingsMutex.Unlock() + fake.UpdateSubscribedTrackSettingsStub = nil + fake.updateSubscribedTrackSettingsReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsReturnsOnCall(i int, result1 error) { + fake.updateSubscribedTrackSettingsMutex.Lock() + defer fake.updateSubscribedTrackSettingsMutex.Unlock() + fake.UpdateSubscribedTrackSettingsStub = nil + if fake.updateSubscribedTrackSettingsReturnsOnCall == nil { + fake.updateSubscribedTrackSettingsReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateSubscribedTrackSettingsReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 func(participantID livekit.ParticipantID) types.LocalParticipant) error { fake.updateSubscriptionPermissionMutex.Lock() ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)] @@ -3818,10 +3757,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getResponseSinkMutex.RUnlock() fake.getSubscribedParticipantsMutex.RLock() defer fake.getSubscribedParticipantsMutex.RUnlock() - fake.getSubscribedTrackMutex.RLock() - defer fake.getSubscribedTrackMutex.RUnlock() - fake.getSubscribedTracksMutex.RLock() - defer fake.getSubscribedTracksMutex.RUnlock() fake.handleAnswerMutex.RLock() defer fake.handleAnswerMutex.RUnlock() fake.handleOfferMutex.RLock() @@ -3908,6 +3843,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.updateMediaLossMutex.RUnlock() fake.updateSubscribedQualityMutex.RLock() defer fake.updateSubscribedQualityMutex.RUnlock() + fake.updateSubscribedTrackSettingsMutex.RLock() + defer fake.updateSubscribedTrackSettingsMutex.RUnlock() fake.updateSubscriptionPermissionMutex.RLock() defer fake.updateSubscriptionPermissionMutex.RUnlock() fake.updateVideoLayersMutex.RLock() diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 71aad804a..f81815d82 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -148,11 +148,11 @@ func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted b currentMuted := track.IsMuted() track.SetMuted(muted) - if currentMuted != track.IsMuted() && u.onTrackUpdated != nil { - u.params.Logger.Debugw("mute status changed", - "track", trackID, - "muted", track.IsMuted()) - u.onTrackUpdated(track, false) + if currentMuted != track.IsMuted() { + u.params.Logger.Debugw("mute status changed", "track", trackID, "muted", track.IsMuted()) + if u.onTrackUpdated != nil { + u.onTrackUpdated(track, false) + } } } @@ -215,6 +215,10 @@ func (u *UpTrackManager) UpdateVideoLayers(updateVideoLayers *livekit.UpdateVide } track.UpdateVideoLayers(updateVideoLayers.Layers) + if u.onTrackUpdated != nil { + u.onTrackUpdated(track, false) + } + return nil }