diff --git a/go.mod b/go.mod index 79b56e4ef..2a5b35590 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/bep/debounce v1.2.0 github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect + github.com/elliotchance/orderedmap v1.4.0 // indirect github.com/gammazero/deque v0.1.0 github.com/gammazero/workerpool v1.1.2 github.com/go-logr/logr v1.2.0 diff --git a/go.sum b/go.sum index 689ef308d..dd3abe6ec 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4 github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elliotchance/orderedmap v1.4.0 h1:wZtfeEONCbx6in1CZyE6bELEt/vFayMvsxqI5SgsR+A= +github.com/elliotchance/orderedmap v1.4.0/go.mod h1:wsDwEaX5jEoyhbs7x93zk2H/qv0zwuhg4inXhDkYqys= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 0e71ad1ca..b2f1e5760 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/elliotchance/orderedmap" + "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/sdp/v3" @@ -30,6 +32,15 @@ const ( RTPBlankFramesMax = 6 ) +type SequenceNumberOrdering int + +const ( + SequenceNumberOrderingContiguous SequenceNumberOrdering = iota + SequenceNumberOrderingOutOfOrder + SequenceNumberOrderingGap + SequenceNumberOrderingUnknown +) + var ( ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded") @@ -816,12 +827,19 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { d.reSync.set(false) } + newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt) + if err != nil { + if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + return nil + } + + d.pktsDropped.add(1) + return err + } + payload := extPkt.Packet.Payload - var ( - translatedVP8 *buffer.VP8 - err error - ) + var translatedVP8 *buffer.VP8 if d.vp8Munger != nil && len(payload) > 0 { // LK-TODO-START // Errors below do not update sequence number. That is a problem if the stream @@ -830,7 +848,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { // that, the sequence numbers should be updated to ensure that subsequent packet // translations works fine and produce proper translated sequence numbers. // LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, d.temporalLayer.get()>>16) + translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16) if err != nil { if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { if err == ErrFilteredVP8TemporalLayer { @@ -858,15 +876,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { } } - newSN, newTS, err := d.munger.UpdateAndGetSnTs(extPkt) - if err != nil { - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { - return nil - } - - d.pktsDropped.add(1) - return err - } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head) if meta != nil && translatedVP8 != nil { @@ -968,12 +977,19 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } } + newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt) + if err != nil { + if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + return nil + } + + d.pktsDropped.add(1) + return err + } + payload := extPkt.Packet.Payload - var ( - translatedVP8 *buffer.VP8 - err error - ) + var translatedVP8 *buffer.VP8 if d.vp8Munger != nil && len(payload) > 0 { // LK-TODO-START // Errors below do not update sequence number. That is a problem if the stream @@ -982,7 +998,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err // that, the sequence numbers should be updated to ensure that subsequent packet // translations works fine and produce proper translated sequence numbers. // LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, d.temporalLayer.get()>>16) + translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.temporalLayer.get()>>16) if err != nil { if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { if err == ErrFilteredVP8TemporalLayer { @@ -1010,15 +1026,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } } - newSN, newTS, err := d.munger.UpdateAndGetSnTs(extPkt) - if err != nil { - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { - return nil - } - - d.pktsDropped.add(1) - return err - } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head) if meta != nil && translatedVP8 != nil { @@ -1397,6 +1404,9 @@ func (m *Munger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, ts m.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 m.snOffset = extPkt.Packet.SequenceNumber - m.lastSN - snAdjust m.tsOffset = extPkt.Packet.Timestamp - m.lastTS - tsAdjust + + // clear incoming missing sequence numbers on layer switch + m.missingSNs = make(map[uint16]uint16, 10) } func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) { @@ -1411,7 +1421,7 @@ func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) { m.snOffset += 1 } -func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, error) { +func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, SequenceNumberOrdering, error) { m.lock.Lock() defer m.lock.Unlock() @@ -1419,16 +1429,19 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err if !extPkt.Head { snOffset, ok := m.missingSNs[extPkt.Packet.SequenceNumber] if !ok { - return 0, 0, ErrOutOfOrderSequenceNumberCacheMiss + return 0, 0, SequenceNumberOrderingOutOfOrder, ErrOutOfOrderSequenceNumberCacheMiss } delete(m.missingSNs, extPkt.Packet.SequenceNumber) - return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, nil + return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, SequenceNumberOrderingOutOfOrder, nil } + ordering := SequenceNumberOrderingContiguous + // if there are gaps, record it in missing sequence number cache diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN if diff > 1 { + ordering = SequenceNumberOrderingGap var lossStartSN, lossEndSN int lossStartSN = int(m.highestIncomingSN) + 1 if extPkt.Packet.SequenceNumber > m.highestIncomingSN { @@ -1442,7 +1455,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err } else { // can get duplicate packet due to FEC if diff == 0 { - return 0, 0, ErrDuplicatePacket + return 0, 0, SequenceNumberOrderingUnknown, ErrDuplicatePacket } // if padding only packet, can be dropped and sequence number adjusted @@ -1452,7 +1465,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err if len(extPkt.Packet.Payload) == 0 { m.highestIncomingSN = extPkt.Packet.SequenceNumber m.snOffset += 1 - return 0, 0, ErrPaddingOnlyPacket + return 0, 0, SequenceNumberOrderingContiguous, ErrPaddingOnlyPacket } } @@ -1470,7 +1483,7 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err m.lastTS = mungedTS m.lastMarker = extPkt.Packet.Marker - return mungedSN, mungedTS, nil + return mungedSN, mungedTS, ordering, nil } func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) { @@ -1517,7 +1530,8 @@ type VP8MungerParams struct { keyIdxOffset uint8 keyIdxUsed int - missingPictureIds map[int32]int32 + missingPictureIds *orderedmap.OrderedMap + lastDroppedPictureId int32 } type VP8Munger struct { @@ -1528,7 +1542,8 @@ type VP8Munger struct { func NewVP8Munger() *VP8Munger { return &VP8Munger{VP8MungerParams: VP8MungerParams{ - missingPictureIds: make(map[int32]int32, 10), + missingPictureIds: orderedmap.NewOrderedMap(), + lastDroppedPictureId: -1, }} } @@ -1558,6 +1573,8 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { if v.keyIdxUsed == 1 { v.lastKeyIdx = vp8.KEYIDX } + + v.lastDroppedPictureId = -1 } func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { @@ -1581,9 +1598,14 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { if v.keyIdxUsed == 1 { v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f } + + // clear missing picture ids on layer switch + v.missingPictureIds = orderedmap.NewOrderedMap() + + v.lastDroppedPictureId = -1 } -func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int32) (*buffer.VP8, error) { +func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*buffer.VP8, error) { v.lock.Lock() defer v.lock.Unlock() @@ -1596,12 +1618,17 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3 // if out-of-order, look up missing picture id cache if !newer { - pictureIdOffset, ok := v.missingPictureIds[extPictureId] + value, ok := v.missingPictureIds.Get(extPictureId) if !ok { return nil, ErrOutOfOrderVP8PictureIdCacheMiss } + pictureIdOffset := value.(int32) - delete(v.missingPictureIds, extPictureId) + // the out-of-order picture id cannot be deleted from the cache + // as there could more than one packet in a picture and more + // than one packet of a picture could come out-of-order. + // To prevent picture id cache from growing, it is truncated + // when it reaches a certain size. mungedPictureId := uint16((extPictureId - pictureIdOffset) & 0x7fff) vp8Packet := &buffer.VP8{ @@ -1625,25 +1652,53 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3 prevMaxPictureId := v.pictureIdWrapHandler.MaxPictureId() v.pictureIdWrapHandler.UpdateMaxPictureId(extPictureId, vp8.MBit) - // if there are gaps, record it in missing picture id cache - // check for > 1 as consecutive packets can have the same picture ID, - // i. e. one picture composed of multiple packets - if extPictureId-prevMaxPictureId > 1 { - for lostPictureId := prevMaxPictureId + 1; lostPictureId < extPictureId; lostPictureId++ { - v.missingPictureIds[lostPictureId] = v.pictureIdOffset + // if there is a gap in sequence number, record possible pictures that + // the missing packets can belong to in missing picture id cache. + // The missing picture cache should contain the previous picture id + // and the current picture id and all the intervening pictures. + // This is to handle a scenario as follows + // o Packet 10 -> Picture ID 10 + // o Packet 11 -> missing + // o Packet 12 -> Picture ID 11 + // In this case, Packet 11 could belong to either Picture ID 10 (last packet of that picture) + // or Picture ID 11 (first packet of the current picture). Although in this simple case, + // it is possible to deduce that (for example by looking at previous packet's RTP marker + // and check if that was the last packet of Picture 10), it could get complicated when + // the gap is larger. + if ordering == SequenceNumberOrderingGap { + // can drop packet if it belongs to the last dropped picture. + // Example: + // o Packet 10 - Picture 11 - TID that should be dropped + // o Packet 11 - missing + // o Packet 12 - Picture 11 - will be reported as GAP, but belongs to a picture that was dropped and hence can be dropped + // If Packet 11 comes around, it will be reported as OUT_OF_ORDER, but the missing + // picture id cache will not have an entry and hence will be dropped. + if extPictureId == v.lastDroppedPictureId { + return nil, ErrFilteredVP8TemporalLayer + } else { + for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ { + v.missingPictureIds.Set(lostPictureId, v.pictureIdOffset) + } + + // trim cache if necessary + for v.missingPictureIds.Len() > 50 { + el := v.missingPictureIds.Front() + v.missingPictureIds.Delete(el.Key) + } } } else { if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) { // adjust only once per picture as a picture could have multiple packets if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId { + v.lastDroppedPictureId = extPictureId v.pictureIdOffset += 1 } return nil, ErrFilteredVP8TemporalLayer } } - // in-order incoming picture, may or may not be contiguous. - // In the case of loss (i. e. incoming picture number is not contiguous), + // in-order incoming sequence number, may or may not be contiguous. + // In the case of loss (i. e. incoming sequence number is not contiguous), // forward even if it is a filtered layer. With temporal scalability, // it is unclear if the current packet should be dropped if it is not // contiguous. Hence forward anything that is not contiguous.