diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 0a7baf472..0d59ff649 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -137,10 +137,10 @@ type DownTrack struct { onSubscribedLayersChanged func(dt *DownTrack, layers VideoLayers) // packet sent callback - onPacketSentUnsafe []func(dt *DownTrack, size int) + onPacketSent []func(dt *DownTrack, size int) // padding packet sent callback - onPaddingSentUnsafe []func(dt *DownTrack, size int) + onPaddingSent []func(dt *DownTrack, size int) // update stats onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) @@ -397,7 +397,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { _, err = d.writeStream.WriteRTP(hdr, payload) if err == nil { pktSize := hdr.MarshalSize() + len(payload) - for _, f := range d.onPacketSentUnsafe { + for _, f := range d.onPacketSent { f(d, pktSize) } @@ -503,7 +503,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { } size := hdr.MarshalSize() + len(payload) - for _, f := range d.onPaddingSentUnsafe { + for _, f := range d.onPaddingSent { f(d, size) } d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano()) @@ -548,9 +548,7 @@ func (d *DownTrack) Mute(muted bool) { } if d.onSubscriptionChanged != nil { - d.callbacksQueue.Enqueue(func() { - d.onSubscriptionChanged(d) - }) + d.onSubscriptionChanged(d) } } @@ -638,9 +636,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { } if d.onSubscribedLayersChanged != nil { - d.callbacksQueue.Enqueue(func() { - d.onSubscribedLayersChanged(d, maxLayers) - }) + d.onSubscribedLayersChanged(d, maxLayers) } } @@ -651,9 +647,7 @@ func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { } if d.onSubscribedLayersChanged != nil { - d.callbacksQueue.Enqueue(func() { - d.onSubscribedLayersChanged(d, maxLayers) - }) + d.onSubscribedLayersChanged(d, maxLayers) } } @@ -669,17 +663,13 @@ func (d *DownTrack) UpTrackLayersChange(availableLayers []int32) { d.forwarder.UpTrackLayersChange(availableLayers) if d.onAvailableLayersChanged != nil { - d.callbacksQueue.Enqueue(func() { - d.onAvailableLayersChanged(d) - }) + d.onAvailableLayersChanged(d) } } func (d *DownTrack) UpTrackBitrateAvailabilityChange() { if d.onBitrateAvailabilityChanged != nil { - d.callbacksQueue.Enqueue(func() { - d.onBitrateAvailabilityChanged(d) - }) + d.onBitrateAvailabilityChanged(d) } } @@ -723,12 +713,12 @@ func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers Vide d.onSubscribedLayersChanged = fn } -func (d *DownTrack) OnPacketSentUnsafe(fn func(dt *DownTrack, size int)) { - d.onPacketSentUnsafe = append(d.onPacketSentUnsafe, fn) +func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) { + d.onPacketSent = append(d.onPacketSent, fn) } -func (d *DownTrack) OnPaddingSentUnsafe(fn func(dt *DownTrack, size int)) { - d.onPaddingSentUnsafe = append(d.onPaddingSentUnsafe, fn) +func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int)) { + d.onPaddingSent = append(d.onPaddingSent, fn) } func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) { @@ -891,7 +881,7 @@ func (d *DownTrack) writeBlankFrameRTP() error { return err } - for _, f := range d.onPacketSentUnsafe { + for _, f := range d.onPacketSent { f(d, pktSize) } @@ -986,9 +976,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { case *rtcp.ReceiverEstimatedMaximumBitrate: if d.onREMB != nil { - d.callbacksQueue.Enqueue(func() { - d.onREMB(d, p) - }) + d.onREMB(d, p) } case *rtcp.ReceiverReport: @@ -1029,9 +1017,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { case *rtcp.TransportLayerCC: if p.MediaSSRC == d.ssrc && d.onTransportCCFeedback != nil { - d.callbacksQueue.Enqueue(func() { - d.onTransportCCFeedback(d, p) - }) + d.onTransportCCFeedback(d, p) } } } @@ -1149,7 +1135,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { d.logger.Errorw("writing rtx packet err", err) } else { pktSize := pkt.Header.MarshalSize() + len(payload) - for _, f := range d.onPacketSentUnsafe { + for _, f := range d.onPacketSent { f(d, pktSize) } diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 1850c650d..39200c28e 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -243,7 +243,7 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams) downTrack.OnBitrateAvailabilityChanged(s.onBitrateAvailabilityChanged) downTrack.OnSubscriptionChanged(s.onSubscriptionChanged) downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged) - downTrack.OnPacketSentUnsafe(s.onPacketSent) + downTrack.OnPacketSent(s.onPacketSent) } }