Remove callbacks queue from sfu/DownTrack (#655)

* Remove callbacks queue from sfu/DownTrack

- Connection stats callback was happening in connection stats go routine
- RTT update launches a goroutine on the receive side as it affects the
  subscriber. So, no need to queue it.
- Changed two things
  o Move close handler to goroutine. It is better that way as it touches
the subscriber as well
  o Move max layer handling into a goroutine also so that the callback
does minimal work.

With this all the send side callback queues are removed.

* small clean up
This commit is contained in:
Raja Subramanian
2022-04-25 13:28:24 +05:30
committed by GitHub
parent f0f66a2d75
commit 2f902a3edc
2 changed files with 81 additions and 100 deletions

View File

@@ -205,7 +205,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code
})
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {
t.notifySubscriberMaxQuality(subscriberID, QualityForSpatialLayer(layer))
go t.notifySubscriberMaxQuality(subscriberID, QualityForSpatialLayer(layer))
})
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {
@@ -213,48 +213,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code
})
downTrack.OnCloseHandler(func() {
t.subscribedTracksMu.Lock()
delete(t.subscribedTracks, subscriberID)
delete(t.pendingClose, subscriberID)
t.subscribedTracksMu.Unlock()
t.maybeNotifyNoSubscribers()
t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto())
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
return
}
// if the source has been terminated, we'll need to terminate all the subscribed tracks
// however, if the dest sub has disconnected, then we can skip
if sender == nil {
return
}
t.params.Logger.Debugw("removing peerconnection track",
"subscriber", sub.Identity(),
"subscriberID", subscriberID,
"kind", t.params.MediaTrack.Kind(),
)
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
// sub closing, can skip removing subscribedtracks
return
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
// most of these are safe to ignore, since the track state might have already
// been set to Inactive
t.params.Logger.Debugw("could not remove remoteTrack from forwarder",
"error", err,
"subscriber", sub.Identity(),
"subscriberID", subscriberID,
)
}
}
sub.RemoveSubscribedTrack(subTrack)
sub.Negotiate()
go t.downTrackClosed(sub, subTrack, sender)
})
t.subscribedTracksMu.Lock()
@@ -588,3 +547,53 @@ func (t *MediaTrackSubscriptions) maybeNotifyNoSubscribers() {
t.onNoSubscribers()
}
}
func (t *MediaTrackSubscriptions) downTrackClosed(
sub types.LocalParticipant,
subTrack types.SubscribedTrack,
sender *webrtc.RTPSender,
) {
subscriberID := sub.ID()
t.subscribedTracksMu.Lock()
delete(t.subscribedTracks, subscriberID)
delete(t.pendingClose, subscriberID)
t.subscribedTracksMu.Unlock()
t.maybeNotifyNoSubscribers()
t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto())
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
return
}
// if the source has been terminated, we'll need to terminate all the subscribed tracks
// however, if the dest sub has disconnected, then we can skip
if sender == nil {
return
}
t.params.Logger.Debugw("removing peerconnection track",
"subscriber", sub.Identity(),
"subscriberID", subscriberID,
"kind", t.params.MediaTrack.Kind(),
)
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
// sub closing, can skip removing subscribedtracks
return
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
// most of these are safe to ignore, since the track state might have already
// been set to Inactive
t.params.Logger.Debugw("could not remove remoteTrack from forwarder",
"error", err,
"subscriber", sub.Identity(),
"subscriberID", subscriberID,
)
}
}
sub.RemoveSubscribedTrack(subTrack)
sub.Negotiate()
}

View File

