Fix sequence number offset on packet drop (#1556)

This commit is contained in:
Raja Subramanian
2023-03-29 07:43:28 +05:30
committed by GitHub
parent de86ccb3df
commit 2c439b3063
6 changed files with 112 additions and 66 deletions

View File

@@ -1051,7 +1051,7 @@ func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOver
func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) {
_, brs := d.receiver.GetLayeredBitrate()
transition, available := d.forwarder.GetNextHigherTransition(brs, allowOvershoot)
d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available)
d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available, "bitrates", brs)
return transition, available
}

View File

@@ -136,12 +136,6 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
}
r.snOffset++
r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset
r.snOffsetsWritePtr = (r.snOffsetsWritePtr - 1) & SnOffsetCacheMask
r.snOffsetsOccupancy--
if r.snOffsetsOccupancy < 0 {
r.logger.Warnw("sequence number offset cache is invalid", nil, "occupancy", r.snOffsetsOccupancy)
}
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {

View File

@@ -105,6 +105,7 @@ func TestPacketDropped(t *testing.T) {
SequenceNumber: 23333,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
}
extPkt, _ := testutils.GetTestExtPacket(params)
r.SetLastSnTs(extPkt)
@@ -114,6 +115,9 @@ func TestPacketDropped(t *testing.T) {
require.Equal(t, uint16(0), r.snOffset)
require.Equal(t, uint32(0), r.tsOffset)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, 1, r.snOffsetsWritePtr)
// drop a non-head packet, should cause no change in internals
params = &testutils.TestExtPacketParams{
SequenceNumber: 33333,
@@ -122,7 +126,7 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.PacketDropped(extPkt)
require.Equal(t, r.highestIncomingSN, uint16(23332))
require.Equal(t, r.highestIncomingSN, uint16(23333))
require.Equal(t, r.lastSN, uint16(23333))
require.Equal(t, uint16(0), r.snOffset)
@@ -131,12 +135,33 @@ func TestPacketDropped(t *testing.T) {
SequenceNumber: 44444,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.highestIncomingSN = 44444
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
snOffsetWritePtr := (44444 - 23333 + 1) & SnOffsetCacheMask
require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr)
require.Equal(t, SnOffsetCacheSize, r.snOffsetsOccupancy)
r.PacketDropped(extPkt)
require.Equal(t, r.lastSN, uint16(44443))
require.Equal(t, uint16(1), r.snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 44445,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, uint16(1), r.snOffsets[snOffsetWritePtr])
snOffsetWritePtr = (snOffsetWritePtr + 1) & SnOffsetCacheMask
require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr)
require.Equal(t, r.lastSN, uint16(44444))
require.Equal(t, uint16(1), r.snOffset)
}
func TestOutOfOrderSequenceNumber(t *testing.T) {

View File

@@ -723,12 +723,10 @@ func (s *StreamAllocator) handleNewEstimateInNonProbe() {
}
var estimateToCommit int64
var packets, repeatedNacks uint32
var nackRatio float64
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
packets, repeatedNacks, nackRatio := s.channelObserver.GetNackRatio()
switch reason {
case ChannelCongestionReasonLoss:
packets, repeatedNacks, nackRatio = s.channelObserver.GetNackRatio()
estimateToCommit = int64(float64(expectedBandwidthUsage) * (1.0 - NackRatioAttenuator*nackRatio))
default:
estimateToCommit = s.lastReceivedEstimate
@@ -1110,7 +1108,7 @@ func (s *StreamAllocator) initProbe(probeRateBps int64) {
"committed", s.committedChannelCapacity,
"lastReceived", s.lastReceivedEstimate,
"probeRateBps", probeRateBps,
"goalBps", expectedBandwidthUsage+probeRateBps,
"goalBps", s.probeGoalBps,
)
}

View File

@@ -10,6 +10,12 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
const (
missingPictureIdsThreshold = 50
droppedPictureIdsThreshold = 20
exemptedPictureIdsThreshold = 20
)
// VP8 munger
type TranslationParamsVP8 struct {
Header *buffer.VP8
@@ -47,8 +53,9 @@ type VP8MungerParams struct {
keyIdxOffset uint8
keyIdxUsed int
missingPictureIds *orderedmap.OrderedMap[int32, int32]
lastDroppedPictureId int32
missingPictureIds *orderedmap.OrderedMap[int32, int32]
droppedPictureIds *orderedmap.OrderedMap[int32, bool]
exemptedPictureIds *orderedmap.OrderedMap[int32, bool]
}
type VP8Munger struct {
@@ -61,8 +68,9 @@ func NewVP8Munger(logger logger.Logger) *VP8Munger {
return &VP8Munger{
logger: logger,
VP8MungerParams: VP8MungerParams{
missingPictureIds: orderedmap.NewOrderedMap[int32, int32](),
lastDroppedPictureId: -1,
missingPictureIds: orderedmap.NewOrderedMap[int32, int32](),
droppedPictureIds: orderedmap.NewOrderedMap[int32, bool](),
exemptedPictureIds: orderedmap.NewOrderedMap[int32, bool](),
},
}
}
@@ -112,8 +120,6 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) {
if v.keyIdxUsed == 1 {
v.lastKeyIdx = vp8.KEYIDX
}
v.lastDroppedPictureId = -1
}
func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
@@ -135,10 +141,10 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f
}
// clear missing picture ids on layer switch
// clear picture id caches on layer switch
v.missingPictureIds = orderedmap.NewOrderedMap[int32, int32]()
v.lastDroppedPictureId = -1
v.droppedPictureIds = orderedmap.NewOrderedMap[int32, bool]()
v.exemptedPictureIds = orderedmap.NewOrderedMap[int32, bool]()
}
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error) {
@@ -200,34 +206,56 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumb
// and check if that was the last packet of Picture 10), it could get complicated when
// the gap is larger.
if ordering == SequenceNumberOrderingGap {
// can drop packet if it belongs to the last dropped picture.
// Example:
// o Packet 10 - Picture 11 - TID that should be dropped
// o Packet 11 - missing
// o Packet 12 - Picture 11 - will be reported as GAP, but belongs to a picture that was dropped and hence can be dropped
// If Packet 11 comes around, it will be reported as OUT_OF_ORDER, but the missing
// picture id cache will not have an entry and hence will be dropped.
if extPictureId == v.lastDroppedPictureId {
return nil, ErrFilteredVP8TemporalLayer
} else {
for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ {
for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ {
// Record missing only if picture id was not dropped. This is to avoid a subsequent packet of dropped frame going through.
// A sequence like this
// o Packet 10 - Picture 11 - TID that should be dropped
// o Packet 11 - missing - belongs to Picture 11 still
// o Packet 12 - Picture 12 - will be reported as GAP, so missing picture id mapping will be set up for Picture 11 also.
// o Next packet - Packet 11 - this will use the wrong offset from missing pictures cache
_, ok := v.droppedPictureIds.Get(lostPictureId)
if !ok {
v.missingPictureIds.Set(lostPictureId, v.pictureIdOffset)
}
}
// trim cache if necessary
for v.missingPictureIds.Len() > missingPictureIdsThreshold {
el := v.missingPictureIds.Front()
v.missingPictureIds.Delete(el.Key)
}
// if there is a gap, packet is forwarded irrespective of temporal layer as it cannot be determined
// which layer the missing packets belong to. A layer could have multiple packets. So, keep track
// of pictures that are forwarded even though they will be filterd out based on temporal layer
// requirements. That allows forwarding of the complete picture.
if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) {
v.exemptedPictureIds.Set(extPictureId, true)
// trim cache if necessary
for v.missingPictureIds.Len() > 50 {
el := v.missingPictureIds.Front()
v.missingPictureIds.Delete(el.Key)
for v.exemptedPictureIds.Len() > exemptedPictureIdsThreshold {
el := v.exemptedPictureIds.Front()
v.exemptedPictureIds.Delete(el.Key)
}
}
} else {
if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) {
// adjust only once per picture as a picture could have multiple packets
if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId {
v.lastDroppedPictureId = extPictureId
v.pictureIdOffset += 1
// drop only if not exempted
_, ok := v.exemptedPictureIds.Get(extPictureId)
if !ok {
// adjust only once per picture as a picture could have multiple packets
if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId {
// keep track of dropped picture ids so that they do not get into the missing picture cache
v.droppedPictureIds.Set(extPictureId, true)
// trim cache if necessary
for v.droppedPictureIds.Len() > droppedPictureIdsThreshold {
el := v.droppedPictureIds.Front()
v.droppedPictureIds.Delete(el.Key)
}
v.pictureIdOffset += 1
}
return nil, ErrFilteredVP8TemporalLayer
}
return nil, ErrFilteredVP8TemporalLayer
}
}

View File

@@ -65,17 +65,16 @@ func TestSetLast(t *testing.T) {
totalWrap: 0,
lastWrap: 0,
},
extLastPictureId: 13467,
pictureIdOffset: 0,
pictureIdUsed: 1,
lastTl0PicIdx: 233,
tl0PicIdxOffset: 0,
tl0PicIdxUsed: 1,
tidUsed: 1,
lastKeyIdx: 23,
keyIdxOffset: 0,
keyIdxUsed: 1,
lastDroppedPictureId: -1,
extLastPictureId: 13467,
pictureIdOffset: 0,
pictureIdUsed: 1,
lastTl0PicIdx: 233,
tl0PicIdxOffset: 0,
tl0PicIdxUsed: 1,
tidUsed: 1,
lastKeyIdx: 23,
keyIdxOffset: 0,
keyIdxUsed: 1,
},
}
@@ -140,17 +139,16 @@ func TestUpdateOffsets(t *testing.T) {
totalWrap: 0,
lastWrap: 0,
},
extLastPictureId: 13467,
pictureIdOffset: 345 - 13467 - 1,
pictureIdUsed: 1,
lastTl0PicIdx: 233,
tl0PicIdxOffset: (12 - 233 - 1) & 0xff,
tl0PicIdxUsed: 1,
tidUsed: 1,
lastKeyIdx: 23,
keyIdxOffset: (4 - 23 - 1) & 0x1f,
keyIdxUsed: 1,
lastDroppedPictureId: -1,
extLastPictureId: 13467,
pictureIdOffset: 345 - 13467 - 1,
pictureIdUsed: 1,
lastTl0PicIdx: 233,
tl0PicIdxOffset: (12 - 233 - 1) & 0xff,
tl0PicIdxUsed: 1,
tidUsed: 1,
lastKeyIdx: 23,
keyIdxOffset: (4 - 23 - 1) & 0x1f,
keyIdxUsed: 1,
},
}
require.True(t, compare(&expectedVP8Munger, v))
@@ -289,7 +287,8 @@ func TestTemporalLayerFiltering(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrFilteredVP8TemporalLayer)
require.Nil(t, tp)
require.EqualValues(t, 13467, v.lastDroppedPictureId)
dropped, _ := v.droppedPictureIds.Get(13467)
require.True(t, dropped)
require.EqualValues(t, 1, v.pictureIdOffset)
// another packet with the same picture id.
@@ -301,7 +300,8 @@ func TestTemporalLayerFiltering(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrFilteredVP8TemporalLayer)
require.Nil(t, tp)
require.EqualValues(t, 13467, v.lastDroppedPictureId)
dropped, _ = v.droppedPictureIds.Get(13467)
require.True(t, dropped)
require.EqualValues(t, 1, v.pictureIdOffset)
// another packet with the same picture id, but a gap in sequence number.
@@ -313,7 +313,8 @@ func TestTemporalLayerFiltering(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrFilteredVP8TemporalLayer)
require.Nil(t, tp)
require.EqualValues(t, 13467, v.lastDroppedPictureId)
dropped, _ = v.droppedPictureIds.Get(13467)
require.True(t, dropped)
require.EqualValues(t, 1, v.pictureIdOffset)
}