diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c1d72f28c..4e89eed3e 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1051,7 +1051,7 @@ func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOver func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) { _, brs := d.receiver.GetLayeredBitrate() transition, available := d.forwarder.GetNextHigherTransition(brs, allowOvershoot) - d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available) + d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available, "bitrates", brs) return transition, available } diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 1cd27d745..35ea2b25f 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -136,12 +136,6 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { } r.snOffset++ r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset - - r.snOffsetsWritePtr = (r.snOffsetsWritePtr - 1) & SnOffsetCacheMask - r.snOffsetsOccupancy-- - if r.snOffsetsOccupancy < 0 { - r.logger.Warnw("sequence number offset cache is invalid", nil, "occupancy", r.snOffsetsOccupancy) - } } func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 1d59759ab..e1e772023 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -105,6 +105,7 @@ func TestPacketDropped(t *testing.T) { SequenceNumber: 23333, Timestamp: 0xabcdef, SSRC: 0x12345678, + PayloadSize: 10, } extPkt, _ := testutils.GetTestExtPacket(params) r.SetLastSnTs(extPkt) @@ -114,6 +115,9 @@ func TestPacketDropped(t *testing.T) { require.Equal(t, uint16(0), r.snOffset) require.Equal(t, uint32(0), r.tsOffset) + r.UpdateAndGetSnTs(extPkt) // update sequence number offset + require.Equal(t, 1, r.snOffsetsWritePtr) + // drop a non-head packet, should cause no change in internals params = &testutils.TestExtPacketParams{ SequenceNumber: 33333, @@ -122,7 +126,7 @@ func TestPacketDropped(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) r.PacketDropped(extPkt) - require.Equal(t, r.highestIncomingSN, uint16(23332)) + require.Equal(t, r.highestIncomingSN, uint16(23333)) require.Equal(t, r.lastSN, uint16(23333)) require.Equal(t, uint16(0), r.snOffset) @@ -131,12 +135,33 @@ func TestPacketDropped(t *testing.T) { SequenceNumber: 44444, Timestamp: 0xabcdef, SSRC: 0x12345678, + PayloadSize: 20, } extPkt, _ = testutils.GetTestExtPacket(params) - r.highestIncomingSN = 44444 + + r.UpdateAndGetSnTs(extPkt) // update sequence number offset + snOffsetWritePtr := (44444 - 23333 + 1) & SnOffsetCacheMask + require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr) + require.Equal(t, SnOffsetCacheSize, r.snOffsetsOccupancy) + r.PacketDropped(extPkt) require.Equal(t, r.lastSN, uint16(44443)) require.Equal(t, uint16(1), r.snOffset) + + params = &testutils.TestExtPacketParams{ + SequenceNumber: 44445, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, _ = testutils.GetTestExtPacket(params) + + r.UpdateAndGetSnTs(extPkt) // update sequence number offset + require.Equal(t, uint16(1), r.snOffsets[snOffsetWritePtr]) + snOffsetWritePtr = (snOffsetWritePtr + 1) & SnOffsetCacheMask + require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr) + require.Equal(t, r.lastSN, uint16(44444)) + require.Equal(t, uint16(1), r.snOffset) } func TestOutOfOrderSequenceNumber(t *testing.T) { diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 329b3f2c2..e5f7d5e3f 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -723,12 +723,10 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() { } var estimateToCommit int64 - var packets, repeatedNacks uint32 - var nackRatio float64 expectedBandwidthUsage := s.getExpectedBandwidthUsage() + packets, repeatedNacks, nackRatio := s.channelObserver.GetNackRatio() switch reason { case ChannelCongestionReasonLoss: - packets, repeatedNacks, nackRatio = s.channelObserver.GetNackRatio() estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio)) default: estimateToCommit = s.lastReceivedEstimate @@ -1110,7 +1108,7 @@ func (s *StreamAllocator) initProbe(probeRateBps int64) { "committed", s.committedChannelCapacity, "lastReceived", s.lastReceivedEstimate, "probeRateBps", probeRateBps, - "goalBps", expectedBandwidthUsage+probeRateBps, + "goalBps", s.probeGoalBps, ) } diff --git a/pkg/sfu/vp8munger.go b/pkg/sfu/vp8munger.go index 1dfa24d12..a631a6237 100644 --- a/pkg/sfu/vp8munger.go +++ b/pkg/sfu/vp8munger.go @@ -10,6 +10,12 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" ) +const ( + missingPictureIdsThreshold = 50 + droppedPictureIdsThreshold = 20 + exemptedPictureIdsThreshold = 20 +) + // VP8 munger type TranslationParamsVP8 struct { Header *buffer.VP8 @@ -47,8 +53,9 @@ type VP8MungerParams struct { keyIdxOffset uint8 keyIdxUsed int - missingPictureIds *orderedmap.OrderedMap[int32, int32] - lastDroppedPictureId int32 + missingPictureIds *orderedmap.OrderedMap[int32, int32] + droppedPictureIds *orderedmap.OrderedMap[int32, bool] + exemptedPictureIds *orderedmap.OrderedMap[int32, bool] } type VP8Munger struct { @@ -61,8 +68,9 @@ func NewVP8Munger(logger logger.Logger) *VP8Munger { return &VP8Munger{ logger: logger, VP8MungerParams: VP8MungerParams{ - missingPictureIds: orderedmap.NewOrderedMap[int32, int32](), - lastDroppedPictureId: -1, + missingPictureIds: orderedmap.NewOrderedMap[int32, int32](), + droppedPictureIds: orderedmap.NewOrderedMap[int32, bool](), + exemptedPictureIds: orderedmap.NewOrderedMap[int32, bool](), }, } } @@ -112,8 +120,6 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { if v.keyIdxUsed == 1 { v.lastKeyIdx = vp8.KEYIDX } - - v.lastDroppedPictureId = -1 } func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { @@ -135,10 +141,10 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f } - // clear missing picture ids on layer switch + // clear picture id caches on layer switch v.missingPictureIds = orderedmap.NewOrderedMap[int32, int32]() - - v.lastDroppedPictureId = -1 + v.droppedPictureIds = orderedmap.NewOrderedMap[int32, bool]() + v.exemptedPictureIds = orderedmap.NewOrderedMap[int32, bool]() } func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error) { @@ -200,34 +206,56 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumb // and check if that was the last packet of Picture 10), it could get complicated when // the gap is larger. if ordering == SequenceNumberOrderingGap { - // can drop packet if it belongs to the last dropped picture. - // Example: - // o Packet 10 - Picture 11 - TID that should be dropped - // o Packet 11 - missing - // o Packet 12 - Picture 11 - will be reported as GAP, but belongs to a picture that was dropped and hence can be dropped - // If Packet 11 comes around, it will be reported as OUT_OF_ORDER, but the missing - // picture id cache will not have an entry and hence will be dropped. - if extPictureId == v.lastDroppedPictureId { - return nil, ErrFilteredVP8TemporalLayer - } else { - for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ { + for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ { + // Record missing only if picture id was not dropped. This is to avoid a subsequent packet of dropped frame going through. + // A sequence like this + // o Packet 10 - Picture 11 - TID that should be dropped + // o Packet 11 - missing - belongs to Picture 11 still + // o Packet 12 - Picture 12 - will be reported as GAP, so missing picture id mapping will be set up for Picture 11 also. + // o Next packet - Packet 11 - this will use the wrong offset from missing pictures cache + _, ok := v.droppedPictureIds.Get(lostPictureId) + if !ok { v.missingPictureIds.Set(lostPictureId, v.pictureIdOffset) } + } + // trim cache if necessary + for v.missingPictureIds.Len() > missingPictureIdsThreshold { + el := v.missingPictureIds.Front() + v.missingPictureIds.Delete(el.Key) + } + + // if there is a gap, packet is forwarded irrespective of temporal layer as it cannot be determined + // which layer the missing packets belong to. A layer could have multiple packets. So, keep track + // of pictures that are forwarded even though they will be filterd out based on temporal layer + // requirements. That allows forwarding of the complete picture. + if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) { + v.exemptedPictureIds.Set(extPictureId, true) // trim cache if necessary - for v.missingPictureIds.Len() > 50 { - el := v.missingPictureIds.Front() - v.missingPictureIds.Delete(el.Key) + for v.exemptedPictureIds.Len() > exemptedPictureIdsThreshold { + el := v.exemptedPictureIds.Front() + v.exemptedPictureIds.Delete(el.Key) } } } else { if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) { - // adjust only once per picture as a picture could have multiple packets - if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId { - v.lastDroppedPictureId = extPictureId - v.pictureIdOffset += 1 + // drop only if not exempted + _, ok := v.exemptedPictureIds.Get(extPictureId) + if !ok { + // adjust only once per picture as a picture could have multiple packets + if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId { + // keep track of dropped picture ids so that they do not get into the missing picture cache + v.droppedPictureIds.Set(extPictureId, true) + // trim cache if necessary + for v.droppedPictureIds.Len() > droppedPictureIdsThreshold { + el := v.droppedPictureIds.Front() + v.droppedPictureIds.Delete(el.Key) + } + + v.pictureIdOffset += 1 + } + return nil, ErrFilteredVP8TemporalLayer } - return nil, ErrFilteredVP8TemporalLayer } } diff --git a/pkg/sfu/vp8munger_test.go b/pkg/sfu/vp8munger_test.go index 20fbcd922..0b32fc4db 100644 --- a/pkg/sfu/vp8munger_test.go +++ b/pkg/sfu/vp8munger_test.go @@ -65,17 +65,16 @@ func TestSetLast(t *testing.T) { totalWrap: 0, lastWrap: 0, }, - extLastPictureId: 13467, - pictureIdOffset: 0, - pictureIdUsed: 1, - lastTl0PicIdx: 233, - tl0PicIdxOffset: 0, - tl0PicIdxUsed: 1, - tidUsed: 1, - lastKeyIdx: 23, - keyIdxOffset: 0, - keyIdxUsed: 1, - lastDroppedPictureId: -1, + extLastPictureId: 13467, + pictureIdOffset: 0, + pictureIdUsed: 1, + lastTl0PicIdx: 233, + tl0PicIdxOffset: 0, + tl0PicIdxUsed: 1, + tidUsed: 1, + lastKeyIdx: 23, + keyIdxOffset: 0, + keyIdxUsed: 1, }, } @@ -140,17 +139,16 @@ func TestUpdateOffsets(t *testing.T) { totalWrap: 0, lastWrap: 0, }, - extLastPictureId: 13467, - pictureIdOffset: 345 - 13467 - 1, - pictureIdUsed: 1, - lastTl0PicIdx: 233, - tl0PicIdxOffset: (12 - 233 - 1) & 0xff, - tl0PicIdxUsed: 1, - tidUsed: 1, - lastKeyIdx: 23, - keyIdxOffset: (4 - 23 - 1) & 0x1f, - keyIdxUsed: 1, - lastDroppedPictureId: -1, + extLastPictureId: 13467, + pictureIdOffset: 345 - 13467 - 1, + pictureIdUsed: 1, + lastTl0PicIdx: 233, + tl0PicIdxOffset: (12 - 233 - 1) & 0xff, + tl0PicIdxUsed: 1, + tidUsed: 1, + lastKeyIdx: 23, + keyIdxOffset: (4 - 23 - 1) & 0x1f, + keyIdxUsed: 1, }, } require.True(t, compare(&expectedVP8Munger, v)) @@ -289,7 +287,8 @@ func TestTemporalLayerFiltering(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ErrFilteredVP8TemporalLayer) require.Nil(t, tp) - require.EqualValues(t, 13467, v.lastDroppedPictureId) + dropped, _ := v.droppedPictureIds.Get(13467) + require.True(t, dropped) require.EqualValues(t, 1, v.pictureIdOffset) // another packet with the same picture id. @@ -301,7 +300,8 @@ func TestTemporalLayerFiltering(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ErrFilteredVP8TemporalLayer) require.Nil(t, tp) - require.EqualValues(t, 13467, v.lastDroppedPictureId) + dropped, _ = v.droppedPictureIds.Get(13467) + require.True(t, dropped) require.EqualValues(t, 1, v.pictureIdOffset) // another packet with the same picture id, but a gap in sequence number. @@ -313,7 +313,8 @@ func TestTemporalLayerFiltering(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ErrFilteredVP8TemporalLayer) require.Nil(t, tp) - require.EqualValues(t, 13467, v.lastDroppedPictureId) + dropped, _ = v.droppedPictureIds.Get(13467) + require.True(t, dropped) require.EqualValues(t, 1, v.pictureIdOffset) }