Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-09-08 23:49:59 +05:30
7 changed files with 391 additions and 244 deletions
+7 -7
View File
@@ -1133,17 +1133,17 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
fracLost = proxyFracLost
}
var dlsr uint32
if r.srNewest != nil && !r.srNewest.At.IsZero() {
delayMS := uint32(time.Since(r.srNewest.At).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
lastSR := uint32(0)
dlsr := uint32(0)
if r.srNewest != nil {
lastSR = uint32(r.srNewest.NTPTimestamp >> 16)
if !r.srNewest.At.IsZero() {
delayMS := uint32(time.Since(r.srNewest.At).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
+22 -27
View File
@@ -71,8 +71,6 @@ const (
keyFrameIntervalMax = 1000
flushTimeout = 1 * time.Second
maxPadding = 2000
waitBeforeSendPaddingOnMute = 100 * time.Millisecond
maxPaddingOnMuteDuration = 5 * time.Second
)
@@ -393,11 +391,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.rtcpReader = rr
}
if d.kind == webrtc.RTPCodecTypeAudio {
d.sequencer = newSequencer(d.params.MaxTrack, 0, d.params.Logger)
} else {
d.sequencer = newSequencer(d.params.MaxTrack, maxPadding, d.params.Logger)
}
d.sequencer = newSequencer(d.params.MaxTrack, d.kind == webrtc.RTPCodecTypeVideo, d.params.Logger)
d.codec = codec.RTPCodecCapability
if d.onBinding != nil {
@@ -710,9 +704,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
}
if d.sequencer != nil {
d.sequencer.push(
extPkt.Packet.SequenceNumber,
tp.rtp.sequenceNumber,
tp.rtp.timestamp,
extPkt.Arrival,
extPkt.ExtSequenceNumber,
tp.rtp.extSequenceNumber,
tp.rtp.extTimestamp,
hdr.Marker,
int8(layer),
tp.codecBytes,
@@ -786,6 +781,15 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
return 0
}
//
// Register with sequencer as padding only so that NACKs for these can be filtered out.
// Retransmission is probably a sign of network congestion/badness.
// So, retransmitting padding only packets is only going to make matters worse.
//
if d.sequencer != nil {
d.sequencer.pushPadding(snts[0].extSequenceNumber, snts[len(snts)-1].extSequenceNumber)
}
bytesSent := 0
for i := 0; i < len(snts); i++ {
hdr := rtp.Header{
@@ -793,8 +797,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
Padding: true,
Marker: false,
PayloadType: d.payloadType,
SequenceNumber: snts[i].sequenceNumber,
Timestamp: snts[i].timestamp,
SequenceNumber: uint16(snts[i].extSequenceNumber),
Timestamp: uint32(snts[i].extTimestamp),
SSRC: d.ssrc,
CSRC: []uint32{},
}
@@ -816,15 +820,6 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
OnSent: d.packetSent,
})
//
// Register with sequencer with invalid layer so that NACKs for these can be filtered out.
// Retransmission is probably a sign of network congestion/badness.
// So, retransmitting padding packets is only going to make matters worse.
//
if d.sequencer != nil {
d.sequencer.pushPadding(hdr.SequenceNumber)
}
bytesSent += hdr.MarshalSize() + len(payload)
}
@@ -1298,8 +1293,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
Padding: false,
Marker: true,
PayloadType: d.payloadType,
SequenceNumber: snts[i].sequenceNumber,
Timestamp: snts[i].timestamp,
SequenceNumber: uint16(snts[i].extSequenceNumber),
Timestamp: uint32(snts[i].extTimestamp),
SSRC: d.ssrc,
CSRC: []uint32{},
}
@@ -1637,8 +1632,8 @@ func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *Transla
tpRTP := tp.rtp
hdr := extPkt.Packet.Header
hdr.PayloadType = d.payloadType
hdr.Timestamp = tpRTP.timestamp
hdr.SequenceNumber = tpRTP.sequenceNumber
hdr.Timestamp = uint32(tpRTP.extTimestamp)
hdr.SequenceNumber = uint16(tpRTP.extSequenceNumber)
hdr.SSRC = d.ssrc
if tp.marker {
hdr.Marker = tp.marker
@@ -1785,8 +1780,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
Padding: false,
Marker: true,
PayloadType: d.payloadType,
SequenceNumber: snts[i].sequenceNumber,
Timestamp: snts[i].timestamp,
SequenceNumber: uint16(snts[i].extSequenceNumber),
Timestamp: uint32(snts[i].extTimestamp),
SSRC: d.ssrc,
CSRC: []uint32{},
}
+47 -47
View File
@@ -1194,9 +1194,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
// should lock onto the first packet
expectedTP := TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23333,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23333,
extTimestamp: 0xabcdef,
},
}
actualTP, err := f.GetTranslationParams(extPkt, 0)
@@ -1227,9 +1227,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 23331,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23331,
extTimestamp: 0xabcdef,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1262,9 +1262,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23333,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23333,
extTimestamp: 0xabcdef,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1281,9 +1281,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 23335,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 23335,
extTimestamp: 0xabcdef,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1301,9 +1301,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 23334,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23334,
extTimestamp: 0xabcdef,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1321,9 +1321,9 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23336,
timestamp: 0xabcdf0,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23336,
extTimestamp: 0xabcdf0,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1417,9 +1417,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
isSwitching: true,
isResuming: true,
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23333,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23333,
extTimestamp: 0xabcdef,
},
codecBytes: marshalledVP8,
marker: true,
@@ -1495,9 +1495,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
require.NoError(t, err)
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23334,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23334,
extTimestamp: 0xabcdef,
},
codecBytes: marshalledVP8,
}
@@ -1548,9 +1548,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
require.NoError(t, err)
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23335,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23335,
extTimestamp: 0xabcdef,
},
codecBytes: marshalledVP8,
}
@@ -1630,9 +1630,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
require.NoError(t, err)
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23336,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23336,
extTimestamp: 0xabcdef,
},
codecBytes: marshalledVP8,
}
@@ -1650,9 +1650,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 23338,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 23338,
extTimestamp: 0xabcdef,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1669,9 +1669,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 23337,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23337,
extTimestamp: 0xabcdef,
},
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
@@ -1728,9 +1728,9 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
expectedTP = TranslationParams{
isSwitching: true,
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23339,
timestamp: 0xabcdf0,
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23339,
extTimestamp: 0xabcdf0,
},
codecBytes: marshalledVP8,
}
@@ -1788,8 +1788,8 @@ func TestForwarderGetSnTsForPadding(t *testing.T) {
var sntsExpected = make([]SnTs, numPadding)
for i := 0; i < numPadding; i++ {
sntsExpected[i] = SnTs{
sequenceNumber: 23333 + uint16(i) + 1,
timestamp: 0xabcdef + (uint32(i)*clockRate)/frameRate,
extSequenceNumber: 23333 + uint64(i) + 1,
extTimestamp: 0xabcdef + (uint64(i)*uint64(clockRate))/uint64(frameRate),
}
}
require.Equal(t, sntsExpected, snts)
@@ -1800,8 +1800,8 @@ func TestForwarderGetSnTsForPadding(t *testing.T) {
for i := 0; i < numPadding; i++ {
sntsExpected[i] = SnTs{
sequenceNumber: 23338 + uint16(i) + 1,
timestamp: 0xabcdef + (uint32(i+1)*clockRate)/frameRate,
extSequenceNumber: 23338 + uint64(i) + 1,
extTimestamp: 0xabcdef + (uint64(i+1)*uint64(clockRate))/uint64(frameRate),
}
}
require.Equal(t, sntsExpected, snts)
@@ -1861,8 +1861,8 @@ func TestForwarderGetSnTsForBlankFrames(t *testing.T) {
ts = params.Timestamp + 1 + ((uint32(i)*clockRate)+frameRate-1)/frameRate
}
sntsExpected[i] = SnTs{
sequenceNumber: params.SequenceNumber + uint16(i) + 1,
timestamp: ts,
extSequenceNumber: uint64(params.SequenceNumber) + uint64(i) + 1,
extTimestamp: uint64(ts),
}
}
require.Equal(t, sntsExpected, snts)
@@ -1873,9 +1873,9 @@ func TestForwarderGetSnTsForBlankFrames(t *testing.T) {
sntsExpected = sntsExpected[:numPadding]
for i := 0; i < numPadding; i++ {
sntsExpected[i] = SnTs{
sequenceNumber: params.SequenceNumber + uint16(len(snts)) + uint16(i) + 1,
extSequenceNumber: uint64(params.SequenceNumber) + uint64(len(snts)) + uint64(i) + 1,
// +1 here due to expected time stamp bumpint by at least one so that time stamp is always moving ahead
timestamp: snts[len(snts)-1].timestamp + 1 + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate,
extTimestamp: snts[len(snts)-1].extTimestamp + 1 + ((uint64(i+1)*uint64(clockRate))+uint64(frameRate)-1)/uint64(frameRate),
}
}
snts, frameEndNeeded, err = f.GetSnTsForBlankFrames(30, numBlankFrames)
+15 -15
View File
@@ -40,14 +40,14 @@ const (
)
type TranslationParamsRTP struct {
snOrdering SequenceNumberOrdering
sequenceNumber uint16
timestamp uint32
snOrdering SequenceNumberOrdering
extSequenceNumber uint64
extTimestamp uint64
}
type SnTs struct {
sequenceNumber uint16
timestamp uint32
extSequenceNumber uint64
extTimestamp uint64
}
// ----------------------------------------------------------------------
@@ -188,9 +188,9 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
}
return &TranslationParamsRTP{
snOrdering: ordering,
sequenceNumber: uint16(extMungedSN),
timestamp: uint32(extMungedTS),
snOrdering: ordering,
extSequenceNumber: extMungedSN,
extTimestamp: extMungedTS,
}, nil
}
@@ -204,9 +204,9 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
}
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset),
timestamp: uint32(extPkt.ExtTimestamp - r.tsOffset),
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: extPkt.ExtSequenceNumber - snOffset,
extTimestamp: extPkt.ExtTimestamp - r.tsOffset,
}, nil
}
@@ -268,21 +268,21 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
vals := make([]SnTs, num)
for i := 0; i < num; i++ {
extLastSN++
vals[i].sequenceNumber = uint16(extLastSN)
vals[i].extSequenceNumber = extLastSN
if frameRate != 0 {
if useLastTSForFirst && i == 0 {
vals[i].timestamp = uint32(r.extLastTS)
vals[i].extTimestamp = r.extLastTS
} else {
ets := extRtpTimestamp + uint64(((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate)
if int64(ets-extLastTS) <= 0 {
ets = extLastTS + 1
}
extLastTS = ets
vals[i].timestamp = uint32(ets)
vals[i].extTimestamp = ets
}
} else {
vals[i].timestamp = uint32(r.extLastTS)
vals[i].extTimestamp = r.extLastTS
}
}
+29 -29
View File
@@ -173,9 +173,9 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected := TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 23331,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 23331,
extTimestamp: 0xabcdef,
}
tp, err := r.UpdateAndGetSnTs(extPkt)
@@ -258,9 +258,9 @@ func TestPaddingOnlyPacket(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 23334,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 23334,
extTimestamp: 0xabcdef,
}
tp, err = r.UpdateAndGetSnTs(extPkt)
@@ -299,9 +299,9 @@ func TestGapInSequenceNumber(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected := TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 1,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 65536 + 1,
extTimestamp: 0xabcdef,
}
tp, err := r.UpdateAndGetSnTs(extPkt)
@@ -352,9 +352,9 @@ func TestGapInSequenceNumber(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 3,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 65536 + 3,
extTimestamp: 0xabcdef,
}
tp, err = r.UpdateAndGetSnTs(extPkt)
@@ -403,9 +403,9 @@ func TestGapInSequenceNumber(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 5,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingGap,
extSequenceNumber: 65536 + 5,
extTimestamp: 0xabcdef,
}
tp, err = r.UpdateAndGetSnTs(extPkt)
@@ -436,9 +436,9 @@ func TestGapInSequenceNumber(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 4,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 65536 + 4,
extTimestamp: 0xabcdef,
}
tp, err = r.UpdateAndGetSnTs(extPkt)
@@ -459,9 +459,9 @@ func TestGapInSequenceNumber(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 2,
timestamp: 0xabcdef,
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: 65536 + 2,
extTimestamp: 0xabcdef,
}
tp, err = r.UpdateAndGetSnTs(extPkt)
@@ -494,27 +494,27 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) {
// forcing a marker should not error out.
// And timestamp on first padding should be the same as the last one.
numPadding := 10
clockRate := uint32(10)
frameRate := uint32(5)
clockRate := uint64(10)
frameRate := uint64(5)
var sntsExpected = make([]SnTs, numPadding)
for i := 0; i < numPadding; i++ {
sntsExpected[i] = SnTs{
sequenceNumber: params.SequenceNumber + uint16(i) + 1,
timestamp: params.Timestamp + ((uint32(i)*clockRate)+frameRate-1)/frameRate,
extSequenceNumber: uint64(params.SequenceNumber) + uint64(i) + 1,
extTimestamp: uint64(params.Timestamp) + ((uint64(i)*clockRate)+frameRate-1)/frameRate,
}
}
snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, extPkt.ExtTimestamp)
snts, err := r.UpdateAndGetPaddingSnTs(numPadding, uint32(clockRate), uint32(frameRate), true, extPkt.ExtTimestamp)
require.NoError(t, err)
require.Equal(t, sntsExpected, snts)
// now that there is a marker, timestamp should jump on first padding when asked again
for i := 0; i < numPadding; i++ {
sntsExpected[i] = SnTs{
sequenceNumber: params.SequenceNumber + uint16(len(snts)) + uint16(i) + 1,
timestamp: snts[len(snts)-1].timestamp + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate,
extSequenceNumber: uint64(params.SequenceNumber) + uint64(len(snts)) + uint64(i) + 1,
extTimestamp: snts[len(snts)-1].extTimestamp + ((uint64(i+1)*clockRate)+frameRate-1)/frameRate,
}
}
snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, uint64(snts[len(snts)-1].timestamp))
snts, err = r.UpdateAndGetPaddingSnTs(numPadding, uint32(clockRate), uint32(frameRate), false, snts[len(snts)-1].extTimestamp)
require.NoError(t, err)
require.Equal(t, sntsExpected, snts)
}
+138 -96
View File
@@ -19,6 +19,7 @@ import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/logger"
)
@@ -74,28 +75,30 @@ type packetMeta struct {
// Sequencer stores the packet sequence received by the down track
type sequencer struct {
sync.Mutex
init bool
max int
seq []*packetMeta
meta []packetMeta
metaWritePtr int
step int
headSN uint16
size int
startTime int64
initialized bool
extHighestSN uint64
snOffset uint64
meta []packetMeta
snRangeMap *utils.RangeMap[uint64, uint64]
rtt uint32
logger logger.Logger
}
func newSequencer(maxTrack int, maxPadding int, logger logger.Logger) *sequencer {
return &sequencer{
startTime: time.Now().UnixNano() / 1e6,
max: maxTrack + maxPadding,
seq: make([]*packetMeta, maxTrack+maxPadding),
meta: make([]packetMeta, maxTrack),
metaWritePtr: 0,
rtt: defaultRtt,
logger: logger,
func newSequencer(size int, maybeSparse bool, logger logger.Logger) *sequencer {
s := &sequencer{
size: size,
startTime: time.Now().UnixMilli(),
meta: make([]packetMeta, size),
rtt: defaultRtt,
logger: logger,
}
if maybeSparse {
s.snRangeMap = utils.NewRangeMap[uint64, uint64]((size + 1) / 2) // assume run lengths of at least 2 in between padding bursts
}
return s
}
func (s *sequencer) setRTT(rtt uint32) {
@@ -110,8 +113,9 @@ func (s *sequencer) setRTT(rtt uint32) {
}
func (s *sequencer) push(
sn, offSn uint16,
timeStamp uint32,
packetTime time.Time,
extIncomingSN, extModifiedSN uint64,
extModifiedTS uint64,
marker bool,
layer int8,
codecBytes []byte,
@@ -120,125 +124,163 @@ func (s *sequencer) push(
s.Lock()
defer s.Unlock()
slot, isValid := s.getSlot(offSn)
if !isValid {
return
if !s.initialized {
s.extHighestSN = extModifiedSN - 1
s.updateSNOffset()
}
s.meta[s.metaWritePtr] = packetMeta{
sourceSeqNo: sn,
targetSeqNo: offSn,
timestamp: timeStamp,
snOffset := s.snOffset
diff := int64(extModifiedSN - s.extHighestSN)
if diff >= 0 {
s.extHighestSN = extModifiedSN
} else {
if diff < -int64(s.size) {
s.logger.Debugw("old packet, can not be sequenced", "extHighestSN", s.extHighestSN, "extModifiedSN", extModifiedSN)
return
}
if s.snRangeMap != nil {
var err error
snOffset, err = s.snRangeMap.GetValue(extModifiedSN)
if err != nil {
s.logger.Errorw("could not get sequence number offset", err, "extIncomingSN", extIncomingSN, "extModifiedSn", extModifiedSN)
return
}
}
}
slot := (extModifiedSN - snOffset) % uint64(s.size)
s.meta[slot] = packetMeta{
sourceSeqNo: uint16(extIncomingSN),
targetSeqNo: uint16(extModifiedSN),
timestamp: uint32(extModifiedTS),
marker: marker,
layer: layer,
codecBytes: append([]byte{}, codecBytes...),
ddBytes: append([]byte{}, ddBytes...),
lastNack: s.getRefTime(), // delay retransmissions after the original transmission
}
s.seq[slot] = &s.meta[s.metaWritePtr]
s.metaWritePtr++
if s.metaWritePtr >= len(s.meta) {
s.metaWritePtr -= len(s.meta)
lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission
}
}
func (s *sequencer) pushPadding(offSn uint16) {
func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive uint64) {
s.Lock()
defer s.Unlock()
slot, isValid := s.getSlot(offSn)
if !isValid {
if s.snRangeMap == nil {
return
}
s.seq[slot] = nil
}
if extStartSNInclusive <= s.extHighestSN {
// a higher sequence number has already been recorded with an offset,
// adding an exclusion range before the highest means the offset of sequence numbers
// after the exclusion range will be affected and all those higher sequence numbers
// need to be patched.
//
// Not recording exclusion range means a few slots (of the size of exclusion range)
// are wasted in this cycle. That should be fine as the exclusion ranges should be
// a few packets at a time.
s.logger.Warnw("cannot exclude old range", nil, "extHighestSN", s.extHighestSN, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive)
func (s *sequencer) getSlot(offSn uint16) (int, bool) {
if !s.init {
s.headSN = offSn - 1
s.init = true
}
// if exclusion range is before what has already been sequenced, invalidate exclusion range slots
for sn := extStartSNInclusive; sn != extEndSNInclusive+1; sn++ {
diff := int64(sn - s.extHighestSN)
if diff >= 0 || diff < -int64(s.size) {
// too old OR too new (too new should not happen, just be safe)
continue
}
diff := offSn - s.headSN
if diff == 0 {
// duplicate
return 0, false
}
snOffset, err := s.snRangeMap.GetValue(sn)
if err != nil {
s.logger.Errorw("could not get sequence number offset", err, "sn", sn)
continue
}
slot := 0
if diff > (1 << 15) {
// out-of-order
back := int(s.headSN - offSn)
if back >= s.max {
s.logger.Debugw("old packet, can not be sequenced", "head", s.headSN, "received", offSn)
return 0, false
slot := (sn - snOffset) % uint64(s.size)
s.meta[slot] = packetMeta{
sourceSeqNo: 0,
targetSeqNo: 0,
}
}
slot = s.step - back - 1
} else {
s.headSN = offSn
// invalidate intervening slots
for idx := 0; idx < int(diff)-1; idx++ {
s.seq[s.wrap(s.step+idx)] = nil
}
slot = s.step + int(diff) - 1
// for next packet
s.step = s.wrap(s.step + int(diff))
return
}
return s.wrap(slot), true
if err := s.snRangeMap.ExcludeRange(extStartSNInclusive, extEndSNInclusive+1); err != nil {
s.logger.Errorw("could not exclude range", err, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive)
return
}
s.extHighestSN = extEndSNInclusive
s.updateSNOffset()
}
func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta {
s.Lock()
defer s.Unlock()
meta := make([]packetMeta, 0, len(seqNo))
refTime := s.getRefTime()
snOffset := uint64(0)
var err error
packetsMeta := make([]packetMeta, 0, len(seqNo))
refTime := s.getRefTime(time.Now())
highestSN := uint16(s.extHighestSN)
for _, sn := range seqNo {
diff := s.headSN - sn
if diff > (1<<15) || int(diff) >= s.max {
// out-of-order from head (should not happen) or too old
diff := highestSN - sn
if diff > (1 << 15) {
// out-of-order from head (should not happen, just be safe)
continue
}
slot := s.wrap(s.step - int(diff) - 1)
seq := s.seq[slot]
if seq == nil || seq.targetSeqNo != sn {
// find slot by adjusting for padding only packets that were not recorded in sequencer
extSN := uint64(sn) + (s.extHighestSN & 0xFFFF_FFFF_FFFF_0000)
if sn > highestSN {
extSN -= (1 << 16)
}
if s.extHighestSN-extSN >= uint64(s.size) {
// too old
continue
}
if refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) && seq.nacked < maxAck {
seq.nacked++
seq.lastNack = refTime
if s.snRangeMap != nil {
snOffset, err = s.snRangeMap.GetValue(extSN)
if err != nil {
// could be padding packet which is excluded and will not have value
continue
}
}
pm := *seq
pm.codecBytes = append([]byte{}, seq.codecBytes...)
pm.ddBytes = append([]byte{}, seq.ddBytes...)
meta = append(meta, pm)
slot := (extSN - snOffset) % uint64(s.size)
meta := &s.meta[slot]
if meta.targetSeqNo != sn {
continue
}
if meta.nacked < maxAck && refTime-meta.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) {
meta.nacked++
meta.lastNack = refTime
pm := *meta
pm.codecBytes = append([]byte{}, meta.codecBytes...)
pm.ddBytes = append([]byte{}, meta.ddBytes...)
packetsMeta = append(packetsMeta, pm)
}
}
return meta
return packetsMeta
}
func (s *sequencer) wrap(slot int) int {
for slot < 0 {
slot += s.max
func (s *sequencer) getRefTime(at time.Time) uint32 {
return uint32(at.UnixMilli() - s.startTime)
}
func (s *sequencer) updateSNOffset() {
if s.snRangeMap == nil {
return
}
for slot >= s.max {
slot -= s.max
snOffset, err := s.snRangeMap.GetValue(s.extHighestSN + 1)
if err != nil {
s.logger.Errorw("could not update sequence number offset", err, "extHighestSN", s.extHighestSN)
return
}
return slot
}
func (s *sequencer) getRefTime() uint32 {
return uint32(time.Now().UnixNano()/1e6 - s.startTime)
s.snOffset = snOffset
}
+133 -23
View File
@@ -25,15 +25,15 @@ import (
)
func Test_sequencer(t *testing.T) {
seq := newSequencer(500, 0, logger.GetLogger())
seq := newSequencer(500, false, logger.GetLogger())
off := uint16(15)
for i := uint16(1); i < 518; i++ {
seq.push(i, i+off, 123, true, 2, nil, nil)
for i := uint64(1); i < 518; i++ {
seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, nil)
}
// send the last two out-of-order
seq.push(519, 519+off, 123, false, 2, nil, nil)
seq.push(518, 518+off, 123, true, 2, nil, nil)
seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, nil)
seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, nil)
req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517}
res := seq.getPacketsMeta(req)
@@ -59,14 +59,14 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, val.layer, int8(2))
}
seq.push(521, 521+off, 123, true, 1, nil, nil)
seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, nil)
m := seq.getPacketsMeta([]uint16{521 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
m = seq.getPacketsMeta([]uint16{521 + off})
require.Equal(t, 1, len(m))
seq.push(505, 505+off, 123, false, 1, nil, nil)
seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, nil)
m = seq.getPacketsMeta([]uint16{505 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
@@ -74,14 +74,17 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, 1, len(m))
}
func Test_sequencer_getNACKSeqNo(t *testing.T) {
func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
type args struct {
seqNo []uint16
}
type input struct {
seqNo uint64
isPadding bool
}
type fields struct {
input []uint16
padding []uint16
offset uint16
inputs []input
offset uint64
markerOdd bool
markerEven bool
codecBytesOdd []byte
@@ -99,8 +102,17 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) {
{
name: "Should get correct seq numbers",
fields: fields{
input: []uint16{2, 3, 4, 7, 8, 11},
padding: []uint16{9, 10},
inputs: []input{
{65526, false},
{65524, false},
{65525, false},
{65529, false},
{65530, false},
{65531, true},
{65533, false},
{65532, true},
{65534, false},
},
offset: 5,
markerOdd: true,
markerEven: false,
@@ -110,26 +122,124 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) {
ddBytesEven: []byte{11, 12},
},
args: args{
seqNo: []uint16{4 + 5, 5 + 5, 8 + 5, 9 + 5, 10 + 5, 11 + 5},
seqNo: []uint16{65526 + 5, 65527 + 5, 65530 + 5, 0 /* 65531 input */, 1 /* 65532 input */, 2 /* 65533 input */, 3 /* 65534 input */},
},
want: []uint16{4, 8, 11},
// although 65526 is originally pushed, that would have been reset by 65532 (padding only packet)
// because of trying to add an exclusion range before highest sequence number which will fail
// and the resulting fix up of the exclusion range slots
want: []uint16{65530, 65533, 65534},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
n := newSequencer(5, 10, logger.GetLogger())
n := newSequencer(5, true, logger.GetLogger())
for _, i := range tt.fields.input {
if i%2 == 0 {
n.push(i, i+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.ddBytesEven)
for _, i := range tt.fields.inputs {
if i.isPadding {
n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset)
} else {
n.push(i, i+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.ddBytesOdd)
if i.seqNo%2 == 0 {
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.ddBytesEven)
} else {
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.ddBytesOdd)
}
}
}
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
g := n.getPacketsMeta(tt.args.seqNo)
var got []uint16
for _, sn := range g {
got = append(got, sn.sourceSeqNo)
if sn.sourceSeqNo%2 == 0 {
require.Equal(t, tt.fields.markerEven, sn.marker)
require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes)
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes)
} else {
require.Equal(t, tt.fields.markerOdd, sn.marker)
require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes)
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes)
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getPacketsMeta() = %v, want %v", got, tt.want)
}
})
}
}
func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
type args struct {
seqNo []uint16
}
type input struct {
seqNo uint64
isPadding bool
}
type fields struct {
inputs []input
offset uint64
markerOdd bool
markerEven bool
codecBytesOdd []byte
codecBytesEven []byte
ddBytesOdd []byte
ddBytesEven []byte
}
tests := []struct {
name string
fields fields
args args
want []uint16
}{
{
name: "Should get correct seq numbers",
fields: fields{
inputs: []input{
{2, false},
{3, false},
{4, false},
{7, false},
{8, false},
{9, true},
{11, false},
{10, true},
{12, false},
{13, false},
},
offset: 5,
markerOdd: true,
markerEven: false,
codecBytesOdd: []byte{1, 2, 3, 4},
codecBytesEven: []byte{5, 6, 7},
ddBytesOdd: []byte{8, 9, 10},
ddBytesEven: []byte{11, 12},
},
args: args{
seqNo: []uint16{4 + 5, 5 + 5, 8 + 5, 9 + 5, 10 + 5, 11 + 5, 12 + 5},
},
// although 4 and 8 were originally added, they would be too old after a cycle of sequencer buffer
want: []uint16{11, 12},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
n := newSequencer(5, false, logger.GetLogger())
for _, i := range tt.fields.inputs {
if i.isPadding {
n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset)
} else {
if i.seqNo%2 == 0 {
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.ddBytesEven)
} else {
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.ddBytesOdd)
}
}
}
for _, i := range tt.fields.padding {
n.pushPadding(i + tt.fields.offset)
}
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
g := n.getPacketsMeta(tt.args.seqNo)