@@ -21,7 +21,6 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/utils"
)
// TrackSender defines an interface send media to remote peer
@@ -118,8 +117,6 @@ type DownTrack struct {
isNACKThrottled atomic.Bool
callbacksQueue *utils.OpsQueue
// RTCP callbacks
onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)
onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC)
@@ -172,17 +169,16 @@ func NewDownTrack(
}
d := &DownTrack{
logger: logger,
id: r.TrackID(),
peerID: peerID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
codec: c,
kind: kind,
forwarder: NewForwarder(c, kind, logger),
callbacksQueue: utils.NewOpsQueue(logger, "sfu-downtrack", 50),
logger: logger,
id: r.TrackID(),
peerID: peerID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
codec: c,
kind: kind,
forwarder: NewForwarder(c, kind, logger),
}
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
@@ -196,9 +192,7 @@ func NewDownTrack(
})
d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) {
if d.onStatsUpdate != nil {
d.callbacksQueue.Enqueue(func() {
d.onStatsUpdate(d, stat)
})
d.onStatsUpdate(d, stat)
}
})
@@ -226,8 +220,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
return webrtc.RTPCodecParameters{}, webrtc.ErrUnsupportedCodec
}
d.callbacksQueue.Start()
d.ssrc = uint32(t.SSRC())
d.payloadType = uint8(codec.PayloadType)
d.writeStream = t.WriteStream()
@@ -240,10 +232,11 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
if strings.HasPrefix(d.codec.MimeType, "video/") {
d.sequencer = newSequencer(d.maxTrack, d.logger)
}
if d.onBind != nil {
d.callbacksQueue.Enqueue(d.onBind)
}
d.bound.Store(true)
if d.onBind != nil {
d.onBind()
}
d.connectionStats.Start()
d.logger.Debugw("bound")
@@ -402,9 +395,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
}
if tp.isSwitchingToMaxLayer && d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.callbacksQueue.Enqueue(func() {
d.onMaxLayerChanged(d, layer)
})
d.onMaxLayerChanged(d, layer)
}
if extPkt.KeyFrame {
@@ -531,20 +522,16 @@ func (d *DownTrack) Mute(muted bool) {
}
if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
if muted {
d.callbacksQueue.Enqueue(func() {
d.onMaxLayerChanged(d, InvalidLayerSpatial)
})
} else {
notifyLayer := InvalidLayerSpatial
if !muted {
//
// When unmuting, don't wait for layer lock as
// client might need to be notified to start layers
// before locking can happen in the forwarder.
//
d.callbacksQueue.Enqueue(func() {
d.onMaxLayerChanged(d, maxLayers.spatial)
})
notifyLayer = maxLayers.spatial
}
d.onMaxLayerChanged(d, notifyLayer)
}
if d.onSubscriptionChanged != nil {
@@ -593,25 +580,14 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.rtpStats.Stop()
d.logger.Debugw("rtp stats", "stats", d.rtpStats.ToString())
if d.callbacksQueue.IsStarted() {
if d.kind == webrtc.RTPCodecTypeVideo {
d.callbacksQueue.Enqueue(func() {
if d.onMaxLayerChanged != nil {
d.onMaxLayerChanged(d, InvalidLayerSpatial)
}
})
}
if d.onCloseHandler != nil {
d.callbacksQueue.Enqueue(d.onCloseHandler)
}
d.callbacksQueue.Stop()
} else {
if d.onCloseHandler != nil {
d.onCloseHandler()
}
if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.onMaxLayerChanged(d, InvalidLayerSpatial)
}
if d.onCloseHandler != nil {
d.onCloseHandler()
}
d.stopKeyFrameRequester()
})
}
@@ -630,9 +606,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) {
// a. is higher than previous max -> client may need to start higher layer before forwarder can lock
// b. is lower than previous max -> client can stop higher layer(s)
//
d.callbacksQueue.Enqueue(func() {
d.onMaxLayerChanged(d, maxLayers.spatial)
})
d.onMaxLayerChanged(d, maxLayers.spatial)
}
if d.onSubscribedLayersChanged != nil {
@@ -1032,9 +1006,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
if d.onRttUpdate != nil {
d.callbacksQueue.Enqueue(func() {
d.onRttUpdate(d, rttToReport)
})
d.onRttUpdate(d, rttToReport)
}
}
}