diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 93ef0c1a2..a4df99776 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1660,9 +1660,22 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { poolEntity := PacketFactory.Get().(*[]byte) payload := *poolEntity - copy(payload, epm.codecBytes[:epm.numCodecBytesOut]) - copy(payload[epm.numCodecBytesOut:], pkt.Payload[epm.numCodecBytesIn:]) - payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)] + if len(epm.codecBytesSlice) != 0 { + n := copy(payload, epm.codecBytesSlice) + m := copy(payload[n:], pkt.Payload[epm.numCodecBytesIn:]) + payload = payload[:n+m] + } else { + copy(payload, epm.codecBytes[:epm.numCodecBytesOut]) + copy(payload[epm.numCodecBytesOut:], pkt.Payload[epm.numCodecBytesIn:]) + payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)] + } + + var ddBytes []byte + if len(epm.ddBytesSlice) != 0 { + ddBytes = epm.ddBytesSlice + } else { + ddBytes = epm.ddBytes[:epm.ddBytesSize] + } d.sendingPacket( &pkt.Header, @@ -1677,7 +1690,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { ) d.pacer.Enqueue(pacer.Packet{ Header: &pkt.Header, - Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: epm.ddBytes}}, + Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: ddBytes}}, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 4eee083a0..0fafcfc81 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -70,8 +70,11 @@ type packetMeta struct { codecBytes [8]byte numCodecBytesIn uint8 numCodecBytesOut uint8 + codecBytesSlice []byte // Dependency Descriptor of packet - ddBytes []byte + ddBytes [8]byte + ddBytesSize uint8 + ddBytesSlice []byte } type extPacketMeta struct { @@ -199,17 +202,23 @@ func (s *sequencer) push( marker: marker, layer: layer, numCodecBytesIn: uint8(numCodecBytesIn), - ddBytes: append([]byte{}, ddBytes...), lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission } pm := &s.meta[slot] + pm.numCodecBytesOut = uint8(len(codecBytes)) - if pm.numCodecBytesOut > uint8(len(pm.codecBytes)) { - s.logger.Errorw("codec bytes too large", nil, "need", pm.numCodecBytesOut, "bufSize", len(pm.codecBytes)) - s.invalidateSlot(int(slot)) - return + if len(codecBytes) > len(pm.codecBytes) { + pm.codecBytesSlice = append([]byte{}, codecBytes...) + } else { + copy(pm.codecBytes[:pm.numCodecBytesOut], codecBytes) + } + + pm.ddBytesSize = uint8(len(ddBytes)) + if len(ddBytes) > len(pm.ddBytes) { + pm.ddBytesSlice = append([]byte{}, ddBytes...) + } else { + copy(pm.ddBytes[:pm.ddBytesSize], ddBytes) } - copy(pm.codecBytes[:pm.numCodecBytesOut], codecBytes) if extModifiedSN > s.extHighestSN { s.extHighestSN = extModifiedSN @@ -333,8 +342,8 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { extSequenceNumber: extSN, extTimestamp: extTS, } - copy(epm.codecBytes[:epm.numCodecBytesOut], meta.codecBytes[:epm.numCodecBytesOut]) - epm.ddBytes = append([]byte{}, meta.ddBytes...) + epm.codecBytesSlice = append([]byte{}, meta.codecBytesSlice...) + epm.ddBytesSlice = append([]byte{}, meta.ddBytesSlice...) extPacketMetas = append(extPacketMetas, epm) } } diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index 595acf02c..39dcec906 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -95,8 +95,10 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { numCodecBytesInOdd int codecBytesEven []byte numCodecBytesInEven int + codecBytesOversized []byte ddBytesOdd []byte ddBytesEven []byte + ddBytesOversized []byte } tests := []struct { @@ -126,8 +128,10 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { numCodecBytesInOdd: 3, codecBytesEven: []byte{5, 6, 7}, numCodecBytesInEven: 4, + codecBytesOversized: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, ddBytesOdd: []byte{8, 9, 10}, ddBytesEven: []byte{11, 12}, + ddBytesOversized: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19}, }, args: args{ seqNo: []uint16{65526 + 5, 65527 + 5, 65530 + 5, 0 /* 65531 input */, 1 /* 65532 input */, 2 /* 65533 input */, 3 /* 65534 input */}, @@ -147,10 +151,44 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { if i.isPadding { n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset) } else { - if i.seqNo%2 == 0 { - n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.numCodecBytesInEven, tt.fields.ddBytesEven) + if i.seqNo%5 == 0 { + n.push( + time.Now(), + i.seqNo, + i.seqNo+tt.fields.offset, + 123, + tt.fields.markerOdd, + 3, + tt.fields.codecBytesOversized, + len(tt.fields.codecBytesOversized), + tt.fields.ddBytesOversized, + ) } else { - n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.numCodecBytesInOdd, tt.fields.ddBytesOdd) + if i.seqNo%2 == 0 { + n.push( + time.Now(), + i.seqNo, + i.seqNo+tt.fields.offset, + 123, + tt.fields.markerEven, + 3, + tt.fields.codecBytesEven, + tt.fields.numCodecBytesInEven, + tt.fields.ddBytesEven, + ) + } else { + n.push( + time.Now(), + i.seqNo, + i.seqNo+tt.fields.offset, + 123, + tt.fields.markerOdd, + 3, + tt.fields.codecBytesOdd, + tt.fields.numCodecBytesInOdd, + tt.fields.ddBytesOdd, + ) + } } } } @@ -160,16 +198,26 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { var got []uint16 for _, sn := range g { got = append(got, sn.sourceSeqNo) - if sn.sourceSeqNo%2 == 0 { - require.Equal(t, tt.fields.markerEven, sn.marker) - require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut]) - require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn) - require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes) - } else { + if sn.sourceSeqNo%5 == 0 { require.Equal(t, tt.fields.markerOdd, sn.marker) - require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut]) - require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn) - require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes) + require.Equal(t, tt.fields.codecBytesOversized, sn.codecBytesSlice) + require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.numCodecBytesIn) + require.Equal(t, tt.fields.ddBytesOversized, sn.ddBytesSlice) + require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.ddBytesSize) + } else { + if sn.sourceSeqNo%2 == 0 { + require.Equal(t, tt.fields.markerEven, sn.marker) + require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut]) + require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn) + require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize]) + require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize) + } else { + require.Equal(t, tt.fields.markerOdd, sn.marker) + require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut]) + require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn) + require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize]) + require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize) + } } } if !reflect.DeepEqual(got, tt.want) { @@ -248,9 +296,29 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset) } else { if i.seqNo%2 == 0 { - n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.numCodecBytesInEven, tt.fields.ddBytesEven) + n.push( + time.Now(), + i.seqNo, + i.seqNo+tt.fields.offset, + 123, + tt.fields.markerEven, + 3, + tt.fields.codecBytesEven, + tt.fields.numCodecBytesInEven, + tt.fields.ddBytesEven, + ) } else { - n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.numCodecBytesInOdd, tt.fields.ddBytesOdd) + n.push( + time.Now(), + i.seqNo, + i.seqNo+tt.fields.offset, + 123, + tt.fields.markerOdd, + 3, + tt.fields.codecBytesOdd, + tt.fields.numCodecBytesInOdd, + tt.fields.ddBytesOdd, + ) } } } @@ -264,12 +332,14 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { require.Equal(t, tt.fields.markerEven, sn.marker) require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut]) require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn) - require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes) + require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize]) + require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize) } else { require.Equal(t, tt.fields.markerOdd, sn.marker) require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut]) require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn) - require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes) + require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize]) + require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize) } } if !reflect.DeepEqual(got, tt.want) {