From 872b86da3bbfb50693d5539b1565d76f2de07039 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 30 Jul 2023 12:24:58 +0530 Subject: [PATCH] Delay retransmission after first send. (#1918) * Delay retransmission after first send. * test tweak * english --- pkg/sfu/sequencer.go | 9 +++++++-- pkg/sfu/sequencer_test.go | 15 +++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index e0f89d2c7..a2699427b 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -133,6 +133,7 @@ func (s *sequencer) push( layer: layer, codecBytes: append([]byte{}, codecBytes...), ddBytes: append([]byte{}, ddBytes...), + lastNack: s.getRefTime(), // delay retransmissions after the original transmission } s.seq[slot] = &s.meta[s.metaWritePtr] @@ -198,7 +199,7 @@ func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta { defer s.Unlock() meta := make([]packetMeta, 0, len(seqNo)) - refTime := uint32(time.Now().UnixNano()/1e6 - s.startTime) + refTime := s.getRefTime() for _, sn := range seqNo { diff := s.headSN - sn if diff > (1<<15) || int(diff) >= s.max { @@ -212,7 +213,7 @@ func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta { continue } - if (seq.lastNack == 0 || refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt)))) && seq.nacked < maxAck { + if refTime-seq.lastNack > uint32(math.Min(float64(ignoreRetransmission), float64(2*s.rtt))) && seq.nacked < maxAck { seq.nacked++ seq.lastNack = refTime @@ -237,3 +238,7 @@ func (s *sequencer) wrap(slot int) int { return slot } + +func (s *sequencer) getRefTime() uint32 { + return uint32(time.Now().UnixNano()/1e6 - s.startTime) +} diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index a2303742d..8fde0d9e6 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -35,9 +35,13 @@ func Test_sequencer(t *testing.T) { seq.push(519, 519+off, 123, false, 2, nil, nil) seq.push(518, 518+off, 123, true, 2, nil, nil) - time.Sleep(60 * time.Millisecond) req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517} res := seq.getPacketsMeta(req) + // nothing should be returned as not enough time has elapsed since sending packet + require.Equal(t, 0, len(res)) + + time.Sleep((ignoreRetransmission + 10) * time.Millisecond) + res = seq.getPacketsMeta(req) require.Equal(t, len(req), len(res)) for i, val := range res { require.Equal(t, val.targetSeqNo, req[i]) @@ -46,7 +50,7 @@ func Test_sequencer(t *testing.T) { } res = seq.getPacketsMeta(req) require.Equal(t, 0, len(res)) - time.Sleep(150 * time.Millisecond) + time.Sleep((ignoreRetransmission + 10) * time.Millisecond) res = seq.getPacketsMeta(req) require.Equal(t, len(req), len(res)) for i, val := range res { @@ -57,10 +61,16 @@ func Test_sequencer(t *testing.T) { seq.push(521, 521+off, 123, true, 1, nil, nil) m := seq.getPacketsMeta([]uint16{521 + off}) + require.Equal(t, 0, len(m)) + time.Sleep((ignoreRetransmission + 10) * time.Millisecond) + m = seq.getPacketsMeta([]uint16{521 + off}) require.Equal(t, 1, len(m)) seq.push(505, 505+off, 123, false, 1, nil, nil) m = seq.getPacketsMeta([]uint16{505 + off}) + require.Equal(t, 0, len(m)) + time.Sleep((ignoreRetransmission + 10) * time.Millisecond) + m = seq.getPacketsMeta([]uint16{505 + off}) require.Equal(t, 1, len(m)) } @@ -121,6 +131,7 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) { n.pushPadding(i + tt.fields.offset) } + time.Sleep((ignoreRetransmission + 10) * time.Millisecond) g := n.getPacketsMeta(tt.args.seqNo) var got []uint16 for _, sn := range g {