diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index a098cba07..ecdfd2082 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1686,7 +1686,7 @@ func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo p.params.Logger.Debugw("add migrate muted track", "cid", cid, "track", ti.String()) rtpReceiver := p.TransportManager.GetPublisherRTPReceiver(ti.Mid) if rtpReceiver == nil { - p.params.Logger.Errorw("could not find receiver for migrated track", nil, "track", ti.Sid) + p.params.Logger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid) return nil } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 5bc0b54d7..18a3581ad 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -40,6 +40,7 @@ var ( // amount of time to try otherwise before flagging subscription as failed subscriptionTimeout = iceFailedTimeout trackRemoveGracePeriod = time.Second + maxUnsubscribeWait = time.Second ) const ( @@ -60,9 +61,10 @@ type SubscriptionManagerParams struct { // SubscriptionManager manages a participant's subscriptions type SubscriptionManager struct { - params SubscriptionManagerParams - lock sync.RWMutex - subscriptions map[livekit.TrackID]*trackSubscription + params SubscriptionManagerParams + lock sync.RWMutex + subscriptions map[livekit.TrackID]*trackSubscription + pendingUnsubscribes atomic.Int32 subscribedVideoCount, subscribedAudioCount atomic.Int32 @@ -109,8 +111,15 @@ func (m *SubscriptionManager) Close(willBeResumed bool) { } } - for _, dt := range downTracksToClose { - dt.CloseWithFlush(!willBeResumed) + if willBeResumed { + for _, dt := range downTracksToClose { + dt.CloseWithFlush(false) + } + } else { + // flush blocks, so execute in parallel + for _, dt := range downTracksToClose { + go dt.CloseWithFlush(true) + } } } @@ -274,6 +283,15 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) { return } if s.needsSubscribe() { + if m.pendingUnsubscribes.Load() != 0 && s.durationSinceStart() < maxUnsubscribeWait { + // enqueue this in a bit, after pending unsubscribes are complete + go func() { + time.Sleep(time.Duration(sfu.RTPBlankFramesCloseSeconds * float32(time.Second))) + m.queueReconcile(s.trackID) + }() + return + } + numAttempts := s.getNumAttempts() if numAttempts == 0 { m.params.Telemetry.TrackSubscribeRequested( @@ -498,7 +516,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { go m.params.OnTrackSubscribed(subTrack) } - m.params.Logger.Debugw("subscribed to track", "track", s.trackID, "subscribedAudioCount", m.subscribedAudioCount.Load(), "subscribedVideoCount", m.subscribedVideoCount.Load()) + m.params.Logger.Debugw("subscribed to track", "trackID", s.trackID, "subscribedAudioCount", m.subscribedAudioCount.Load(), "subscribedVideoCount", m.subscribedVideoCount.Load()) // add mark the participant as someone we've subscribed to firstSubscribe := false @@ -531,7 +549,11 @@ func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error { track := subTrack.MediaTrack() pID := m.params.Participant.ID() - track.RemoveSubscriber(pID, false) + m.pendingUnsubscribes.Inc() + go func() { + defer m.pendingUnsubscribes.Dec() + track.RemoveSubscriber(pID, false) + }() return nil } diff --git a/pkg/rtc/subscriptionmanager_test.go b/pkg/rtc/subscriptionmanager_test.go index 944655a0c..9ad6f20ea 100644 --- a/pkg/rtc/subscriptionmanager_test.go +++ b/pkg/rtc/subscriptionmanager_test.go @@ -236,6 +236,9 @@ func TestUnsubscribe(t *testing.T) { if s.needsUnsubscribe() { return false } + if sm.pendingUnsubscribes.Load() != 0 { + return false + } sm.lock.RLock() subLen := len(sm.subscriptions) sm.lock.RUnlock() diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index d4a4fe5a0..749e367de 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -212,7 +212,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa } func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) { - AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "track", req.TrackSid, "muted", req.Muted) + AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", req.TrackSid, "muted", req.Muted) if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { return nil, twirpAuthError(err) } @@ -298,7 +298,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda for _, pt := range req.ParticipantTracks { trackSIDs = append(trackSIDs, pt.TrackSids...) } - AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "track", trackSIDs) + AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", trackSIDs) err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateSubscriptions{ UpdateSubscriptions: req, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 0d26ad127..b90d8ebb5 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -830,7 +830,7 @@ func (d *DownTrack) Close() { d.CloseWithFlush(true) } -// Close track, flush used to indicate whether send blank frame to flush +// CloseWithFlush - flush used to indicate whether send blank frame to flush // decoder of client. // 1. When transceiver is reused by other participant's video track, // set flush=true to avoid previous video shows before new stream is displayed. diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index eda4f7412..07c072501 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -662,9 +662,7 @@ func (w *WebRTCReceiver) closeTracks() { w.connectionStats.Close() w.streamTrackerManager.Close() - for _, dt := range w.downTrackSpreader.ResetAndGetDownTracks() { - dt.Close() - } + closeTrackSenders(w.downTrackSpreader.ResetAndGetDownTracks()) if w.onCloseHandler != nil { w.onCloseHandler() @@ -753,3 +751,17 @@ func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSende func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } + +// closes all track senders in parallel, returns when all are closed +func closeTrackSenders(senders []TrackSender) { + wg := sync.WaitGroup{} + for _, dt := range senders { + dt := dt + wg.Add(1) + go func() { + defer wg.Done() + dt.Close() + }() + } + wg.Wait() +} diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index 9789bb32e..5f6fd8183 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -98,9 +98,7 @@ func (r *RedPrimaryReceiver) CanClose() bool { func (r *RedPrimaryReceiver) Close() { r.closed.Store(true) - for _, dt := range r.downTrackSpreader.ResetAndGetDownTracks() { - dt.Close() - } + closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks()) } func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 0d2876cf9..7a9adc700 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -95,9 +95,7 @@ func (r *RedReceiver) IsClosed() bool { func (r *RedReceiver) Close() { r.closed.Store(true) - for _, dt := range r.downTrackSpreader.ResetAndGetDownTracks() { - dt.Close() - } + closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks()) } func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {