Perform unsubscribe in parallel to avoid blocking (#1760)

* Perform unsubscribe in parallel to avoid blocking

When unsubscribing from tracks, we flush a blank frame in order to prepare
the transceivers for re-use. This process is blocking for ~200ms. If
the unsubscribes are performed serially, it would prevent other subscribe
operation from continuing.

This PR parallelizes that operation, and ensures subsequent subscribe
operations could reuse the existing transceivers.

* also perform in parallel when uptrack close

* fix a few log fields
This commit is contained in:
David Zhao
2023-06-02 00:13:18 -07:00
committed by GitHub
parent 9a698736d1
commit b5c8fe5294
8 changed files with 53 additions and 20 deletions
+1 -1
View File
@@ -1686,7 +1686,7 @@ func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo
p.params.Logger.Debugw("add migrate muted track", "cid", cid, "track", ti.String())
rtpReceiver := p.TransportManager.GetPublisherRTPReceiver(ti.Mid)
if rtpReceiver == nil {
p.params.Logger.Errorw("could not find receiver for migrated track", nil, "track", ti.Sid)
p.params.Logger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid)
return nil
}
+29 -7
View File
@@ -40,6 +40,7 @@ var (
// amount of time to try otherwise before flagging subscription as failed
subscriptionTimeout = iceFailedTimeout
trackRemoveGracePeriod = time.Second
maxUnsubscribeWait = time.Second
)
const (
@@ -60,9 +61,10 @@ type SubscriptionManagerParams struct {
// SubscriptionManager manages a participant's subscriptions
type SubscriptionManager struct {
params SubscriptionManagerParams
lock sync.RWMutex
subscriptions map[livekit.TrackID]*trackSubscription
params SubscriptionManagerParams
lock sync.RWMutex
subscriptions map[livekit.TrackID]*trackSubscription
pendingUnsubscribes atomic.Int32
subscribedVideoCount, subscribedAudioCount atomic.Int32
@@ -109,8 +111,15 @@ func (m *SubscriptionManager) Close(willBeResumed bool) {
}
}
for _, dt := range downTracksToClose {
dt.CloseWithFlush(!willBeResumed)
if willBeResumed {
for _, dt := range downTracksToClose {
dt.CloseWithFlush(false)
}
} else {
// flush blocks, so execute in parallel
for _, dt := range downTracksToClose {
go dt.CloseWithFlush(true)
}
}
}
@@ -274,6 +283,15 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) {
return
}
if s.needsSubscribe() {
if m.pendingUnsubscribes.Load() != 0 && s.durationSinceStart() < maxUnsubscribeWait {
// enqueue this in a bit, after pending unsubscribes are complete
go func() {
time.Sleep(time.Duration(sfu.RTPBlankFramesCloseSeconds * float32(time.Second)))
m.queueReconcile(s.trackID)
}()
return
}
numAttempts := s.getNumAttempts()
if numAttempts == 0 {
m.params.Telemetry.TrackSubscribeRequested(
@@ -498,7 +516,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
go m.params.OnTrackSubscribed(subTrack)
}
m.params.Logger.Debugw("subscribed to track", "track", s.trackID, "subscribedAudioCount", m.subscribedAudioCount.Load(), "subscribedVideoCount", m.subscribedVideoCount.Load())
m.params.Logger.Debugw("subscribed to track", "trackID", s.trackID, "subscribedAudioCount", m.subscribedAudioCount.Load(), "subscribedVideoCount", m.subscribedVideoCount.Load())
// add mark the participant as someone we've subscribed to
firstSubscribe := false
@@ -531,7 +549,11 @@ func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error {
track := subTrack.MediaTrack()
pID := m.params.Participant.ID()
track.RemoveSubscriber(pID, false)
m.pendingUnsubscribes.Inc()
go func() {
defer m.pendingUnsubscribes.Dec()
track.RemoveSubscriber(pID, false)
}()
return nil
}
+3
View File
@@ -236,6 +236,9 @@ func TestUnsubscribe(t *testing.T) {
if s.needsUnsubscribe() {
return false
}
if sm.pendingUnsubscribes.Load() != 0 {
return false
}
sm.lock.RLock()
subLen := len(sm.subscriptions)
sm.lock.RUnlock()
+2 -2
View File
@@ -212,7 +212,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa
}
func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) {
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "track", req.TrackSid, "muted", req.Muted)
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", req.TrackSid, "muted", req.Muted)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
}
@@ -298,7 +298,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
for _, pt := range req.ParticipantTracks {
trackSIDs = append(trackSIDs, pt.TrackSids...)
}
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "track", trackSIDs)
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", trackSIDs)
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateSubscriptions{
UpdateSubscriptions: req,
+1 -1
View File
@@ -830,7 +830,7 @@ func (d *DownTrack) Close() {
d.CloseWithFlush(true)
}
// Close track, flush used to indicate whether send blank frame to flush
// CloseWithFlush - flush used to indicate whether send blank frame to flush
// decoder of client.
// 1. When transceiver is reused by other participant's video track,
// set flush=true to avoid previous video shows before new stream is displayed.
+15 -3
View File
@@ -662,9 +662,7 @@ func (w *WebRTCReceiver) closeTracks() {
w.connectionStats.Close()
w.streamTrackerManager.Close()
for _, dt := range w.downTrackSpreader.ResetAndGetDownTracks() {
dt.Close()
}
closeTrackSenders(w.downTrackSpreader.ResetAndGetDownTracks())
if w.onCloseHandler != nil {
w.onCloseHandler()
@@ -753,3 +751,17 @@ func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSende
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}
// closes all track senders in parallel, returns when all are closed
func closeTrackSenders(senders []TrackSender) {
wg := sync.WaitGroup{}
for _, dt := range senders {
dt := dt
wg.Add(1)
go func() {
defer wg.Done()
dt.Close()
}()
}
wg.Wait()
}
+1 -3
View File
@@ -98,9 +98,7 @@ func (r *RedPrimaryReceiver) CanClose() bool {
func (r *RedPrimaryReceiver) Close() {
r.closed.Store(true)
for _, dt := range r.downTrackSpreader.ResetAndGetDownTracks() {
dt.Close()
}
closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks())
}
func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
+1 -3
View File
@@ -95,9 +95,7 @@ func (r *RedReceiver) IsClosed() bool {
func (r *RedReceiver) Close() {
r.closed.Store(true)
for _, dt := range r.downTrackSpreader.ResetAndGetDownTracks() {
dt.Close()
}
closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks())
}
func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {