diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fe8bc3c57..793defe93 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -126,8 +126,6 @@ type ParticipantImpl struct { *UpTrackManager *SubscriptionManager - // tracks and participants that this participant isn't allowed to subscribe to - disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -163,7 +161,7 @@ type ParticipantImpl struct { migrateState atomic.Value // types.MigrateState - onClose func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) + onClose func(types.LocalParticipant) onClaimsChanged func(participant types.LocalParticipant) onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) @@ -187,7 +185,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { rtcpCh: make(chan []rtcp.Packet, 100), pendingTracks: make(map[string]*pendingTrackInfo), pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo), - disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), connectedAt: time.Now(), rttUpdatedAt: time.Now(), cachedDownTracks: make(map[livekit.TrackID]*downTrackState), @@ -496,7 +493,7 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *li p.lock.Unlock() } -func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { +func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) { p.lock.Lock() p.onClose = callback p.lock.Unlock() @@ -684,13 +681,6 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.UpTrackManager.Close(!sendLeave) - p.lock.Lock() - disallowedSubscriptions := make(map[livekit.TrackID]livekit.ParticipantID) - for trackID, publisherID := range p.disallowedSubscriptions { - disallowedSubscriptions[trackID] = publisherID - } - p.lock.Unlock() - p.updateState(livekit.ParticipantInfo_DISCONNECTED) // ensure this is synchronized @@ -699,7 +689,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea onClose := p.onClose p.lock.RUnlock() if onClose != nil { - onClose(p, disallowedSubscriptions) + onClose(p) } // Close peer connections without blocking participant Close. If peer connections are gathering candidates @@ -976,14 +966,6 @@ func (p *ParticipantImpl) onTrackUnsubscribed(subTrack types.SubscribedTrack) { } func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) { - p.lock.Lock() - if allowed { - delete(p.disallowedSubscriptions, trackID) - } else { - p.disallowedSubscriptions[trackID] = publisherID - } - p.lock.Unlock() - p.params.Logger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed) err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index ba5d5db44..72cbb206f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -533,20 +533,6 @@ func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, return nil } -func (r *Room) RemoveDisallowedSubscriptions(sub types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { - for trackID, publisherID := range disallowedSubscriptions { - pub := r.GetParticipantByID(publisherID) - if pub == nil { - continue - } - - track := pub.GetPublishedTrack(trackID) - if track != nil { - track.RemoveSubscriber(sub.ID(), false) - } - } -} - func (r *Room) UpdateVideoLayers(participant types.Participant, updateVideoLayers *livekit.UpdateVideoLayers) error { return participant.UpdateVideoLayers(updateVideoLayers) } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 640e85d19..0e43f400f 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -449,6 +449,8 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { return ErrSubscriptionLimitExceeded } + s.setPublisher(res.PublisherIdentity, res.PublisherID) + // since hasPermission defaults to true, we will want to send a message to the client the first time // that we discover permissions were denied permChanged := s.setHasPermission(res.HasPermission) @@ -459,7 +461,6 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { return ErrNoTrackPermission } - s.setPublisher(res.PublisherIdentity, res.PublisherID) subTrack, err := track.AddSubscriber(m.params.Participant) if err != nil && err != errAlreadySubscribed { // ignore already subscribed error diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 625c3ecb0..3c6299f9e 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -315,7 +315,7 @@ type LocalParticipant interface { OnParticipantUpdate(callback func(LocalParticipant)) OnDataPacket(callback func(LocalParticipant, *livekit.DataPacket)) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) - OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) + OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 33416aa05..e4ddfea5f 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -458,10 +458,10 @@ type FakeLocalParticipant struct { onClaimsChangedArgsForCall []struct { arg1 func(types.LocalParticipant) } - OnCloseStub func(func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) + OnCloseStub func(func(types.LocalParticipant)) onCloseMutex sync.RWMutex onCloseArgsForCall []struct { - arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) + arg1 func(types.LocalParticipant) } OnDataPacketStub func(func(types.LocalParticipant, *livekit.DataPacket)) onDataPacketMutex sync.RWMutex @@ -3186,10 +3186,10 @@ func (fake *FakeLocalParticipant) OnClaimsChangedArgsForCall(i int) func(types.L return argsForCall.arg1 } -func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { +func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant)) { fake.onCloseMutex.Lock() fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { - arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) + arg1 func(types.LocalParticipant) }{arg1}) stub := fake.OnCloseStub fake.recordInvocation("OnClose", []interface{}{arg1}) @@ -3205,13 +3205,13 @@ func (fake *FakeLocalParticipant) OnCloseCallCount() int { return len(fake.onCloseArgsForCall) } -func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))) { +func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant))) { fake.onCloseMutex.Lock() defer fake.onCloseMutex.Unlock() fake.OnCloseStub = stub } -func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) { +func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant) { fake.onCloseMutex.RLock() defer fake.onCloseMutex.RUnlock() argsForCall := fake.onCloseArgsForCall[i] diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 4a63ea4e1..3966d4f97 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -376,7 +376,7 @@ func (r *RoomManager) StartSession( clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true) - participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { + participant.OnClose(func(p types.LocalParticipant) { if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { pLogger.Errorw("could not delete participant", err) } @@ -385,8 +385,6 @@ func (r *RoomManager) StartSession( proto := room.ToProto() persistRoomForParticipantCount(proto) r.telemetry.ParticipantLeft(ctx, proto, p.ToProto(), true) - - room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions) }) participant.OnClaimsChanged(func(participant types.LocalParticipant) { pLogger.Debugw("refreshing client token after claims change")