From 35b8319b08cc749ad6500ea71dbc7df8cf74450f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 29 Apr 2023 09:18:07 +0530 Subject: [PATCH] Remove disallowed subscriptions on close. (#1668) With subscription manager, there is no need to tell a publisher about a subscriber going away. Before subscription manager, the up track manager of a participant (i. e. the publisher side) was holding a list of pending subscriptions for its published tracks and that had to be cleaned up if one of the subscriber goes away. That is not the case any more. Also set publisherID early so that subscription permission update has the right publisherID. In fact, saw an empty ID in the logs and saw that we still have the disallowed subscription handling which is not necessary any more. --- pkg/rtc/participant.go | 24 +++---------------- pkg/rtc/room.go | 14 ----------- pkg/rtc/subscriptionmanager.go | 3 ++- pkg/rtc/types/interfaces.go | 2 +- .../typesfakes/fake_local_participant.go | 12 +++++----- pkg/service/roommanager.go | 4 +--- 6 files changed, 13 insertions(+), 46 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fe8bc3c57..793defe93 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -126,8 +126,6 @@ type ParticipantImpl struct { *UpTrackManager *SubscriptionManager - // tracks and participants that this participant isn't allowed to subscribe to - disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -163,7 +161,7 @@ type ParticipantImpl struct { migrateState atomic.Value // types.MigrateState - onClose func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) + onClose func(types.LocalParticipant) onClaimsChanged func(participant types.LocalParticipant) onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) @@ -187,7 +185,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { rtcpCh: make(chan []rtcp.Packet, 100), pendingTracks: make(map[string]*pendingTrackInfo), pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo), - disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), connectedAt: time.Now(), rttUpdatedAt: time.Now(), cachedDownTracks: make(map[livekit.TrackID]*downTrackState), @@ -496,7 +493,7 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *li p.lock.Unlock() } -func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { +func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) { p.lock.Lock() p.onClose = callback p.lock.Unlock() @@ -684,13 +681,6 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.UpTrackManager.Close(!sendLeave) - p.lock.Lock() - disallowedSubscriptions := make(map[livekit.TrackID]livekit.ParticipantID) - for trackID, publisherID := range p.disallowedSubscriptions { - disallowedSubscriptions[trackID] = publisherID - } - p.lock.Unlock() - p.updateState(livekit.ParticipantInfo_DISCONNECTED) // ensure this is synchronized @@ -699,7 +689,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea onClose := p.onClose p.lock.RUnlock() if onClose != nil { - onClose(p, disallowedSubscriptions) + onClose(p) } // Close peer connections without blocking participant Close. If peer connections are gathering candidates @@ -976,14 +966,6 @@ func (p *ParticipantImpl) onTrackUnsubscribed(subTrack types.SubscribedTrack) { } func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) { - p.lock.Lock() - if allowed { - delete(p.disallowedSubscriptions, trackID) - } else { - p.disallowedSubscriptions[trackID] = publisherID - } - p.lock.Unlock() - p.params.Logger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed) err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index ba5d5db44..72cbb206f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -533,20 +533,6 @@ func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, return nil } -func (r *Room) RemoveDisallowedSubscriptions(sub types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { - for trackID, publisherID := range disallowedSubscriptions { - pub := r.GetParticipantByID(publisherID) - if pub == nil { - continue - } - - track := pub.GetPublishedTrack(trackID) - if track != nil { - track.RemoveSubscriber(sub.ID(), false) - } - } -} - func (r *Room) UpdateVideoLayers(participant types.Participant, updateVideoLayers *livekit.UpdateVideoLayers) error { return participant.UpdateVideoLayers(updateVideoLayers) } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 640e85d19..0e43f400f 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -449,6 +449,8 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { return ErrSubscriptionLimitExceeded } + s.setPublisher(res.PublisherIdentity, res.PublisherID) + // since hasPermission defaults to true, we will want to send a message to the client the first time // that we discover permissions were denied permChanged := s.setHasPermission(res.HasPermission) @@ -459,7 +461,6 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { return ErrNoTrackPermission } - s.setPublisher(res.PublisherIdentity, res.PublisherID) subTrack, err := track.AddSubscriber(m.params.Participant) if err != nil && err != errAlreadySubscribed { // ignore already subscribed error diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 625c3ecb0..3c6299f9e 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -315,7 +315,7 @@ type LocalParticipant interface { OnParticipantUpdate(callback func(LocalParticipant)) OnDataPacket(callback func(LocalParticipant, *livekit.DataPacket)) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) - OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) + OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 33416aa05..e4ddfea5f 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -458,10 +458,10 @@ type FakeLocalParticipant struct { onClaimsChangedArgsForCall []struct { arg1 func(types.LocalParticipant) } - OnCloseStub func(func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) + OnCloseStub func(func(types.LocalParticipant)) onCloseMutex sync.RWMutex onCloseArgsForCall []struct { - arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) + arg1 func(types.LocalParticipant) } OnDataPacketStub func(func(types.LocalParticipant, *livekit.DataPacket)) onDataPacketMutex sync.RWMutex @@ -3186,10 +3186,10 @@ func (fake *FakeLocalParticipant) OnClaimsChangedArgsForCall(i int) func(types.L return argsForCall.arg1 } -func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { +func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant)) { fake.onCloseMutex.Lock() fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { - arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) + arg1 func(types.LocalParticipant) }{arg1}) stub := fake.OnCloseStub fake.recordInvocation("OnClose", []interface{}{arg1}) @@ -3205,13 +3205,13 @@ func (fake *FakeLocalParticipant) OnCloseCallCount() int { return len(fake.onCloseArgsForCall) } -func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))) { +func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant))) { fake.onCloseMutex.Lock() defer fake.onCloseMutex.Unlock() fake.OnCloseStub = stub } -func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) { +func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant) { fake.onCloseMutex.RLock() defer fake.onCloseMutex.RUnlock() argsForCall := fake.onCloseArgsForCall[i] diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 4a63ea4e1..3966d4f97 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -376,7 +376,7 @@ func (r *RoomManager) StartSession( clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true) - participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { + participant.OnClose(func(p types.LocalParticipant) { if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { pLogger.Errorw("could not delete participant", err) } @@ -385,8 +385,6 @@ func (r *RoomManager) StartSession( proto := room.ToProto() persistRoomForParticipantCount(proto) r.telemetry.ParticipantLeft(ctx, proto, p.ToProto(), true) - - room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions) }) participant.OnClaimsChanged(func(participant types.LocalParticipant) { pLogger.Debugw("refreshing client token after claims change")