From 2f902a3edc7f9996ecf9719efb683f502a70a855 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 25 Apr 2022 13:28:24 +0530 Subject: [PATCH] Remove callbacks queue from sfu/DownTrack (#655) * Remove callbacks queue from sfu/DownTrack - Connection stats callback was happening in connection stats go routine - RTT update launches a goroutine on the receive side as it affects the subscriber. So, no need to queue it. - Changed two things o Move close handler to goroutine. It is better that way as it touches the subscriber as well o Move max layer handling into a goroutine also so that the callback does minimal work. With this all the send side callback queues are removed. * small clean up --- pkg/rtc/mediatracksubscriptions.go | 95 ++++++++++++++++-------------- pkg/sfu/downtrack.go | 86 +++++++++------------------ 2 files changed, 81 insertions(+), 100 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index aeb34be04..d13b02f52 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -205,7 +205,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code }) downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) { - t.notifySubscriberMaxQuality(subscriberID, QualityForSpatialLayer(layer)) + go t.notifySubscriberMaxQuality(subscriberID, QualityForSpatialLayer(layer)) }) downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) { @@ -213,48 +213,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code }) downTrack.OnCloseHandler(func() { - t.subscribedTracksMu.Lock() - delete(t.subscribedTracks, subscriberID) - delete(t.pendingClose, subscriberID) - t.subscribedTracksMu.Unlock() - - t.maybeNotifyNoSubscribers() - - t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto()) - - // ignore if the subscribing sub is not connected - if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { - return - } - - // if the source has been terminated, we'll need to terminate all the subscribed tracks - // however, if the dest sub has disconnected, then we can skip - if sender == nil { - return - } - t.params.Logger.Debugw("removing peerconnection track", - "subscriber", sub.Identity(), - "subscriberID", subscriberID, - "kind", t.params.MediaTrack.Kind(), - ) - if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { - if err == webrtc.ErrConnectionClosed { - // sub closing, can skip removing subscribedtracks - return - } - if _, ok := err.(*rtcerr.InvalidStateError); !ok { - // most of these are safe to ignore, since the track state might have already - // been set to Inactive - t.params.Logger.Debugw("could not remove remoteTrack from forwarder", - "error", err, - "subscriber", sub.Identity(), - "subscriberID", subscriberID, - ) - } - } - - sub.RemoveSubscribedTrack(subTrack) - sub.Negotiate() + go t.downTrackClosed(sub, subTrack, sender) }) t.subscribedTracksMu.Lock() @@ -588,3 +547,53 @@ func (t *MediaTrackSubscriptions) maybeNotifyNoSubscribers() { t.onNoSubscribers() } } + +func (t *MediaTrackSubscriptions) downTrackClosed( + sub types.LocalParticipant, + subTrack types.SubscribedTrack, + sender *webrtc.RTPSender, +) { + subscriberID := sub.ID() + t.subscribedTracksMu.Lock() + delete(t.subscribedTracks, subscriberID) + delete(t.pendingClose, subscriberID) + t.subscribedTracksMu.Unlock() + + t.maybeNotifyNoSubscribers() + + t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto()) + + // ignore if the subscribing sub is not connected + if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { + return + } + + // if the source has been terminated, we'll need to terminate all the subscribed tracks + // however, if the dest sub has disconnected, then we can skip + if sender == nil { + return + } + t.params.Logger.Debugw("removing peerconnection track", + "subscriber", sub.Identity(), + "subscriberID", subscriberID, + "kind", t.params.MediaTrack.Kind(), + ) + if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { + if err == webrtc.ErrConnectionClosed { + // sub closing, can skip removing subscribedtracks + return + } + if _, ok := err.(*rtcerr.InvalidStateError); !ok { + // most of these are safe to ignore, since the track state might have already + // been set to Inactive + t.params.Logger.Debugw("could not remove remoteTrack from forwarder", + "error", err, + "subscriber", sub.Identity(), + "subscriberID", subscriberID, + ) + } + } + + sub.RemoveSubscribedTrack(subTrack) + sub.Negotiate() +} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 0d59ff649..1fd24af41 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -21,7 +21,6 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" - "github.com/livekit/livekit-server/pkg/utils" ) // TrackSender defines an interface send media to remote peer @@ -118,8 +117,6 @@ type DownTrack struct { isNACKThrottled atomic.Bool - callbacksQueue *utils.OpsQueue - // RTCP callbacks onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC) @@ -172,17 +169,16 @@ func NewDownTrack( } d := &DownTrack{ - logger: logger, - id: r.TrackID(), - peerID: peerID, - maxTrack: mt, - streamID: r.StreamID(), - bufferFactory: bf, - receiver: r, - codec: c, - kind: kind, - forwarder: NewForwarder(c, kind, logger), - callbacksQueue: utils.NewOpsQueue(logger, "sfu-downtrack", 50), + logger: logger, + id: r.TrackID(), + peerID: peerID, + maxTrack: mt, + streamID: r.StreamID(), + bufferFactory: bf, + receiver: r, + codec: c, + kind: kind, + forwarder: NewForwarder(c, kind, logger), } d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ @@ -196,9 +192,7 @@ func NewDownTrack( }) d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) { if d.onStatsUpdate != nil { - d.callbacksQueue.Enqueue(func() { - d.onStatsUpdate(d, stat) - }) + d.onStatsUpdate(d, stat) } }) @@ -226,8 +220,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, return webrtc.RTPCodecParameters{}, webrtc.ErrUnsupportedCodec } - d.callbacksQueue.Start() - d.ssrc = uint32(t.SSRC()) d.payloadType = uint8(codec.PayloadType) d.writeStream = t.WriteStream() @@ -240,10 +232,11 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, if strings.HasPrefix(d.codec.MimeType, "video/") { d.sequencer = newSequencer(d.maxTrack, d.logger) } - if d.onBind != nil { - d.callbacksQueue.Enqueue(d.onBind) - } + d.bound.Store(true) + if d.onBind != nil { + d.onBind() + } d.connectionStats.Start() d.logger.Debugw("bound") @@ -402,9 +395,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } if tp.isSwitchingToMaxLayer && d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { - d.callbacksQueue.Enqueue(func() { - d.onMaxLayerChanged(d, layer) - }) + d.onMaxLayerChanged(d, layer) } if extPkt.KeyFrame { @@ -531,20 +522,16 @@ func (d *DownTrack) Mute(muted bool) { } if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { - if muted { - d.callbacksQueue.Enqueue(func() { - d.onMaxLayerChanged(d, InvalidLayerSpatial) - }) - } else { + notifyLayer := InvalidLayerSpatial + if !muted { // // When unmuting, don't wait for layer lock as // client might need to be notified to start layers // before locking can happen in the forwarder. // - d.callbacksQueue.Enqueue(func() { - d.onMaxLayerChanged(d, maxLayers.spatial) - }) + notifyLayer = maxLayers.spatial } + d.onMaxLayerChanged(d, notifyLayer) } if d.onSubscriptionChanged != nil { @@ -593,25 +580,14 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtpStats.Stop() d.logger.Debugw("rtp stats", "stats", d.rtpStats.ToString()) - if d.callbacksQueue.IsStarted() { - if d.kind == webrtc.RTPCodecTypeVideo { - d.callbacksQueue.Enqueue(func() { - if d.onMaxLayerChanged != nil { - d.onMaxLayerChanged(d, InvalidLayerSpatial) - } - }) - } - - if d.onCloseHandler != nil { - d.callbacksQueue.Enqueue(d.onCloseHandler) - } - - d.callbacksQueue.Stop() - } else { - if d.onCloseHandler != nil { - d.onCloseHandler() - } + if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { + d.onMaxLayerChanged(d, InvalidLayerSpatial) } + + if d.onCloseHandler != nil { + d.onCloseHandler() + } + d.stopKeyFrameRequester() }) } @@ -630,9 +606,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { // a. is higher than previous max -> client may need to start higher layer before forwarder can lock // b. is lower than previous max -> client can stop higher layer(s) // - d.callbacksQueue.Enqueue(func() { - d.onMaxLayerChanged(d, maxLayers.spatial) - }) + d.onMaxLayerChanged(d, maxLayers.spatial) } if d.onSubscribedLayersChanged != nil { @@ -1032,9 +1006,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } if d.onRttUpdate != nil { - d.callbacksQueue.Enqueue(func() { - d.onRttUpdate(d, rttToReport) - }) + d.onRttUpdate(d, rttToReport) } } }