Do not send down track callbacks to streamallocator via queue (#651)

This commit is contained in:
Raja Subramanian
2022-04-24 21:41:58 +05:30
committed by GitHub
parent 0a802077dd
commit 4d7950ec2f
2 changed files with 18 additions and 32 deletions
+17 -31
View File
@@ -137,10 +137,10 @@ type DownTrack struct {
onSubscribedLayersChanged func(dt *DownTrack, layers VideoLayers)
// packet sent callback
onPacketSentUnsafe []func(dt *DownTrack, size int)
onPacketSent []func(dt *DownTrack, size int)
// padding packet sent callback
onPaddingSentUnsafe []func(dt *DownTrack, size int)
onPaddingSent []func(dt *DownTrack, size int)
// update stats
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
@@ -397,7 +397,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
_, err = d.writeStream.WriteRTP(hdr, payload)
if err == nil {
pktSize := hdr.MarshalSize() + len(payload)
for _, f := range d.onPacketSentUnsafe {
for _, f := range d.onPacketSent {
f(d, pktSize)
}
@@ -503,7 +503,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
}
size := hdr.MarshalSize() + len(payload)
for _, f := range d.onPaddingSentUnsafe {
for _, f := range d.onPaddingSent {
f(d, size)
}
d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano())
@@ -548,9 +548,7 @@ func (d *DownTrack) Mute(muted bool) {
}
if d.onSubscriptionChanged != nil {
d.callbacksQueue.Enqueue(func() {
d.onSubscriptionChanged(d)
})
d.onSubscriptionChanged(d)
}
}
@@ -638,9 +636,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) {
}
if d.onSubscribedLayersChanged != nil {
d.callbacksQueue.Enqueue(func() {
d.onSubscribedLayersChanged(d, maxLayers)
})
d.onSubscribedLayersChanged(d, maxLayers)
}
}
@@ -651,9 +647,7 @@ func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) {
}
if d.onSubscribedLayersChanged != nil {
d.callbacksQueue.Enqueue(func() {
d.onSubscribedLayersChanged(d, maxLayers)
})
d.onSubscribedLayersChanged(d, maxLayers)
}
}
@@ -669,17 +663,13 @@ func (d *DownTrack) UpTrackLayersChange(availableLayers []int32) {
d.forwarder.UpTrackLayersChange(availableLayers)
if d.onAvailableLayersChanged != nil {
d.callbacksQueue.Enqueue(func() {
d.onAvailableLayersChanged(d)
})
d.onAvailableLayersChanged(d)
}
}
func (d *DownTrack) UpTrackBitrateAvailabilityChange() {
if d.onBitrateAvailabilityChanged != nil {
d.callbacksQueue.Enqueue(func() {
d.onBitrateAvailabilityChanged(d)
})
d.onBitrateAvailabilityChanged(d)
}
}
@@ -723,12 +713,12 @@ func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers Vide
d.onSubscribedLayersChanged = fn
}
func (d *DownTrack) OnPacketSentUnsafe(fn func(dt *DownTrack, size int)) {
d.onPacketSentUnsafe = append(d.onPacketSentUnsafe, fn)
func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) {
d.onPacketSent = append(d.onPacketSent, fn)
}
func (d *DownTrack) OnPaddingSentUnsafe(fn func(dt *DownTrack, size int)) {
d.onPaddingSentUnsafe = append(d.onPaddingSentUnsafe, fn)
func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int)) {
d.onPaddingSent = append(d.onPaddingSent, fn)
}
func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) {
@@ -891,7 +881,7 @@ func (d *DownTrack) writeBlankFrameRTP() error {
return err
}
for _, f := range d.onPacketSentUnsafe {
for _, f := range d.onPacketSent {
f(d, pktSize)
}
@@ -986,9 +976,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
case *rtcp.ReceiverEstimatedMaximumBitrate:
if d.onREMB != nil {
d.callbacksQueue.Enqueue(func() {
d.onREMB(d, p)
})
d.onREMB(d, p)
}
case *rtcp.ReceiverReport:
@@ -1029,9 +1017,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
case *rtcp.TransportLayerCC:
if p.MediaSSRC == d.ssrc && d.onTransportCCFeedback != nil {
d.callbacksQueue.Enqueue(func() {
d.onTransportCCFeedback(d, p)
})
d.onTransportCCFeedback(d, p)
}
}
}
@@ -1149,7 +1135,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
d.logger.Errorw("writing rtx packet err", err)
} else {
pktSize := pkt.Header.MarshalSize() + len(payload)
for _, f := range d.onPacketSentUnsafe {
for _, f := range d.onPacketSent {
f(d, pktSize)
}
+1 -1
View File
@@ -243,7 +243,7 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)
downTrack.OnBitrateAvailabilityChanged(s.onBitrateAvailabilityChanged)
downTrack.OnSubscriptionChanged(s.onSubscriptionChanged)
downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged)
downTrack.OnPacketSentUnsafe(s.onPacketSent)
downTrack.OnPacketSent(s.onPacketSent)
}
}