Remove disallowed subscriptions on close. (#1668)

With subscription manager, there is no need to tell a publisher
about a subscriber going away. Before subscription manager,
the up track manager of a participant (i.e. the publisher side)
was holding a list of pending subscriptions for its published tracks
and that had to be cleaned up if one of the subscribers went away.
That is not the case any more.

Also set publisherID early so that subscription permission update has
the right publisherID. In fact, we saw an empty ID in the logs and
noticed that we still had the disallowed subscription handling, which
is no longer necessary.
This commit is contained in:
Raja Subramanian
2023-04-29 09:18:07 +05:30
committed by GitHub
parent a08cd23b6d
commit 35b8319b08
6 changed files with 13 additions and 46 deletions

View File

@@ -126,8 +126,6 @@ type ParticipantImpl struct {
*UpTrackManager
*SubscriptionManager
// tracks and participants that this participant isn't allowed to subscribe to
disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID
// keeps track of unpublished tracks in order to reuse trackID
unpublishedTracks []*livekit.TrackInfo
@@ -163,7 +161,7 @@ type ParticipantImpl struct {
migrateState atomic.Value // types.MigrateState
onClose func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)
onClose func(types.LocalParticipant)
onClaimsChanged func(participant types.LocalParticipant)
onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)
@@ -187,7 +185,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
rtcpCh: make(chan []rtcp.Packet, 100),
pendingTracks: make(map[string]*pendingTrackInfo),
pendingPublishingTracks: make(map[livekit.TrackID]*pendingTrackInfo),
disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID),
connectedAt: time.Now(),
rttUpdatedAt: time.Now(),
cachedDownTracks: make(map[livekit.TrackID]*downTrackState),
@@ -496,7 +493,7 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *li
p.lock.Unlock()
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) {
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) {
p.lock.Lock()
p.onClose = callback
p.lock.Unlock()
@@ -684,13 +681,6 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.UpTrackManager.Close(!sendLeave)
p.lock.Lock()
disallowedSubscriptions := make(map[livekit.TrackID]livekit.ParticipantID)
for trackID, publisherID := range p.disallowedSubscriptions {
disallowedSubscriptions[trackID] = publisherID
}
p.lock.Unlock()
p.updateState(livekit.ParticipantInfo_DISCONNECTED)
// ensure this is synchronized
@@ -699,7 +689,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
onClose := p.onClose
p.lock.RUnlock()
if onClose != nil {
onClose(p, disallowedSubscriptions)
onClose(p)
}
// Close peer connections without blocking participant Close. If peer connections are gathering candidates
@@ -976,14 +966,6 @@ func (p *ParticipantImpl) onTrackUnsubscribed(subTrack types.SubscribedTrack) {
}
func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) {
p.lock.Lock()
if allowed {
delete(p.disallowedSubscriptions, trackID)
} else {
p.disallowedSubscriptions[trackID] = publisherID
}
p.lock.Unlock()
p.params.Logger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed)
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{

View File

@@ -533,20 +533,6 @@ func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant,
return nil
}
// RemoveDisallowedSubscriptions unsubscribes sub from every published track it
// is no longer permitted to receive. The map is keyed trackID -> publisherID.
func (r *Room) RemoveDisallowedSubscriptions(sub types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
	for trackID, publisherID := range disallowedSubscriptions {
		publisher := r.GetParticipantByID(publisherID)
		if publisher == nil {
			// publisher has already left the room; nothing to detach
			continue
		}
		if track := publisher.GetPublishedTrack(trackID); track != nil {
			track.RemoveSubscriber(sub.ID(), false)
		}
	}
}
// UpdateVideoLayers forwards a video-layer update request to the target
// participant; the room itself keeps no layer state. Returns whatever the
// participant's own UpdateVideoLayers reports.
func (r *Room) UpdateVideoLayers(participant types.Participant, updateVideoLayers *livekit.UpdateVideoLayers) error {
	return participant.UpdateVideoLayers(updateVideoLayers)
}

View File

@@ -449,6 +449,8 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
return ErrSubscriptionLimitExceeded
}
s.setPublisher(res.PublisherIdentity, res.PublisherID)
// since hasPermission defaults to true, we will want to send a message to the client the first time
// that we discover permissions were denied
permChanged := s.setHasPermission(res.HasPermission)
@@ -459,7 +461,6 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
return ErrNoTrackPermission
}
s.setPublisher(res.PublisherIdentity, res.PublisherID)
subTrack, err := track.AddSubscriber(m.params.Participant)
if err != nil && err != errAlreadySubscribed {
// ignore already subscribed error

View File

@@ -315,7 +315,7 @@ type LocalParticipant interface {
OnParticipantUpdate(callback func(LocalParticipant))
OnDataPacket(callback func(LocalParticipant, *livekit.DataPacket))
OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool))
OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)

View File

@@ -458,10 +458,10 @@ type FakeLocalParticipant struct {
onClaimsChangedArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnCloseStub func(func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))
OnCloseStub func(func(types.LocalParticipant))
onCloseMutex sync.RWMutex
onCloseArgsForCall []struct {
arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)
arg1 func(types.LocalParticipant)
}
OnDataPacketStub func(func(types.LocalParticipant, *livekit.DataPacket))
onDataPacketMutex sync.RWMutex
@@ -3186,10 +3186,10 @@ func (fake *FakeLocalParticipant) OnClaimsChangedArgsForCall(i int) func(types.L
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) {
func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant)) {
fake.onCloseMutex.Lock()
fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct {
arg1 func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)
arg1 func(types.LocalParticipant)
}{arg1})
stub := fake.OnCloseStub
fake.recordInvocation("OnClose", []interface{}{arg1})
@@ -3205,13 +3205,13 @@ func (fake *FakeLocalParticipant) OnCloseCallCount() int {
return len(fake.onCloseArgsForCall)
}
func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))) {
// OnCloseCalls replaces the stub that OnClose invokes, guarded by the
// fake's mutex so it is safe to swap concurrently with OnClose calls.
func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant))) {
	fake.onCloseMutex.Lock()
	fake.OnCloseStub = stub
	fake.onCloseMutex.Unlock()
}
func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID) {
func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant) {
fake.onCloseMutex.RLock()
defer fake.onCloseMutex.RUnlock()
argsForCall := fake.onCloseArgsForCall[i]

View File

@@ -376,7 +376,7 @@ func (r *RoomManager) StartSession(
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id}
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true)
participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
participant.OnClose(func(p types.LocalParticipant) {
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
pLogger.Errorw("could not delete participant", err)
}
@@ -385,8 +385,6 @@ func (r *RoomManager) StartSession(
proto := room.ToProto()
persistRoomForParticipantCount(proto)
r.telemetry.ParticipantLeft(ctx, proto, p.ToProto(), true)
room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions)
})
participant.OnClaimsChanged(func(participant types.LocalParticipant) {
pLogger.Debugw("refreshing client token after claims change")