From 9e334c751099d142c1048331e43814ea10336dc4 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 1 Mar 2024 15:08:07 +0530 Subject: [PATCH] Refactoring channel handling (#2532) * Refactoring channel handling * Add a version to SubscriberID() --- pkg/rtc/mediatracksubscriptions.go | 2 +- pkg/sfu/downtrack.go | 25 +++++++++++++++---------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 3f92e922d..652e2fcfb 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -190,7 +190,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) { if t.onSubscriberMaxQualityChange != nil { - t.onSubscriberMaxQualityChange(subscriberID, dt.Codec(), layer) + t.onSubscriberMaxQualityChange(dt.SubscriberID(), dt.Codec(), layer) } }) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5cff248bb..23f7433cb 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -286,6 +286,8 @@ type DownTrack struct { onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32) onRttUpdate func(dt *DownTrack, rtt uint32) onCloseHandler func(willBeResumed bool) + + createdAt int64 } // NewDownTrack returns a DownTrack. @@ -310,6 +312,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { pacer: params.Pacer, maxLayerNotifierCh: make(chan string, 1), keyFrameRequesterCh: make(chan struct{}, 1), + createdAt: time.Now().UnixNano(), } d.forwarder = NewForwarder( d.kind, @@ -524,7 +527,10 @@ func (d *DownTrack) Codec() webrtc.RTPCodecCapability { return d.codec } // StreamID is the group this track belongs too. This must be unique func (d *DownTrack) StreamID() string { return d.params.StreamID } -func (d *DownTrack) SubscriberID() livekit.ParticipantID { return d.params.SubID } +func (d *DownTrack) SubscriberID() livekit.ParticipantID { + // add `createdAt` to ensure repeated subscriptions from same subscrober to same publisher does not collide + return livekit.ParticipantID(fmt.Sprintf("%s:%d", d.params.SubID, d.createdAt)) +} // Sets RTP header extensions for this track func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter) { @@ -659,21 +665,20 @@ func (d *DownTrack) postMaxLayerNotifierEvent(event string) { } func (d *DownTrack) maxLayerNotifierWorker() { - var event string - more := true - for more { - event, more = <-d.maxLayerNotifierCh + for event := range d.maxLayerNotifierCh { + maxLayerSpatial := d.forwarder.GetMaxSubscribedSpatial() + d.params.Logger.Debugw("max subscribed layer processed", "layer", maxLayerSpatial, "event", event) - maxLayerSpatial := buffer.InvalidLayerSpatial - if more { - maxLayerSpatial = d.forwarder.GetMaxSubscribedSpatial() - d.params.Logger.Debugw("max subscribed layer processed", "layer", maxLayerSpatial, "event", event) - } if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil { d.params.Logger.Debugw("notifying max subscribed layer", "layer", maxLayerSpatial, "event", event) onMaxSubscribedLayerChanged(d, maxLayerSpatial) } } + + if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil { + d.params.Logger.Debugw("notifying max subscribed layer", "layer", buffer.InvalidLayerSpatial, "event", "close") + onMaxSubscribedLayerChanged(d, buffer.InvalidLayerSpatial) + } } // WriteRTP writes an RTP Packet to the DownTrack