mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
Rework receiver restart. (#4202)
* Rework receiver restart. - Protect against concurrent restarts - Clean up and consolidate code around restarts - Use `RestartStream` of buffer rather than creating new buffers. * fix test
This commit is contained in:
@@ -77,7 +77,6 @@ type ExtPacket struct {
|
||||
AbsCaptureTimeExt *act.AbsCaptureTime
|
||||
IsOutOfOrder bool
|
||||
IsBuffered bool
|
||||
IsRestart bool
|
||||
}
|
||||
|
||||
// VideoSize represents video resolution
|
||||
@@ -111,9 +110,10 @@ type BufferProvider interface {
|
||||
GetSenderReportData() *livekit.RTCPSenderReportState
|
||||
|
||||
OnRtcpSenderReport(fn func())
|
||||
OnFpsChanged(f func())
|
||||
OnFpsChanged(fn func())
|
||||
OnVideoSizeChanged(fn func([]VideoSize))
|
||||
OnCodecChange(fn func(webrtc.RTPCodecParameters))
|
||||
OnStreamRestart(fn func(string))
|
||||
|
||||
StartKeyFrameSeeder()
|
||||
StopKeyFrameSeeder()
|
||||
@@ -128,6 +128,8 @@ type BufferProvider interface {
|
||||
oobSequenceNumber uint16,
|
||||
) (uint64, error)
|
||||
|
||||
RestartStream(reason string)
|
||||
|
||||
CloseWithReason(reason string) (*livekit.RTPStats, error)
|
||||
}
|
||||
|
||||
@@ -190,6 +192,7 @@ type BufferBase struct {
|
||||
onFpsChanged func()
|
||||
onVideoSizeChanged func([]VideoSize)
|
||||
onCodecChange func(webrtc.RTPCodecParameters)
|
||||
onStreamRestart func(string)
|
||||
|
||||
// video size tracking for multiple spatial layers
|
||||
currentVideoSize [DefaultMaxLayerSpatial + 1]VideoSize
|
||||
@@ -212,7 +215,7 @@ type BufferBase struct {
|
||||
|
||||
keyFrameSeederGeneration atomic.Int32
|
||||
|
||||
restartOnNextPacket bool
|
||||
isRestartPending bool
|
||||
|
||||
isClosed atomic.Bool
|
||||
}
|
||||
@@ -479,8 +482,16 @@ func (b *BufferBase) stopRTPStats(reason string) (stats *livekit.RTPStats, stats
|
||||
return
|
||||
}
|
||||
|
||||
func (b *BufferBase) restartStream() {
|
||||
b.logger.Infow("stream restart")
|
||||
func (b *BufferBase) RestartStream(reason string) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
b.restartStreamLocked(reason)
|
||||
b.readCond.Broadcast()
|
||||
}
|
||||
|
||||
func (b *BufferBase) restartStreamLocked(reason string) {
|
||||
b.logger.Infow("stream restart", "reason", reason)
|
||||
|
||||
// stop
|
||||
b.StopKeyFrameSeeder()
|
||||
@@ -509,13 +520,18 @@ func (b *BufferBase) restartStream() {
|
||||
b.createDDParserAndFrameRateCalculator()
|
||||
}
|
||||
|
||||
b.frameRateCalculated = false
|
||||
if b.frameRateCalculator[0] == nil {
|
||||
b.createFrameRateCalculator()
|
||||
}
|
||||
|
||||
b.StartKeyFrameSeeder()
|
||||
|
||||
b.restartOnNextPacket = true
|
||||
b.isRestartPending = true
|
||||
|
||||
if f := b.onStreamRestart; f != nil {
|
||||
go f(reason)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BufferBase) createDDParserAndFrameRateCalculator() {
|
||||
@@ -559,6 +575,12 @@ func (b *BufferBase) ReadExtended(buf []byte) (*ExtPacket, error) {
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
||||
if b.isRestartPending {
|
||||
b.isRestartPending = false
|
||||
b.Unlock()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if b.extPackets.Len() > 0 {
|
||||
ep := b.extPackets.PopFront()
|
||||
patched := b.patchExtPacket(ep, buf)
|
||||
@@ -731,7 +753,7 @@ func (b *BufferBase) HandleIncomingPacketLocked(
|
||||
return 0, fmt.Errorf("unhandled reason: %s", flowState.UnhandledReason.String())
|
||||
}
|
||||
|
||||
b.restartStream()
|
||||
b.restartStreamLocked("discontinuity")
|
||||
|
||||
flowState = b.rtpStats.Update(
|
||||
arrivalTime,
|
||||
@@ -835,7 +857,7 @@ func (b *BufferBase) HandleIncomingPacketLocked(
|
||||
return 0, err
|
||||
}
|
||||
|
||||
ep := b.getExtPacket(rtpPacket, arrivalTime, isBuffered, b.restartOnNextPacket, flowState)
|
||||
ep := b.getExtPacket(rtpPacket, arrivalTime, isBuffered, flowState)
|
||||
if ep == nil {
|
||||
return 0, errors.New("could not get ext packet")
|
||||
}
|
||||
@@ -856,7 +878,6 @@ func (b *BufferBase) HandleIncomingPacketLocked(
|
||||
b.updateNACKState(rtpPacket.SequenceNumber, flowState)
|
||||
}
|
||||
|
||||
b.restartOnNextPacket = false
|
||||
return ep.ExtSequenceNumber, nil
|
||||
}
|
||||
|
||||
@@ -931,6 +952,7 @@ func (b *BufferBase) handleCodecChange(newPT uint8) {
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
b.logger.Infow(
|
||||
"codec changed",
|
||||
"oldPayload", b.payloadType, "newPayload", newPT,
|
||||
@@ -940,30 +962,18 @@ func (b *BufferBase) handleCodecChange(newPT uint8) {
|
||||
b.payloadType = newPT
|
||||
b.rtxPayloadType = rtxPt
|
||||
b.mime = mime.NormalizeMimeType(newCodec.MimeType)
|
||||
b.frameRateCalculated = false
|
||||
|
||||
if b.ddExtID != 0 {
|
||||
b.createDDParserAndFrameRateCalculator()
|
||||
}
|
||||
|
||||
if b.frameRateCalculator[0] == nil {
|
||||
b.createFrameRateCalculator()
|
||||
}
|
||||
|
||||
b.bucket.ResyncOnNextPacket()
|
||||
b.restartStreamLocked("codec-change")
|
||||
|
||||
if f := b.onCodecChange; f != nil {
|
||||
go f(newCodec)
|
||||
}
|
||||
|
||||
b.StartKeyFrameSeeder()
|
||||
}
|
||||
|
||||
func (b *BufferBase) getExtPacket(
|
||||
rtpPacket *rtp.Packet,
|
||||
arrivalTime int64,
|
||||
isBuffered bool,
|
||||
isRestart bool,
|
||||
flowState rtpstats.RTPFlowState,
|
||||
) *ExtPacket {
|
||||
ep := ExtPacketFactory.Get().(*ExtPacket)
|
||||
@@ -978,7 +988,6 @@ func (b *BufferBase) getExtPacket(
|
||||
},
|
||||
IsOutOfOrder: flowState.IsOutOfOrder,
|
||||
IsBuffered: isBuffered,
|
||||
IsRestart: isRestart,
|
||||
}
|
||||
|
||||
if len(ep.Packet.Payload) == 0 {
|
||||
@@ -1387,6 +1396,12 @@ func (b *BufferBase) OnCodecChange(fn func(webrtc.RTPCodecParameters)) {
|
||||
b.Unlock()
|
||||
}
|
||||
|
||||
func (b *BufferBase) OnStreamRestart(fn func(string)) {
|
||||
b.Lock()
|
||||
b.onStreamRestart = fn
|
||||
b.Unlock()
|
||||
}
|
||||
|
||||
// checkVideoSizeChange checks if video size has changed for a specific spatial layer and fires callback
|
||||
func (b *BufferBase) checkVideoSizeChange(videoSizes []VideoSize) {
|
||||
if len(videoSizes) > len(b.currentVideoSize) {
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package buffer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -330,6 +331,7 @@ func TestCodecChange(t *testing.T) {
|
||||
buff := NewBuffer(123, 1, 1)
|
||||
require.NotNil(t, buff)
|
||||
changedCodec := make(chan webrtc.RTPCodecParameters, 1)
|
||||
restartCleared := make(chan struct{}, 1)
|
||||
buff.OnCodecChange(func(rp webrtc.RTPCodecParameters) {
|
||||
select {
|
||||
case changedCodec <- rp:
|
||||
@@ -337,6 +339,17 @@ func TestCodecChange(t *testing.T) {
|
||||
t.Fatalf("codec change not consumed")
|
||||
}
|
||||
})
|
||||
buff.OnStreamRestart(func(reason string) {
|
||||
require.Equal(t, "codec-change", reason)
|
||||
|
||||
// read once to clear pending restart
|
||||
var buf [1500]byte
|
||||
extPkt, err := buff.ReadExtended(buf[:])
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, extPkt)
|
||||
|
||||
restartCleared <- struct{}{}
|
||||
})
|
||||
|
||||
h265Pkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
@@ -359,20 +372,28 @@ func TestCodecChange(t *testing.T) {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
buff.Bind(webrtc.RTPParameters{
|
||||
HeaderExtensions: nil,
|
||||
Codecs: []webrtc.RTPCodecParameters{vp8Codec, h265Codec},
|
||||
}, vp8Codec.RTPCodecCapability, 0)
|
||||
// Bind sets up VP8 as expected codec,
|
||||
// packet written to the buffer above before Bind is H.265,
|
||||
// that should trigger a codec change and a stream restart
|
||||
// when the queued packets from Write before Bind get flushed
|
||||
buff.Bind(
|
||||
webrtc.RTPParameters{
|
||||
HeaderExtensions: nil,
|
||||
Codecs: []webrtc.RTPCodecParameters{vp8Codec, h265Codec},
|
||||
},
|
||||
vp8Codec.RTPCodecCapability,
|
||||
0,
|
||||
)
|
||||
|
||||
select {
|
||||
case c := <-changedCodec:
|
||||
require.Equal(t, h265Codec, c)
|
||||
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("expected codec change")
|
||||
}
|
||||
<-restartCleared
|
||||
|
||||
// codec change after bind
|
||||
// second codec change - writing VP8 packet after Bind should trigger another codec change
|
||||
vp8Pkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
@@ -391,12 +412,17 @@ func TestCodecChange(t *testing.T) {
|
||||
select {
|
||||
case c := <-changedCodec:
|
||||
require.Equal(t, vp8Codec, c)
|
||||
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("expected codec change")
|
||||
}
|
||||
fmt.Printf("done second codec change\n") // REMOVE
|
||||
<-restartCleared
|
||||
|
||||
// out of order pkts can't cause codec change
|
||||
// rewrite the VP8 packet to start the sequence after a stream restart
|
||||
_, err = buff.Write(buf)
|
||||
require.NoError(t, err)
|
||||
|
||||
h265Pkt.SequenceNumber = 2
|
||||
h265Pkt.Timestamp = 2
|
||||
buf, err = h265Pkt.Marshal()
|
||||
@@ -409,7 +435,7 @@ func TestCodecChange(t *testing.T) {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
// unknown codec should not cause change
|
||||
// unknown codec should not cause change even if it is in order
|
||||
h265Pkt.SequenceNumber = 4
|
||||
h265Pkt.Timestamp = 4
|
||||
h265Pkt.PayloadType = 117
|
||||
@@ -422,6 +448,22 @@ func TestCodecChange(t *testing.T) {
|
||||
t.Fatalf("unexpected codec change")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
// an in-order packet should change codec again
|
||||
h265Pkt.SequenceNumber = 5
|
||||
h265Pkt.Timestamp = 5
|
||||
h265Pkt.PayloadType = 116
|
||||
buf, err = h265Pkt.Marshal()
|
||||
require.NoError(t, err)
|
||||
_, err = buff.Write(buf)
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case c := <-changedCodec:
|
||||
require.Equal(t, h265Codec, c)
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("expected codec change")
|
||||
}
|
||||
<-restartCleared
|
||||
}
|
||||
|
||||
func BenchmarkMemcpu(b *testing.B) {
|
||||
|
||||
@@ -1276,8 +1276,8 @@ func (d *DownTrack) handleMute(muted bool, changed bool) {
|
||||
// no layers required due to publisher mute (bit of circular dependency),
|
||||
// there will be a delay in layers turning back on when unmute happens.
|
||||
// Unmute path will require
|
||||
// 1. unmute signalling out-of-band from publisher received by down track(s)
|
||||
// 2. down track(s) notifying max layer
|
||||
// 1. unmute signalling out-of-band from publisher received by downtrack(s)
|
||||
// 2. downtrack(s) notifying max layer
|
||||
// 3. out-of-band notification about max layer sent back to the publisher
|
||||
// 4. publisher starts layer(s)
|
||||
// Ideally, on publisher mute, whatever layers were active remain active and
|
||||
@@ -1331,7 +1331,7 @@ func (d *DownTrack) CloseWithFlush(flush bool, isEnding bool) {
|
||||
return
|
||||
}
|
||||
|
||||
d.params.Logger.Debugw("close down track", "flushBlankFrame", flush)
|
||||
d.params.Logger.Debugw("close downtrack", "flushBlankFrame", flush)
|
||||
if d.bindState.Load() == bindStateBound {
|
||||
d.forwarder.Mute(true, true)
|
||||
|
||||
@@ -1447,7 +1447,7 @@ func (d *DownTrack) SeedState(state DownTrackState) {
|
||||
}
|
||||
|
||||
if state.RTPStats != nil || state.ForwarderState != nil {
|
||||
d.params.Logger.Debugw("seeding down track state", "state", state)
|
||||
d.params.Logger.Debugw("seeding downtrack state", "state", state)
|
||||
}
|
||||
if state.RTPStats != nil {
|
||||
d.rtpStats.Seed(state.RTPStats)
|
||||
|
||||
@@ -82,10 +82,6 @@ var (
|
||||
|
||||
// --------------------------------------
|
||||
|
||||
type AudioLevelHandle func(level uint8, duration uint32)
|
||||
|
||||
// --------------------------------------
|
||||
|
||||
type Bitrates [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64
|
||||
|
||||
// --------------------------------------
|
||||
@@ -223,6 +219,10 @@ type ReceiverBase struct {
|
||||
|
||||
redTransformer atomic.Value // redTransformer interface
|
||||
|
||||
forwardersGeneration atomic.Uint32
|
||||
forwardersWaitGroup *sync.WaitGroup
|
||||
restartInProgress bool
|
||||
|
||||
isClosed atomic.Bool
|
||||
}
|
||||
|
||||
@@ -249,6 +249,8 @@ func NewReceiverBase(params ReceiverBaseParams, trackInfo *livekit.TrackInfo, co
|
||||
)
|
||||
r.streamTrackerManager.SetListener(r)
|
||||
|
||||
r.startForwarderGeneration()
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -345,33 +347,81 @@ func (r *ReceiverBase) UpdateTrackInfo(ti *livekit.TrackInfo) {
|
||||
}
|
||||
r.trackInfo = utils.CloneProto(ti)
|
||||
// MUTABLE-TRACKINFO-TODO: notify buffers, buffers may need to resize retransmission buffer if there is layer change
|
||||
|
||||
if shouldResync {
|
||||
r.resyncLocked("update-track-info")
|
||||
}
|
||||
r.bufferMu.Unlock()
|
||||
|
||||
r.streamTrackerManager.UpdateTrackInfo(ti)
|
||||
|
||||
if shouldResync {
|
||||
r.Restart("update-track-info")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) Restart(reason string) {
|
||||
r.bufferMu.Lock()
|
||||
defer r.bufferMu.Unlock()
|
||||
|
||||
r.resyncLocked(reason)
|
||||
r.params.Logger.Infow("restarting receiver", "reason", reason)
|
||||
r.restartInternal(reason, false)
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) resyncLocked(reason string) {
|
||||
// resync to avoid gaps in the forwarded sequence number
|
||||
r.params.Logger.Debugw("resync receiver", "reason", reason)
|
||||
r.clearAllBuffersLocked("resync")
|
||||
func (r *ReceiverBase) restartInternal(reason string, isDetected bool) {
|
||||
if r.IsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
// 1. guard against concurrent restarts
|
||||
r.bufferMu.Lock()
|
||||
if r.restartInProgress {
|
||||
r.bufferMu.Unlock()
|
||||
return
|
||||
}
|
||||
r.restartInProgress = true
|
||||
r.bufferMu.Unlock()
|
||||
|
||||
// 2. restart all the buffers
|
||||
// if a stream was detected, skip external restart
|
||||
//
|
||||
// NOTE: The case of external restart and detected restart (which usually comes from one buffer)
|
||||
// racing will miss restart on all buffers if detected restart from one buffer adds the guard
|
||||
// against concurrent restart. But, that condition should be very rare if at all.
|
||||
// External restart happens when the underlying track changes or when seeking
|
||||
if !isDetected {
|
||||
for _, buff := range r.GetAllBuffers() {
|
||||
if buff == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
buff.RestartStream(reason)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. wait for the forwarders to finish
|
||||
r.stopForwarderGeneration()
|
||||
|
||||
// 4. reset stream tracker
|
||||
r.streamTrackerManager.RemoveAllTrackers()
|
||||
|
||||
// 5. signal attached downtracks to resync so that they can have proper sequencing on a receiver restart
|
||||
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
dt.Resync()
|
||||
dt.ReceiverRestart()
|
||||
})
|
||||
if rt := r.redTransformer.Load(); rt != nil {
|
||||
rt.(REDTransformer).ResyncDownTracks()
|
||||
rt.(REDTransformer).OnStreamRestart()
|
||||
}
|
||||
|
||||
// 6. move forwarder generation ahead
|
||||
r.startForwarderGeneration()
|
||||
|
||||
r.bufferMu.Lock()
|
||||
// 7. release restart hold
|
||||
r.restartInProgress = false
|
||||
|
||||
// 8. restart forwarders
|
||||
for layer, buff := range r.buffers {
|
||||
if buff == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
r.startForwarderForBufferLocked(int32(layer), buff)
|
||||
}
|
||||
r.bufferMu.Unlock()
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) OnMaxLayerChange(fn func(mimeType mime.MimeType, maxLayer int32)) {
|
||||
@@ -675,6 +725,9 @@ func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt
|
||||
if r.Kind() == webrtc.RTPCodecTypeVideo && layer == 0 {
|
||||
buff.OnCodecChange(r.handleCodecChange)
|
||||
}
|
||||
buff.OnStreamRestart(func(reason string) {
|
||||
r.restartInternal(reason, true)
|
||||
})
|
||||
|
||||
var duration time.Duration
|
||||
switch layer {
|
||||
@@ -705,8 +758,9 @@ func (r *ReceiverBase) AddBuffer(buff buffer.BufferProvider, layer int32) {
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) StartBuffer(buff buffer.BufferProvider, layer int32) {
|
||||
r.params.Logger.Debugw("starting forwarder", "layer", layer)
|
||||
go r.forwardRTP(layer, buff)
|
||||
r.bufferMu.Lock()
|
||||
r.startForwarderForBufferLocked(layer, buff)
|
||||
r.bufferMu.Unlock()
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) GetAllBuffers() [buffer.DefaultMaxLayerSpatial + 1]buffer.BufferProvider {
|
||||
@@ -723,18 +777,18 @@ func (r *ReceiverBase) GetAllBuffers() [buffer.DefaultMaxLayerSpatial + 1]buffer
|
||||
|
||||
func (r *ReceiverBase) ClearAllBuffers(reason string) {
|
||||
r.bufferMu.Lock()
|
||||
defer r.bufferMu.Unlock()
|
||||
|
||||
r.clearAllBuffersLocked(reason)
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) clearAllBuffersLocked(reason string) {
|
||||
for idx := range len(r.buffers) {
|
||||
if r.buffers[idx] != nil {
|
||||
r.buffers[idx].CloseWithReason(reason)
|
||||
}
|
||||
buffers := r.buffers
|
||||
for idx := range r.buffers {
|
||||
r.buffers[idx] = nil
|
||||
}
|
||||
r.bufferMu.Unlock()
|
||||
|
||||
for _, buff := range buffers {
|
||||
if buff == nil {
|
||||
continue
|
||||
}
|
||||
buff.CloseWithReason(reason)
|
||||
}
|
||||
|
||||
r.streamTrackerManager.RemoveAllTrackers()
|
||||
}
|
||||
@@ -788,16 +842,58 @@ func (r *ReceiverBase) GetAudioLevel() (float64, bool) {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
func (r *ReceiverBase) startForwarderGeneration() {
|
||||
r.bufferMu.Lock()
|
||||
defer r.bufferMu.Unlock()
|
||||
|
||||
r.forwardersGeneration.Inc()
|
||||
r.forwardersWaitGroup = &sync.WaitGroup{}
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) stopForwarderGeneration() {
|
||||
r.bufferMu.Lock()
|
||||
r.forwardersGeneration.Inc()
|
||||
forwarderWaitGroup := r.forwardersWaitGroup
|
||||
r.bufferMu.Unlock()
|
||||
|
||||
if forwarderWaitGroup != nil {
|
||||
forwarderWaitGroup.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) startForwarderForBufferLocked(layer int32, buff buffer.BufferProvider) {
|
||||
if r.restartInProgress {
|
||||
return
|
||||
}
|
||||
|
||||
r.forwardersWaitGroup.Add(1)
|
||||
|
||||
r.params.Logger.Debugw("starting forwarder", "layer", layer)
|
||||
go r.forwardRTP(layer, buff, r.forwardersGeneration.Load(), r.forwardersWaitGroup)
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) forwardRTP(
|
||||
layer int32,
|
||||
buff buffer.BufferProvider,
|
||||
forwarderGeneration uint32,
|
||||
wg *sync.WaitGroup,
|
||||
) {
|
||||
var (
|
||||
extPkt *buffer.ExtPacket
|
||||
err error
|
||||
)
|
||||
|
||||
numPacketsForwarded := 0
|
||||
numPacketsDropped := 0
|
||||
defer func() {
|
||||
if r.params.IsSelfClosing {
|
||||
r.Close("forwarder-done", false)
|
||||
if err == io.EOF {
|
||||
if r.params.IsSelfClosing {
|
||||
r.Close("forwarder-done", false)
|
||||
|
||||
r.streamTrackerManager.RemoveTracker(layer)
|
||||
if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
||||
r.streamTrackerManager.RemoveAllTrackers()
|
||||
r.streamTrackerManager.RemoveTracker(layer)
|
||||
if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM {
|
||||
r.streamTrackerManager.RemoveAllTrackers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -807,6 +903,7 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
"numPacketsForwarded", numPacketsForwarded,
|
||||
"numPacketsDropped", numPacketsDropped,
|
||||
)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
var spatialTrackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker
|
||||
@@ -817,29 +914,22 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
|
||||
pktBuf := make([]byte, bucket.RTPMaxPktSize)
|
||||
r.params.Logger.Debugw("starting forwarding", "layer", layer)
|
||||
for {
|
||||
pkt, err := buff.ReadExtended(pktBuf)
|
||||
|
||||
for r.forwardersGeneration.Load() == forwarderGeneration {
|
||||
extPkt, err = buff.ReadExtended(pktBuf)
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
if extPkt == nil {
|
||||
continue
|
||||
}
|
||||
dequeuedAt := mono.UnixNano()
|
||||
|
||||
if pkt.IsRestart {
|
||||
r.params.Logger.Infow("stream restarted", "layer", layer)
|
||||
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
dt.ReceiverRestart()
|
||||
})
|
||||
|
||||
if rt := r.redTransformer.Load(); rt != nil {
|
||||
rt.(REDTransformer).OnStreamRestart()
|
||||
}
|
||||
}
|
||||
|
||||
if pkt.Packet.PayloadType != uint8(r.params.Codec.PayloadType) {
|
||||
if extPkt.Packet.PayloadType != uint8(r.params.Codec.PayloadType) {
|
||||
// drop packets as we don't support codec fallback directly
|
||||
r.params.Logger.Debugw(
|
||||
"dropping packet - payload mismatch",
|
||||
"packetPayloadType", pkt.Packet.PayloadType,
|
||||
"packetPayloadType", extPkt.Packet.PayloadType,
|
||||
"payloadType", r.params.Codec.PayloadType,
|
||||
)
|
||||
numPacketsDropped++
|
||||
@@ -847,15 +937,15 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
}
|
||||
|
||||
spatialLayer := layer
|
||||
if pkt.Spatial >= 0 {
|
||||
if extPkt.Spatial >= 0 {
|
||||
// svc packet, take spatial layer info from packet
|
||||
spatialLayer = pkt.Spatial
|
||||
spatialLayer = extPkt.Spatial
|
||||
}
|
||||
if int(spatialLayer) >= len(spatialTrackers) {
|
||||
r.params.Logger.Errorw(
|
||||
"unexpected spatial layer", nil,
|
||||
"spatialLayer", spatialLayer,
|
||||
"pktSpatialLayer", pkt.Spatial,
|
||||
"pktSpatialLayer", extPkt.Spatial,
|
||||
)
|
||||
numPacketsDropped++
|
||||
continue
|
||||
@@ -863,22 +953,21 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
|
||||
var writeCount atomic.Int32
|
||||
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
writeCount.Add(dt.WriteRTP(pkt, spatialLayer))
|
||||
writeCount.Add(dt.WriteRTP(extPkt, spatialLayer))
|
||||
})
|
||||
|
||||
if rt := r.redTransformer.Load(); rt != nil {
|
||||
writeCount.Add(rt.(REDTransformer).ForwardRTP(pkt, spatialLayer))
|
||||
writeCount.Add(rt.(REDTransformer).ForwardRTP(extPkt, spatialLayer))
|
||||
}
|
||||
|
||||
// track delay/jitter
|
||||
if writeCount.Load() > 0 && r.forwardStats != nil && !pkt.IsBuffered {
|
||||
if latency, isHigh := r.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh {
|
||||
if writeCount.Load() > 0 && r.forwardStats != nil && !extPkt.IsBuffered {
|
||||
if latency, isHigh := r.forwardStats.Update(extPkt.Arrival, mono.UnixNano()); isHigh {
|
||||
r.params.Logger.Debugw(
|
||||
"high forwarding latency",
|
||||
"latency", time.Duration(latency),
|
||||
"queuingLatency", time.Duration(dequeuedAt-pkt.Arrival),
|
||||
"queuingLatency", time.Duration(dequeuedAt-extPkt.Arrival),
|
||||
"writeCount", writeCount.Load(),
|
||||
"isOutOfOrder", pkt.IsOutOfOrder,
|
||||
"isOutOfOrder", extPkt.IsOutOfOrder,
|
||||
"layer", layer,
|
||||
)
|
||||
}
|
||||
@@ -889,7 +978,7 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
if spatialTrackers[spatialLayer] == nil {
|
||||
spatialTrackers[spatialLayer] = r.streamTrackerManager.GetTracker(spatialLayer)
|
||||
if spatialTrackers[spatialLayer] == nil {
|
||||
if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM && pkt.DependencyDescriptor != nil {
|
||||
if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM && extPkt.DependencyDescriptor != nil {
|
||||
r.streamTrackerManager.AddDependencyDescriptorTrackers()
|
||||
}
|
||||
spatialTrackers[spatialLayer] = r.streamTrackerManager.AddTracker(spatialLayer)
|
||||
@@ -897,19 +986,19 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) {
|
||||
}
|
||||
if spatialTrackers[spatialLayer] != nil {
|
||||
spatialTrackers[spatialLayer].Observe(
|
||||
pkt.Temporal,
|
||||
len(pkt.RawPacket),
|
||||
len(pkt.Packet.Payload),
|
||||
pkt.Packet.Marker,
|
||||
pkt.Packet.Timestamp,
|
||||
pkt.DependencyDescriptor,
|
||||
extPkt.Temporal,
|
||||
len(extPkt.RawPacket),
|
||||
len(extPkt.Packet.Payload),
|
||||
extPkt.Packet.Marker,
|
||||
extPkt.Packet.Timestamp,
|
||||
extPkt.DependencyDescriptor,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
numPacketsForwarded++
|
||||
|
||||
buffer.ReleaseExtPacket(pkt)
|
||||
buffer.ReleaseExtPacket(extPkt)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -128,7 +128,6 @@ func (oq *opsQueueBase[T]) Enqueue(op T) {
|
||||
select {
|
||||
case oq.wake <- struct{}{}:
|
||||
default:
|
||||
oq.params.Logger.Debugw("could not wake ops queue", "name", oq.params.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user