From 2ec5f2bd3d42beeebcb42f369a0f5e041d11eb27 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 11 Nov 2021 19:03:33 +0530 Subject: [PATCH] Fixing edge cases in picture id munging. (#180) * Fixing edge cases in picture id munging. Changes 1. Check the RTP sequence number order before VP8 temporal layer filtering and use that ordering result while doing temporal layer filtering. In a sequence like below o Packet 10 -> Picture ID 10 o Packet 11 -> missing o Packet 12 -> Picture ID 11 it is not known if packet 11 will belong to Picture ID 10 or Picture ID 11. The problem becomes a lot more tricky if there is a burst loss and there is a larger hole in the picture id space also as a result. So, in the event of a packet loss, forward even if the current packet belongs to a layer that can be dropped. More comments in code. 2. Use result of sequence number ordering check while doing VP8 picture id munging. 3. When adding to the missing picture id cache, the range of picture ids must include both ends. As a picture can span multiple packets and it is not known which picture a missing packet belongs to, both ends must also be included in the missing picture id cache in the event of a gap. 4. As a picture can span multiple packets, it is not possible to have a simple map of missing picture ids as an entry cannot be deleted if an out-of-order picture id is received. There may be more missing packets belonging to that picture id that are yet to be received. So, have to use an ordered map and truncate the map if it grows too large. Picked this for ordered map - https://github.com/elliotchance/orderedmap. It has a simple API and had the highest number of stars of all the ones I checked. And there are benchmarks. The author also wrote a medium post at https://medium.com/swlh/an-ordered-map-in-go-436634692381 Another one which I looked at is - https://github.com/wk8/go-ordered-map. 
The author of that wrote at https://morioh.com/p/990229f32171 and has a bunch of other options at the end of that post (but does not include the one I picked above). None of those have that many stars. Testing: -------- - Set max temporal layers to 0 so that temporal filtering happens and run for an hour on sample app. * do not let padding packets through VP8 * Correct comment * fix comment * Review comments from Jie * golang naming convention --- go.mod | 1 + go.sum | 2 + pkg/sfu/downtrack.go | 149 +++++++++++++++++++++++++++++-------------- 3 files changed, 105 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index 79b56e4ef..2a5b35590 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/bep/debounce v1.2.0 github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect + github.com/elliotchance/orderedmap v1.4.0 // indirect github.com/gammazero/deque v0.1.0 github.com/gammazero/workerpool v1.1.2 github.com/go-logr/logr v1.2.0 diff --git a/go.sum b/go.sum index 689ef308d..dd3abe6ec 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4 github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elliotchance/orderedmap v1.4.0 h1:wZtfeEONCbx6in1CZyE6bELEt/vFayMvsxqI5SgsR+A= +github.com/elliotchance/orderedmap v1.4.0/go.mod h1:wsDwEaX5jEoyhbs7x93zk2H/qv0zwuhg4inXhDkYqys= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 
0e71ad1ca..b2f1e5760 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/elliotchance/orderedmap" + "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/sdp/v3" @@ -30,6 +32,15 @@ const ( RTPBlankFramesMax = 6 ) +type SequenceNumberOrdering int + +const ( + SequenceNumberOrderingContiguous SequenceNumberOrdering = iota + SequenceNumberOrderingOutOfOrder + SequenceNumberOrderingGap + SequenceNumberOrderingUnknown +) + var ( ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded") @@ -816,12 +827,19 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { d.reSync.set(false) } + newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt) + if err != nil { + if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + return nil + } + + d.pktsDropped.add(1) + return err + } + payload := extPkt.Packet.Payload - var ( - translatedVP8 *buffer.VP8 - err error - ) + var translatedVP8 *buffer.VP8 if d.vp8Munger != nil && len(payload) > 0 { // LK-TODO-START // Errors below do not update sequence number. That is a problem if the stream @@ -830,7 +848,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { // that, the sequence numbers should be updated to ensure that subsequent packet // translations works fine and produce proper translated sequence numbers. 
// LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, d.temporalLayer.get()>>16) + translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16) if err != nil { if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { if err == ErrFilteredVP8TemporalLayer { @@ -858,15 +876,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { } } - newSN, newTS, err := d.munger.UpdateAndGetSnTs(extPkt) - if err != nil { - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { - return nil - } - - d.pktsDropped.add(1) - return err - } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head) if meta != nil && translatedVP8 != nil { @@ -968,12 +977,19 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } } + newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt) + if err != nil { + if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + return nil + } + + d.pktsDropped.add(1) + return err + } + payload := extPkt.Packet.Payload - var ( - translatedVP8 *buffer.VP8 - err error - ) + var translatedVP8 *buffer.VP8 if d.vp8Munger != nil && len(payload) > 0 { // LK-TODO-START // Errors below do not update sequence number. That is a problem if the stream @@ -982,7 +998,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err // that, the sequence numbers should be updated to ensure that subsequent packet // translations works fine and produce proper translated sequence numbers. 
// LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, d.temporalLayer.get()>>16) + translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16) if err != nil { if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { if err == ErrFilteredVP8TemporalLayer { @@ -1010,15 +1026,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } } - newSN, newTS, err := d.munger.UpdateAndGetSnTs(extPkt) - if err != nil { - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { - return nil - } - - d.pktsDropped.add(1) - return err - } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head) if meta != nil && translatedVP8 != nil { @@ -1397,6 +1404,9 @@ func (m *Munger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, ts m.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 m.snOffset = extPkt.Packet.SequenceNumber - m.lastSN - snAdjust m.tsOffset = extPkt.Packet.Timestamp - m.lastTS - tsAdjust + + // clear incoming missing sequence numbers on layer switch + m.missingSNs = make(map[uint16]uint16, 10) } func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) { @@ -1411,7 +1421,7 @@ func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) { m.snOffset += 1 } -func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, error) { +func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, SequenceNumberOrdering, error) { m.lock.Lock() defer m.lock.Unlock() @@ -1419,16 +1429,19 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err if !extPkt.Head { snOffset, ok := m.missingSNs[extPkt.Packet.SequenceNumber] if !ok { - return 0, 0, ErrOutOfOrderSequenceNumberCacheMiss + return 0, 0, SequenceNumberOrderingOutOfOrder, ErrOutOfOrderSequenceNumberCacheMiss } delete(m.missingSNs, 
extPkt.Packet.SequenceNumber) - return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, nil + return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, SequenceNumberOrderingOutOfOrder, nil } + ordering := SequenceNumberOrderingContiguous + // if there are gaps, record it in missing sequence number cache diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN if diff > 1 { + ordering = SequenceNumberOrderingGap var lossStartSN, lossEndSN int lossStartSN = int(m.highestIncomingSN) + 1 if extPkt.Packet.SequenceNumber > m.highestIncomingSN { @@ -1442,7 +1455,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err } else { // can get duplicate packet due to FEC if diff == 0 { - return 0, 0, ErrDuplicatePacket + return 0, 0, SequenceNumberOrderingUnknown, ErrDuplicatePacket } // if padding only packet, can be dropped and sequence number adjusted @@ -1452,7 +1465,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err if len(extPkt.Packet.Payload) == 0 { m.highestIncomingSN = extPkt.Packet.SequenceNumber m.snOffset += 1 - return 0, 0, ErrPaddingOnlyPacket + return 0, 0, SequenceNumberOrderingContiguous, ErrPaddingOnlyPacket } } @@ -1470,7 +1483,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err m.lastTS = mungedTS m.lastMarker = extPkt.Packet.Marker - return mungedSN, mungedTS, nil + return mungedSN, mungedTS, ordering, nil } func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) { @@ -1517,7 +1530,8 @@ type VP8MungerParams struct { keyIdxOffset uint8 keyIdxUsed int - missingPictureIds map[int32]int32 + missingPictureIds *orderedmap.OrderedMap + lastDroppedPictureId int32 } type VP8Munger struct { @@ -1528,7 +1542,8 @@ type VP8Munger struct { func NewVP8Munger() *VP8Munger { return &VP8Munger{VP8MungerParams: VP8MungerParams{ - missingPictureIds: make(map[int32]int32, 10), + 
missingPictureIds: orderedmap.NewOrderedMap(), + lastDroppedPictureId: -1, }} } @@ -1558,6 +1573,8 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { if v.keyIdxUsed == 1 { v.lastKeyIdx = vp8.KEYIDX } + + v.lastDroppedPictureId = -1 } func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { @@ -1581,9 +1598,14 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { if v.keyIdxUsed == 1 { v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f } + + // clear missing picture ids on layer switch + v.missingPictureIds = orderedmap.NewOrderedMap() + + v.lastDroppedPictureId = -1 } -func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int32) (*buffer.VP8, error) { +func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*buffer.VP8, error) { v.lock.Lock() defer v.lock.Unlock() @@ -1596,12 +1618,17 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3 // if out-of-order, look up missing picture id cache if !newer { - pictureIdOffset, ok := v.missingPictureIds[extPictureId] + value, ok := v.missingPictureIds.Get(extPictureId) if !ok { return nil, ErrOutOfOrderVP8PictureIdCacheMiss } + pictureIdOffset := value.(int32) - delete(v.missingPictureIds, extPictureId) + // the out-of-order picture id cannot be deleted from the cache + // as there could be more than one packet in a picture and more + // than one packet of a picture could come out-of-order. + // To prevent picture id cache from growing, it is truncated + // when it reaches a certain size. 
mungedPictureId := uint16((extPictureId - pictureIdOffset) & 0x7fff) vp8Packet := &buffer.VP8{ @@ -1625,25 +1652,53 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3 prevMaxPictureId := v.pictureIdWrapHandler.MaxPictureId() v.pictureIdWrapHandler.UpdateMaxPictureId(extPictureId, vp8.MBit) - // if there are gaps, record it in missing picture id cache - // check for > 1 as consecutive packets can have the same picture ID, - // i. e. one picture composed of multiple packets - if extPictureId-prevMaxPictureId > 1 { - for lostPictureId := prevMaxPictureId + 1; lostPictureId < extPictureId; lostPictureId++ { - v.missingPictureIds[lostPictureId] = v.pictureIdOffset + // if there is a gap in sequence number, record possible pictures that + // the missing packets can belong to in missing picture id cache. + // The missing picture cache should contain the previous picture id + // and the current picture id and all the intervening pictures. + // This is to handle a scenario as follows + // o Packet 10 -> Picture ID 10 + // o Packet 11 -> missing + // o Packet 12 -> Picture ID 11 + // In this case, Packet 11 could belong to either Picture ID 10 (last packet of that picture) + // or Picture ID 11 (first packet of the current picture). Although in this simple case, + // it is possible to deduce that (for example by looking at previous packet's RTP marker + // and check if that was the last packet of Picture 10), it could get complicated when + // the gap is larger. + if ordering == SequenceNumberOrderingGap { + // can drop packet if it belongs to the last dropped picture. + // Example: + // o Packet 10 - Picture 11 - TID that should be dropped + // o Packet 11 - missing + // o Packet 12 - Picture 11 - will be reported as GAP, but belongs to a picture that was dropped and hence can be dropped + // If Packet 11 comes around, it will be reported as OUT_OF_ORDER, but the missing + // picture id cache will not have an entry and hence will be dropped. 
+ if extPictureId == v.lastDroppedPictureId { + return nil, ErrFilteredVP8TemporalLayer + } else { + for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ { + v.missingPictureIds.Set(lostPictureId, v.pictureIdOffset) + } + + // trim cache if necessary + for v.missingPictureIds.Len() > 50 { + el := v.missingPictureIds.Front() + v.missingPictureIds.Delete(el.Key) + } } } else { if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) { // adjust only once per picture as a picture could have multiple packets if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId { + v.lastDroppedPictureId = extPictureId v.pictureIdOffset += 1 } return nil, ErrFilteredVP8TemporalLayer } } - // in-order incoming picture, may or may not be contiguous. - // In the case of loss (i. e. incoming picture number is not contiguous), + // in-order incoming sequence number, may or may not be contiguous. + // In the case of loss (i. e. incoming sequence number is not contiguous), // forward even if it is a filtered layer. With temporal scalability, // it is unclear if the current packet should be dropped if it is not // contiguous. Hence forward anything that is not contiguous.