mirror of
https://github.com/livekit/livekit.git
synced 2026-07-04 04:32:11 +00:00
Reduce heap for dependency descriptor in forwarding path. (#2496)
* Reduce heap for dependency descriptor in forwarding path. Marshaled dependency descriptor is held in sequencer adding heap objcts. Store DD bytes in sequencer to avoid heap usage. Also, accomodating over sized objects via storing in slice and using it in case the bytes do not fit in the internal array. NOTE: Marshal DD still does a make([]byte...), but I think it should be on the stack given the short use of it. Have to verify. * fix test and also add cases for oversized codec/dd bytes
This commit is contained in:
+17
-4
@@ -1660,9 +1660,22 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
|
||||
|
||||
poolEntity := PacketFactory.Get().(*[]byte)
|
||||
payload := *poolEntity
|
||||
copy(payload, epm.codecBytes[:epm.numCodecBytesOut])
|
||||
copy(payload[epm.numCodecBytesOut:], pkt.Payload[epm.numCodecBytesIn:])
|
||||
payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)]
|
||||
if len(epm.codecBytesSlice) != 0 {
|
||||
n := copy(payload, epm.codecBytesSlice)
|
||||
m := copy(payload[n:], pkt.Payload[epm.numCodecBytesIn:])
|
||||
payload = payload[:n+m]
|
||||
} else {
|
||||
copy(payload, epm.codecBytes[:epm.numCodecBytesOut])
|
||||
copy(payload[epm.numCodecBytesOut:], pkt.Payload[epm.numCodecBytesIn:])
|
||||
payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)]
|
||||
}
|
||||
|
||||
var ddBytes []byte
|
||||
if len(epm.ddBytesSlice) != 0 {
|
||||
ddBytes = epm.ddBytesSlice
|
||||
} else {
|
||||
ddBytes = epm.ddBytes[:epm.ddBytesSize]
|
||||
}
|
||||
|
||||
d.sendingPacket(
|
||||
&pkt.Header,
|
||||
@@ -1677,7 +1690,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
|
||||
)
|
||||
d.pacer.Enqueue(pacer.Packet{
|
||||
Header: &pkt.Header,
|
||||
Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: epm.ddBytes}},
|
||||
Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: ddBytes}},
|
||||
Payload: payload,
|
||||
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
|
||||
TransportWideExtID: uint8(d.transportWideExtID),
|
||||
|
||||
+18
-9
@@ -70,8 +70,11 @@ type packetMeta struct {
|
||||
codecBytes [8]byte
|
||||
numCodecBytesIn uint8
|
||||
numCodecBytesOut uint8
|
||||
codecBytesSlice []byte
|
||||
// Dependency Descriptor of packet
|
||||
ddBytes []byte
|
||||
ddBytes [8]byte
|
||||
ddBytesSize uint8
|
||||
ddBytesSlice []byte
|
||||
}
|
||||
|
||||
type extPacketMeta struct {
|
||||
@@ -199,17 +202,23 @@ func (s *sequencer) push(
|
||||
marker: marker,
|
||||
layer: layer,
|
||||
numCodecBytesIn: uint8(numCodecBytesIn),
|
||||
ddBytes: append([]byte{}, ddBytes...),
|
||||
lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission
|
||||
}
|
||||
pm := &s.meta[slot]
|
||||
|
||||
pm.numCodecBytesOut = uint8(len(codecBytes))
|
||||
if pm.numCodecBytesOut > uint8(len(pm.codecBytes)) {
|
||||
s.logger.Errorw("codec bytes too large", nil, "need", pm.numCodecBytesOut, "bufSize", len(pm.codecBytes))
|
||||
s.invalidateSlot(int(slot))
|
||||
return
|
||||
if len(codecBytes) > len(pm.codecBytes) {
|
||||
pm.codecBytesSlice = append([]byte{}, codecBytes...)
|
||||
} else {
|
||||
copy(pm.codecBytes[:pm.numCodecBytesOut], codecBytes)
|
||||
}
|
||||
|
||||
pm.ddBytesSize = uint8(len(ddBytes))
|
||||
if len(ddBytes) > len(pm.ddBytes) {
|
||||
pm.ddBytesSlice = append([]byte{}, ddBytes...)
|
||||
} else {
|
||||
copy(pm.ddBytes[:pm.ddBytesSize], ddBytes)
|
||||
}
|
||||
copy(pm.codecBytes[:pm.numCodecBytesOut], codecBytes)
|
||||
|
||||
if extModifiedSN > s.extHighestSN {
|
||||
s.extHighestSN = extModifiedSN
|
||||
@@ -333,8 +342,8 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
|
||||
extSequenceNumber: extSN,
|
||||
extTimestamp: extTS,
|
||||
}
|
||||
copy(epm.codecBytes[:epm.numCodecBytesOut], meta.codecBytes[:epm.numCodecBytesOut])
|
||||
epm.ddBytes = append([]byte{}, meta.ddBytes...)
|
||||
epm.codecBytesSlice = append([]byte{}, meta.codecBytesSlice...)
|
||||
epm.ddBytesSlice = append([]byte{}, meta.ddBytesSlice...)
|
||||
extPacketMetas = append(extPacketMetas, epm)
|
||||
}
|
||||
}
|
||||
|
||||
+86
-16
@@ -95,8 +95,10 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
|
||||
numCodecBytesInOdd int
|
||||
codecBytesEven []byte
|
||||
numCodecBytesInEven int
|
||||
codecBytesOversized []byte
|
||||
ddBytesOdd []byte
|
||||
ddBytesEven []byte
|
||||
ddBytesOversized []byte
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
@@ -126,8 +128,10 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
|
||||
numCodecBytesInOdd: 3,
|
||||
codecBytesEven: []byte{5, 6, 7},
|
||||
numCodecBytesInEven: 4,
|
||||
codecBytesOversized: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
ddBytesOdd: []byte{8, 9, 10},
|
||||
ddBytesEven: []byte{11, 12},
|
||||
ddBytesOversized: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19},
|
||||
},
|
||||
args: args{
|
||||
seqNo: []uint16{65526 + 5, 65527 + 5, 65530 + 5, 0 /* 65531 input */, 1 /* 65532 input */, 2 /* 65533 input */, 3 /* 65534 input */},
|
||||
@@ -147,10 +151,44 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
|
||||
if i.isPadding {
|
||||
n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset)
|
||||
} else {
|
||||
if i.seqNo%2 == 0 {
|
||||
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.numCodecBytesInEven, tt.fields.ddBytesEven)
|
||||
if i.seqNo%5 == 0 {
|
||||
n.push(
|
||||
time.Now(),
|
||||
i.seqNo,
|
||||
i.seqNo+tt.fields.offset,
|
||||
123,
|
||||
tt.fields.markerOdd,
|
||||
3,
|
||||
tt.fields.codecBytesOversized,
|
||||
len(tt.fields.codecBytesOversized),
|
||||
tt.fields.ddBytesOversized,
|
||||
)
|
||||
} else {
|
||||
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.numCodecBytesInOdd, tt.fields.ddBytesOdd)
|
||||
if i.seqNo%2 == 0 {
|
||||
n.push(
|
||||
time.Now(),
|
||||
i.seqNo,
|
||||
i.seqNo+tt.fields.offset,
|
||||
123,
|
||||
tt.fields.markerEven,
|
||||
3,
|
||||
tt.fields.codecBytesEven,
|
||||
tt.fields.numCodecBytesInEven,
|
||||
tt.fields.ddBytesEven,
|
||||
)
|
||||
} else {
|
||||
n.push(
|
||||
time.Now(),
|
||||
i.seqNo,
|
||||
i.seqNo+tt.fields.offset,
|
||||
123,
|
||||
tt.fields.markerOdd,
|
||||
3,
|
||||
tt.fields.codecBytesOdd,
|
||||
tt.fields.numCodecBytesInOdd,
|
||||
tt.fields.ddBytesOdd,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,16 +198,26 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
|
||||
var got []uint16
|
||||
for _, sn := range g {
|
||||
got = append(got, sn.sourceSeqNo)
|
||||
if sn.sourceSeqNo%2 == 0 {
|
||||
require.Equal(t, tt.fields.markerEven, sn.marker)
|
||||
require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut])
|
||||
require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes)
|
||||
} else {
|
||||
if sn.sourceSeqNo%5 == 0 {
|
||||
require.Equal(t, tt.fields.markerOdd, sn.marker)
|
||||
require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut])
|
||||
require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes)
|
||||
require.Equal(t, tt.fields.codecBytesOversized, sn.codecBytesSlice)
|
||||
require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesOversized, sn.ddBytesSlice)
|
||||
require.Equal(t, uint8(len(tt.fields.codecBytesOversized)), sn.ddBytesSize)
|
||||
} else {
|
||||
if sn.sourceSeqNo%2 == 0 {
|
||||
require.Equal(t, tt.fields.markerEven, sn.marker)
|
||||
require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut])
|
||||
require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize])
|
||||
require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize)
|
||||
} else {
|
||||
require.Equal(t, tt.fields.markerOdd, sn.marker)
|
||||
require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut])
|
||||
require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize])
|
||||
require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
@@ -248,9 +296,29 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
|
||||
n.pushPadding(i.seqNo+tt.fields.offset, i.seqNo+tt.fields.offset)
|
||||
} else {
|
||||
if i.seqNo%2 == 0 {
|
||||
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerEven, 3, tt.fields.codecBytesEven, tt.fields.numCodecBytesInEven, tt.fields.ddBytesEven)
|
||||
n.push(
|
||||
time.Now(),
|
||||
i.seqNo,
|
||||
i.seqNo+tt.fields.offset,
|
||||
123,
|
||||
tt.fields.markerEven,
|
||||
3,
|
||||
tt.fields.codecBytesEven,
|
||||
tt.fields.numCodecBytesInEven,
|
||||
tt.fields.ddBytesEven,
|
||||
)
|
||||
} else {
|
||||
n.push(time.Now(), i.seqNo, i.seqNo+tt.fields.offset, 123, tt.fields.markerOdd, 3, tt.fields.codecBytesOdd, tt.fields.numCodecBytesInOdd, tt.fields.ddBytesOdd)
|
||||
n.push(
|
||||
time.Now(),
|
||||
i.seqNo,
|
||||
i.seqNo+tt.fields.offset,
|
||||
123,
|
||||
tt.fields.markerOdd,
|
||||
3,
|
||||
tt.fields.codecBytesOdd,
|
||||
tt.fields.numCodecBytesInOdd,
|
||||
tt.fields.ddBytesOdd,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -264,12 +332,14 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
|
||||
require.Equal(t, tt.fields.markerEven, sn.marker)
|
||||
require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut])
|
||||
require.Equal(t, uint8(tt.fields.numCodecBytesInEven), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes)
|
||||
require.Equal(t, tt.fields.ddBytesEven, sn.ddBytes[:sn.ddBytesSize])
|
||||
require.Equal(t, uint8(len(tt.fields.ddBytesEven)), sn.ddBytesSize)
|
||||
} else {
|
||||
require.Equal(t, tt.fields.markerOdd, sn.marker)
|
||||
require.Equal(t, tt.fields.codecBytesOdd, sn.codecBytes[:sn.numCodecBytesOut])
|
||||
require.Equal(t, uint8(tt.fields.numCodecBytesInOdd), sn.numCodecBytesIn)
|
||||
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes)
|
||||
require.Equal(t, tt.fields.ddBytesOdd, sn.ddBytes[:sn.ddBytesSize])
|
||||
require.Equal(t, uint8(len(tt.fields.ddBytesOdd)), sn.ddBytesSize)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
|
||||
Reference in New Issue
Block a user