Fixing edge cases in picture id munging. (#180)

* Fixing edge cases in picture id munging.

Changes

1. Check the RTP sequence number order before VP8 temporal layer
   filtering and use that ordering result while doing temporal
   layer filtering.

    In a sequence like below
       o Packet 10 -> Picture ID 10
       o Packet 11 -> missing
       o Packet 12 -> Picture ID 11
     it is not known if packet 11 will belong to Picture ID 10 or
     Picture ID 11. The problem becomes a lot more tricky if there
     is a burst loss and there is a larger hole in the picture id
     space also as a result.

     So, in the event of a packet loss, forward even if the current
     packet belongs to a layer that can be dropped. More comments
     in code.

2. Use result of sequence number ordering check while doing VP8 picture id munging.

3. When adding to missing picture id cache, have to include picture ids including
   both ends. As a picture can span multiple packets and it is not known which
   picture the packet belongs to, have to include both ends also in missing
   picture id cache in the event of a gap.

4. As a picture can span multiple packets, it is not possible to have a simple
   map of missing picture ids as an entry cannot be deleted if an out-of-order
   picture id is received. There may be more missing packets belonging to that
   picture id that is yet to be received.

   So, have to use an ordered map and truncate the map if it grows too large.

   Picked this for ordered map - https://github.com/elliotchance/orderedmap.
   Has a simple API, had the highest number of stars of all the ones I checked.
   And there are benchmarks.
   The author also wrote a medium post at https://medium.com/swlh/an-ordered-map-in-go-436634692381

   Another one which I looked at is - https://github.com/wk8/go-ordered-map.
   The author of that wrote at https://morioh.com/p/990229f32171 and has a
   bunch of other options at the end of that post (but does not include the
   one I picked above). None of those have that many stars.

Testing:
--------
- Set max temporal layers to 0 so that temporal filtering happens and run for
an hour on sample app.

* do not let padding packets through VP8

* Correct comment

* fix comment

* Review comments from Jie

* golang naming convention
This commit is contained in:
Raja Subramanian
2021-11-11 19:03:33 +05:30
committed by GitHub
parent fc52b18776
commit 2ec5f2bd3d
3 changed files with 105 additions and 47 deletions

1
go.mod
View File

@@ -6,6 +6,7 @@ require (
github.com/bep/debounce v1.2.0
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/elliotchance/orderedmap v1.4.0 // indirect
github.com/gammazero/deque v0.1.0
github.com/gammazero/workerpool v1.1.2
github.com/go-logr/logr v1.2.0

2
go.sum
View File

@@ -30,6 +30,8 @@ github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4
github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elliotchance/orderedmap v1.4.0 h1:wZtfeEONCbx6in1CZyE6bELEt/vFayMvsxqI5SgsR+A=
github.com/elliotchance/orderedmap v1.4.0/go.mod h1:wsDwEaX5jEoyhbs7x93zk2H/qv0zwuhg4inXhDkYqys=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/elliotchance/orderedmap"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
@@ -30,6 +32,15 @@ const (
RTPBlankFramesMax = 6
)
type SequenceNumberOrdering int
const (
SequenceNumberOrderingContiguous SequenceNumberOrdering = iota
SequenceNumberOrderingOutOfOrder
SequenceNumberOrderingGap
SequenceNumberOrderingUnknown
)
var (
ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
@@ -816,12 +827,19 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
d.reSync.set(false)
}
newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt)
if err != nil {
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
return nil
}
d.pktsDropped.add(1)
return err
}
payload := extPkt.Packet.Payload
var (
translatedVP8 *buffer.VP8
err error
)
var translatedVP8 *buffer.VP8
if d.vp8Munger != nil && len(payload) > 0 {
// LK-TODO-START
// Errors below do not update sequence number. That is a problem if the stream
@@ -830,7 +848,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
// that, the sequence numbers should be updated to ensure that subsequent packet
// translations works fine and produce proper translated sequence numbers.
// LK-TODO-END
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, d.temporalLayer.get()>>16)
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16)
if err != nil {
if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
if err == ErrFilteredVP8TemporalLayer {
@@ -858,15 +876,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
}
}
newSN, newTS, err := d.munger.UpdateAndGetSnTs(extPkt)
if err != nil {
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
return nil
}
d.pktsDropped.add(1)
return err
}
if d.sequencer != nil {
meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head)
if meta != nil && translatedVP8 != nil {
@@ -968,12 +977,19 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
}
}
newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt)
if err != nil {
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
return nil
}
d.pktsDropped.add(1)
return err
}
payload := extPkt.Packet.Payload
var (
translatedVP8 *buffer.VP8
err error
)
var translatedVP8 *buffer.VP8
if d.vp8Munger != nil && len(payload) > 0 {
// LK-TODO-START
// Errors below do not update sequence number. That is a problem if the stream
@@ -982,7 +998,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
// that, the sequence numbers should be updated to ensure that subsequent packet
// translations works fine and produce proper translated sequence numbers.
// LK-TODO-END
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, d.temporalLayer.get()>>16)
translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16)
if err != nil {
if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
if err == ErrFilteredVP8TemporalLayer {
@@ -1010,15 +1026,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
}
}
newSN, newTS, err := d.munger.UpdateAndGetSnTs(extPkt)
if err != nil {
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
return nil
}
d.pktsDropped.add(1)
return err
}
if d.sequencer != nil {
meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head)
if meta != nil && translatedVP8 != nil {
@@ -1397,6 +1404,9 @@ func (m *Munger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, ts
m.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
m.snOffset = extPkt.Packet.SequenceNumber - m.lastSN - snAdjust
m.tsOffset = extPkt.Packet.Timestamp - m.lastTS - tsAdjust
// clear incoming missing sequence numbers on layer switch
m.missingSNs = make(map[uint16]uint16, 10)
}
func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) {
@@ -1411,7 +1421,7 @@ func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) {
m.snOffset += 1
}
func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, error) {
func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, SequenceNumberOrdering, error) {
m.lock.Lock()
defer m.lock.Unlock()
@@ -1419,16 +1429,19 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err
if !extPkt.Head {
snOffset, ok := m.missingSNs[extPkt.Packet.SequenceNumber]
if !ok {
return 0, 0, ErrOutOfOrderSequenceNumberCacheMiss
return 0, 0, SequenceNumberOrderingOutOfOrder, ErrOutOfOrderSequenceNumberCacheMiss
}
delete(m.missingSNs, extPkt.Packet.SequenceNumber)
return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, nil
return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, SequenceNumberOrderingOutOfOrder, nil
}
ordering := SequenceNumberOrderingContiguous
// if there are gaps, record it in missing sequence number cache
diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN
if diff > 1 {
ordering = SequenceNumberOrderingGap
var lossStartSN, lossEndSN int
lossStartSN = int(m.highestIncomingSN) + 1
if extPkt.Packet.SequenceNumber > m.highestIncomingSN {
@@ -1442,7 +1455,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err
} else {
// can get duplicate packet due to FEC
if diff == 0 {
return 0, 0, ErrDuplicatePacket
return 0, 0, SequenceNumberOrderingUnknown, ErrDuplicatePacket
}
// if padding only packet, can be dropped and sequence number adjusted
@@ -1452,7 +1465,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err
if len(extPkt.Packet.Payload) == 0 {
m.highestIncomingSN = extPkt.Packet.SequenceNumber
m.snOffset += 1
return 0, 0, ErrPaddingOnlyPacket
return 0, 0, SequenceNumberOrderingContiguous, ErrPaddingOnlyPacket
}
}
@@ -1470,7 +1483,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err
m.lastTS = mungedTS
m.lastMarker = extPkt.Packet.Marker
return mungedSN, mungedTS, nil
return mungedSN, mungedTS, ordering, nil
}
func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) {
@@ -1517,7 +1530,8 @@ type VP8MungerParams struct {
keyIdxOffset uint8
keyIdxUsed int
missingPictureIds map[int32]int32
missingPictureIds *orderedmap.OrderedMap
lastDroppedPictureId int32
}
type VP8Munger struct {
@@ -1528,7 +1542,8 @@ type VP8Munger struct {
func NewVP8Munger() *VP8Munger {
return &VP8Munger{VP8MungerParams: VP8MungerParams{
missingPictureIds: make(map[int32]int32, 10),
missingPictureIds: orderedmap.NewOrderedMap(),
lastDroppedPictureId: -1,
}}
}
@@ -1558,6 +1573,8 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) {
if v.keyIdxUsed == 1 {
v.lastKeyIdx = vp8.KEYIDX
}
v.lastDroppedPictureId = -1
}
func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
@@ -1581,9 +1598,14 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
if v.keyIdxUsed == 1 {
v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f
}
// clear missing picture ids on layer switch
v.missingPictureIds = orderedmap.NewOrderedMap()
v.lastDroppedPictureId = -1
}
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int32) (*buffer.VP8, error) {
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*buffer.VP8, error) {
v.lock.Lock()
defer v.lock.Unlock()
@@ -1596,12 +1618,17 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3
// if out-of-order, look up missing picture id cache
if !newer {
pictureIdOffset, ok := v.missingPictureIds[extPictureId]
value, ok := v.missingPictureIds.Get(extPictureId)
if !ok {
return nil, ErrOutOfOrderVP8PictureIdCacheMiss
}
pictureIdOffset := value.(int32)
delete(v.missingPictureIds, extPictureId)
// the out-of-order picture id cannot be deleted from the cache
// as there could more than one packet in a picture and more
// than one packet of a picture could come out-of-order.
// To prevent picture id cache from growing, it is truncated
// when it reaches a certain size.
mungedPictureId := uint16((extPictureId - pictureIdOffset) & 0x7fff)
vp8Packet := &buffer.VP8{
@@ -1625,25 +1652,53 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3
prevMaxPictureId := v.pictureIdWrapHandler.MaxPictureId()
v.pictureIdWrapHandler.UpdateMaxPictureId(extPictureId, vp8.MBit)
// if there are gaps, record it in missing picture id cache
// check for > 1 as consecutive packets can have the same picture ID,
// i. e. one picture composed of multiple packets
if extPictureId-prevMaxPictureId > 1 {
for lostPictureId := prevMaxPictureId + 1; lostPictureId < extPictureId; lostPictureId++ {
v.missingPictureIds[lostPictureId] = v.pictureIdOffset
// if there is a gap in sequence number, record possible pictures that
// the missing packets can belong to in missing picture id cache.
// The missing picture cache should contain the previous picture id
// and the current picture id and all the intervening pictures.
// This is to handle a scenario as follows
// o Packet 10 -> Picture ID 10
// o Packet 11 -> missing
// o Packet 12 -> Picture ID 11
// In this case, Packet 11 could belong to either Picture ID 10 (last packet of that picture)
// or Picture ID 11 (first packet of the current picture). Although in this simple case,
// it is possible to deduce that (for example by looking at previous packet's RTP marker
// and check if that was the last packet of Picture 10), it could get complicated when
// the gap is larger.
if ordering == SequenceNumberOrderingGap {
// can drop packet if it belongs to the last dropped picture.
// Example:
// o Packet 10 - Picture 11 - TID that should be dropped
// o Packet 11 - missing
// o Packet 12 - Picture 11 - will be reported as GAP, but belongs to a picture that was dropped and hence can be dropped
// If Packet 11 comes around, it will be reported as OUT_OF_ORDER, but the missing
// picture id cache will not have an entry and hence will be dropped.
if extPictureId == v.lastDroppedPictureId {
return nil, ErrFilteredVP8TemporalLayer
} else {
for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ {
v.missingPictureIds.Set(lostPictureId, v.pictureIdOffset)
}
// trim cache if necessary
for v.missingPictureIds.Len() > 50 {
el := v.missingPictureIds.Front()
v.missingPictureIds.Delete(el.Key)
}
}
} else {
if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) {
// adjust only once per picture as a picture could have multiple packets
if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId {
v.lastDroppedPictureId = extPictureId
v.pictureIdOffset += 1
}
return nil, ErrFilteredVP8TemporalLayer
}
}
// in-order incoming picture, may or may not be contiguous.
// In the case of loss (i. e. incoming picture number is not contiguous),
// in-order incoming sequence number, may or may not be contiguous.
// In the case of loss (i. e. incoming sequence number is not contiguous),
// forward even if it is a filtered layer. With temporal scalability,
// it is unclear if the current packet should be dropped if it is not
// contiguous. Hence forward anything that is not contiguous.