Refactoring channel handling (#2532)

* Refactoring channel handling

* Add a version to SubscriberID()
This commit is contained in:
Raja Subramanian
2024-03-01 15:08:07 +05:30
committed by GitHub
parent 95011d64f8
commit 9e334c7510
2 changed files with 16 additions and 11 deletions
+1 -1
View File
@@ -190,7 +190,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {
if t.onSubscriberMaxQualityChange != nil {
t.onSubscriberMaxQualityChange(subscriberID, dt.Codec(), layer)
t.onSubscriberMaxQualityChange(dt.SubscriberID(), dt.Codec(), layer)
}
})
+15 -10
View File
@@ -286,6 +286,8 @@ type DownTrack struct {
onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32)
onRttUpdate func(dt *DownTrack, rtt uint32)
onCloseHandler func(willBeResumed bool)
createdAt int64
}
// NewDownTrack returns a DownTrack.
@@ -310,6 +312,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
pacer: params.Pacer,
maxLayerNotifierCh: make(chan string, 1),
keyFrameRequesterCh: make(chan struct{}, 1),
createdAt: time.Now().UnixNano(),
}
d.forwarder = NewForwarder(
d.kind,
@@ -524,7 +527,10 @@ func (d *DownTrack) Codec() webrtc.RTPCodecCapability { return d.codec }
// StreamID is the group this track belongs too. This must be unique
func (d *DownTrack) StreamID() string { return d.params.StreamID }
func (d *DownTrack) SubscriberID() livekit.ParticipantID { return d.params.SubID }
func (d *DownTrack) SubscriberID() livekit.ParticipantID {
// add `createdAt` to ensure repeated subscriptions from same subscrober to same publisher does not collide
return livekit.ParticipantID(fmt.Sprintf("%s:%d", d.params.SubID, d.createdAt))
}
// Sets RTP header extensions for this track
func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter) {
@@ -659,21 +665,20 @@ func (d *DownTrack) postMaxLayerNotifierEvent(event string) {
}
func (d *DownTrack) maxLayerNotifierWorker() {
var event string
more := true
for more {
event, more = <-d.maxLayerNotifierCh
for event := range d.maxLayerNotifierCh {
maxLayerSpatial := d.forwarder.GetMaxSubscribedSpatial()
d.params.Logger.Debugw("max subscribed layer processed", "layer", maxLayerSpatial, "event", event)
maxLayerSpatial := buffer.InvalidLayerSpatial
if more {
maxLayerSpatial = d.forwarder.GetMaxSubscribedSpatial()
d.params.Logger.Debugw("max subscribed layer processed", "layer", maxLayerSpatial, "event", event)
}
if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil {
d.params.Logger.Debugw("notifying max subscribed layer", "layer", maxLayerSpatial, "event", event)
onMaxSubscribedLayerChanged(d, maxLayerSpatial)
}
}
if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil {
d.params.Logger.Debugw("notifying max subscribed layer", "layer", buffer.InvalidLayerSpatial, "event", "close")
onMaxSubscribedLayerChanged(d, buffer.InvalidLayerSpatial)
}
}
// WriteRTP writes an RTP Packet to the DownTrack