From 41fbcec2cd44411be5f5daef209afd2ea8c294a1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 12 Nov 2024 10:53:57 +0530 Subject: [PATCH] Fix header size calculation in stats. (#3171) * Fix header size calculation in stats. With pacer inserting some extensions, the header size used in stats (and more impoetantly when probing for bandwidth estimation and metering the bytes to control the probes) was incorrect. The size was effectively was that of incoming extensions. It would have been close enough though. Anyhow, a bit of history - initially was planning on packaging all the necessary fields into pacer packet and pacer would callback after sending, but that was not great for a couple of reasons - had to send in a bunch of useless data (as far as pacer is concerned) into pacer. - callback every packet (this is not bad, just a function call which happens in the foward path too, but had to lug around the above data). - in the forward path, there is a very edge case issue when calling stats update after pacer.Enqueue() - details in https://github.com/livekit/livekit/pull/2085, but that is a rare case. Because of those reasons, the update was placed in the forward path before enqueue, but did not notice the header size issue till now. As a compromise, `pacer.Enqueue` returns the headerSize and payloadSize. It uses a dummy header to calculate size. Real extension will be added just before sending packet on the wire. pion/rtp replaces extension if one is already present. So, the dummy would be replaced by the real one before sending on the wire. https://github.com/pion/rtp/blob/a21194ecfb5362261a0dc4af1f68e4a8944df345/packet.go#L398 This does introduce back the second rare edge case, but that is very rare and even if it happens, not catastrophic. * cleanup * add extensions and dummy as well in downtrack to make pacer cleaner --- pkg/sfu/downtrack.go | 300 ++++++++++++++++------------------ pkg/sfu/pacer/base.go | 19 +-- pkg/sfu/pacer/leaky_bucket.go | 36 ++-- pkg/sfu/pacer/no_queue.go | 40 ++--- pkg/sfu/pacer/pacer.go | 8 +- pkg/sfu/pacer/pass_through.go | 4 +- 6 files changed, 174 insertions(+), 233 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 2455fc8c5..32640524b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -132,6 +132,9 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, } + + dummyAbsSendTimeExt, _ = rtp.NewAbsSendTimeExtension(mono.Now()).Marshal() + dummyTransportCCExt, _ = rtp.TransportCCExtension{TransportSequence: 12345}.Marshal() ) // ------------------------------------------------------------------- @@ -865,32 +868,26 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } payload = payload[:len(tp.codecBytes)+n] - hdr, err := d.getTranslatedRTPHeader(extPkt, &tp) - if err != nil { - d.params.Logger.Errorw("could not get translated RTP header", err) - PacketFactory.Put(poolEntity) - return err + // translate RTP header + hdr := &rtp.Header{ + Version: extPkt.Packet.Version, + Padding: extPkt.Packet.Padding, + PayloadType: d.getTranslatedPayloadType(extPkt.Packet.PayloadType), + SequenceNumber: uint16(tp.rtp.extSequenceNumber), + Timestamp: uint32(tp.rtp.extTimestamp), + SSRC: d.ssrc, + } + if tp.marker { + hdr.Marker = tp.marker } - var extensions []pacer.ExtensionData + // add extensions if tp.ddBytes != nil { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.dependencyDescriptorExtID), - Payload: tp.ddBytes, - }, - ) + hdr.SetExtension(uint8(d.dependencyDescriptorExtID), tp.ddBytes) } if d.playoutDelayExtID != 0 && d.playoutDelay != nil { if val := d.playoutDelay.GetDelayExtension(hdr.SequenceNumber); val != nil { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.playoutDelayExtID), - Payload: val, - }, - ) + hdr.SetExtension(uint8(d.playoutDelayExtID), val) // NOTE: play out delay extension is not cached in sequencer, // i. e. they will not be added to retransmitted packet. @@ -912,20 +909,20 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { _, _, refSenderReport := d.forwarder.GetSenderReportParams() if refSenderReport != nil { actExtCopy := *extPkt.AbsCaptureTimeExt - if err = actExtCopy.Rewrite(rtpstats.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil { + if err = actExtCopy.Rewrite( + rtpstats.RTCPSenderReportPropagationDelay( + refSenderReport, + !d.params.DisableSenderReportPassThrough, + ), + ); err == nil { actBytes, err = actExtCopy.Marshal() if err == nil { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.absCaptureTimeExtID), - Payload: actBytes, - }, - ) + hdr.SetExtension(uint8(d.absCaptureTimeExtID), actBytes) } } } } + d.addDummyExtensions(hdr) if d.sequencer != nil { d.sequencer.push( @@ -942,22 +939,17 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { ) } - d.sendingPacket( - hdr, - len(payload), - &sendPacketMetadata{ - layer: layer, - packetTime: extPkt.Arrival, - extSequenceNumber: tp.rtp.extSequenceNumber, - extTimestamp: tp.rtp.extTimestamp, - isKeyFrame: extPkt.KeyFrame, - isOutOfOrder: extPkt.IsOutOfOrder, - tp: &tp, - }, - ) - d.pacer.Enqueue(pacer.Packet{ + d.updateStats(updateStatsParams{ + packetTime: extPkt.Arrival, + extSequenceNumber: tp.rtp.extSequenceNumber, + extTimestamp: tp.rtp.extTimestamp, + isOutOfOrder: extPkt.IsOutOfOrder, + headerSize: hdr.MarshalSize(), + payloadSize: len(payload), + marker: hdr.Marker, + }) + d.pacer.Enqueue(&pacer.Packet{ Header: hdr, - Extensions: extensions, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), @@ -965,6 +957,27 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { Pool: PacketFactory, PoolEntity: poolEntity, }) + + if extPkt.KeyFrame { + d.isNACKThrottled.Store(false) + d.rtpStats.UpdateKeyFrame(1) + d.params.Logger.Debugw( + "forwarded key frame", + "layer", layer, + "rtpsn", tp.rtp.extSequenceNumber, + "rtpts", tp.rtp.extTimestamp, + ) + } + + if tp.isSwitching { + d.postMaxLayerNotifierEvent("switching") + } + + if tp.isResuming { + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnResume(d) + } + } return nil } @@ -1026,7 +1039,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa bytesSent := 0 for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ + hdr := &rtp.Header{ Version: 2, Padding: true, Marker: false, @@ -1034,33 +1047,33 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa SequenceNumber: uint16(snts[i].extSequenceNumber), Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, - CSRC: []uint32{}, } + d.addDummyExtensions(hdr) payload := make([]byte, RTPPaddingMaxPayloadSize) // last byte of padding has padding size including that byte payload[RTPPaddingMaxPayloadSize-1] = byte(RTPPaddingMaxPayloadSize) - d.sendingPacket( - &hdr, - len(payload), - &sendPacketMetadata{ - packetTime: mono.UnixNano(), - extSequenceNumber: snts[i].extSequenceNumber, - extTimestamp: snts[i].extTimestamp, - isPadding: true, - shouldDisableCounter: true, - }, - ) - d.pacer.Enqueue(pacer.Packet{ - Header: &hdr, + hdrSize := hdr.MarshalSize() + payloadSize := len(payload) + d.updateStats(updateStatsParams{ + packetTime: mono.UnixNano(), + extSequenceNumber: snts[i].extSequenceNumber, + extTimestamp: snts[i].extTimestamp, + headerSize: hdrSize, + payloadSize: payloadSize, + isPadding: true, + disableCounter: true, + }) + d.pacer.Enqueue(&pacer.Packet{ + Header: hdr, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, }) - bytesSent += hdr.MarshalSize() + len(payload) + bytesSent += hdrSize + payloadSize } // STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be updated in pacer callback @@ -1572,7 +1585,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan } for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ + hdr := &rtp.Header{ Version: 2, Padding: false, Marker: true, @@ -1580,8 +1593,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan SequenceNumber: uint16(snts[i].extSequenceNumber), Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, - CSRC: []uint32{}, } + d.addDummyExtensions(hdr) payload, err := getBlankFrame(frameEndNeeded) if err != nil { @@ -1590,13 +1603,15 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan return } - d.sendingPacket(&hdr, len(payload), &sendPacketMetadata{ + d.updateStats(updateStatsParams{ packetTime: mono.UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, + headerSize: hdr.MarshalSize(), + payloadSize: len(payload), }) - d.pacer.Enqueue(pacer.Packet{ - Header: &hdr, + d.pacer.Enqueue(&pacer.Packet{ + Header: hdr, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), @@ -1924,7 +1939,6 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)] } - var extensions []pacer.ExtensionData if d.dependencyDescriptorExtID != 0 { var ddBytes []byte if len(epm.ddBytesSlice) != 0 { @@ -1932,39 +1946,24 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { } else { ddBytes = epm.ddBytes[:epm.ddBytesSize] } - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.dependencyDescriptorExtID), - Payload: ddBytes, - }, - ) + pkt.Header.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes) } if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.absCaptureTimeExtID), - Payload: epm.actBytes, - }, - ) + pkt.Header.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes) } + d.addDummyExtensions(&pkt.Header) - d.sendingPacket( - &pkt.Header, - len(payload), - &sendPacketMetadata{ - layer: int32(epm.layer), - packetTime: mono.UnixNano(), - extSequenceNumber: epm.extSequenceNumber, - extTimestamp: epm.extTimestamp, - isRTX: true, - isOutOfOrder: true, - }, - ) - d.pacer.Enqueue(pacer.Packet{ + d.updateStats(updateStatsParams{ + packetTime: mono.UnixNano(), + extSequenceNumber: epm.extSequenceNumber, + extTimestamp: epm.extTimestamp, + isOutOfOrder: true, + headerSize: pkt.Header.MarshalSize(), + payloadSize: len(payload), + isRTX: true, + }) + d.pacer.Enqueue(&pacer.Packet{ Header: &pkt.Header, - Extensions: extensions, Payload: payload, IsRTX: true, AbsSendTimeExtID: uint8(d.absSendTimeExtID), @@ -1997,17 +1996,14 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { */ } -func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *TranslationParams) (*rtp.Header, error) { - hdr := extPkt.Packet.Header - hdr.PayloadType = d.getTranslatedPayloadType(hdr.PayloadType) - hdr.Timestamp = uint32(tp.rtp.extTimestamp) - hdr.SequenceNumber = uint16(tp.rtp.extSequenceNumber) - hdr.SSRC = d.ssrc - if tp.marker { - hdr.Marker = tp.marker +func (d *DownTrack) addDummyExtensions(hdr *rtp.Header) { + // add dummy extensions (actual ones will be filed by pacer) to get header size + if d.absSendTimeExtID != 0 { + hdr.SetExtension(uint8(d.absSendTimeExtID), dummyAbsSendTimeExt) + } + if d.transportWideExtID != 0 { + hdr.SetExtension(uint8(d.transportWideExtID), dummyTransportCCExt) } - - return &hdr, nil } func (d *DownTrack) getTranslatedPayloadType(src uint8) uint8 { @@ -2158,7 +2154,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { return } for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ + hdr := &rtp.Header{ Version: 2, Padding: false, Marker: true, @@ -2166,8 +2162,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { SequenceNumber: uint16(snts[i].extSequenceNumber), Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, - CSRC: []uint32{}, } + d.addDummyExtensions(hdr) payload, err := d.getOpusBlankFrame(false) if err != nil { @@ -2175,19 +2171,17 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { return } - d.sendingPacket( - &hdr, - len(payload), - &sendPacketMetadata{ - packetTime: mono.UnixNano(), - extSequenceNumber: snts[i].extSequenceNumber, - extTimestamp: snts[i].extTimestamp, - // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only - isPadding: true, - }, - ) - d.pacer.Enqueue(pacer.Packet{ - Header: &hdr, + d.updateStats(updateStatsParams{ + packetTime: mono.UnixNano(), + extSequenceNumber: snts[i].extSequenceNumber, + extTimestamp: snts[i].extTimestamp, + headerSize: hdr.MarshalSize(), + payloadSize: len(payload), + // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only + isPadding: true, + }) + d.pacer.Enqueue(&pacer.Packet{ + Header: hdr, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), @@ -2219,27 +2213,26 @@ func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *livekit.RTCPSend d.rtpStats.MaybeAdjustFirstPacketTime(publisherSRData, tsOffset) } -type sendPacketMetadata struct { - layer int32 - packetTime int64 - extSequenceNumber uint64 - extTimestamp uint64 - isKeyFrame bool - isRTX bool - isOutOfOrder bool - isPadding bool - shouldDisableCounter bool - tp *TranslationParams +type updateStatsParams struct { + packetTime int64 + extSequenceNumber uint64 + extTimestamp uint64 + isOutOfOrder bool + headerSize int + payloadSize int + marker bool + isRTX bool + isPadding bool + disableCounter bool } -func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPacketMetadata) { - hdrSize := hdr.MarshalSize() - if !spmd.shouldDisableCounter { +func (d *DownTrack) updateStats(params updateStatsParams) { + if !params.disableCounter { // STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter - size := uint32(hdrSize + payloadSize) + size := uint32(params.headerSize + params.payloadSize) d.streamAllocatorBytesCounter.Add(size) /* STREAM-ALLOCATOR-DATA - if spmd.isRTX { + if params.isRTX { d.bytesRetransmitted.Add(size) } else { d.bytesSent.Add(size) @@ -2248,36 +2241,23 @@ func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPa } // update RTPStats - paddingSize := payloadSize - if spmd.isPadding { + payloadSize := params.payloadSize + paddingSize := params.payloadSize + if params.isPadding { payloadSize = 0 } else { paddingSize = 0 } - d.rtpStats.Update(spmd.packetTime, spmd.extSequenceNumber, spmd.extTimestamp, hdr.Marker, hdrSize, payloadSize, paddingSize, spmd.isOutOfOrder) - - if spmd.isKeyFrame { - d.isNACKThrottled.Store(false) - d.rtpStats.UpdateKeyFrame(1) - d.params.Logger.Debugw( - "forwarded key frame", - "layer", spmd.layer, - "rtpsn", spmd.extSequenceNumber, - "rtpts", spmd.extTimestamp, - ) - } - - if spmd.tp != nil { - if spmd.tp.isSwitching { - d.postMaxLayerNotifierEvent("switching") - } - - if spmd.tp.isResuming { - if sal := d.getStreamAllocatorListener(); sal != nil { - sal.OnResume(d) - } - } - } + d.rtpStats.Update( + params.packetTime, + params.extSequenceNumber, + params.extTimestamp, + params.marker, + params.headerSize, + payloadSize, + paddingSize, + params.isOutOfOrder, + ) } // ------------------------------------------------------------------------------- diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index 13d083745..5ced2b987 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -51,7 +51,7 @@ func (b *Base) SendPacket(p *Packet) (int, error) { } }() - err := b.writeRTPHeaderExtensions(p) + err := b.patchRTPHeaderExtensions(p) if err != nil { b.logger.Errorw("writing rtp header extensions err", err) return 0, err @@ -69,21 +69,8 @@ func (b *Base) SendPacket(p *Packet) (int, error) { return written, nil } -// writes RTP header extensions of track -func (b *Base) writeRTPHeaderExtensions(p *Packet) error { - // clear out extensions that may have been in the forwarded header - p.Header.Extension = false - p.Header.ExtensionProfile = 0 - p.Header.Extensions = []rtp.Extension{} - - for _, ext := range p.Extensions { - if ext.ID == 0 || len(ext.Payload) == 0 { - continue - } - - p.Header.SetExtension(ext.ID, ext.Payload) - } - +// patch just abs-send-time and transport-cc extensions if applicable +func (b *Base) patchRTPHeaderExtensions(p *Packet) error { sendingAt := mono.Now() if p.AbsSendTimeExtID != 0 { sendTime := rtp.NewAbsSendTimeExtension(sendingAt) diff --git a/pkg/sfu/pacer/leaky_bucket.go b/pkg/sfu/pacer/leaky_bucket.go index 90d8fa2ea..add76fd3a 100644 --- a/pkg/sfu/pacer/leaky_bucket.go +++ b/pkg/sfu/pacer/leaky_bucket.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/frostbyte73/core" "github.com/gammazero/deque" "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" "github.com/livekit/protocol/logger" @@ -32,11 +33,11 @@ type LeakyBucket struct { logger logger.Logger - lock sync.RWMutex - packets deque.Deque[Packet] - interval time.Duration - bitrate int - isStopped bool + lock sync.RWMutex + packets deque.Deque[*Packet] + interval time.Duration + bitrate int + stop core.Fuse } func NewLeakyBucket(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE, interval time.Duration, bitrate int) *LeakyBucket { @@ -67,23 +68,13 @@ func (l *LeakyBucket) SetBitrate(bitrate int) { } func (l *LeakyBucket) Stop() { - l.lock.Lock() - if l.isStopped { - l.lock.Unlock() - return - } - - l.isStopped = true - l.lock.Unlock() + l.stop.Break() } -func (l *LeakyBucket) Enqueue(p Packet) { +func (l *LeakyBucket) Enqueue(p *Packet) { l.lock.Lock() - defer l.lock.Unlock() - - if !l.isStopped { - l.packets.PushBack(p) - } + l.packets.PushBack(p) + l.lock.Unlock() } func (l *LeakyBucket) sendWorker() { @@ -121,12 +112,11 @@ func (l *LeakyBucket) sendWorker() { } for { - l.lock.Lock() - if l.isStopped { - l.lock.Unlock() + if l.stop.IsBroken() { return } + l.lock.Lock() if l.packets.Len() == 0 { l.lock.Unlock() // allow overshoot in next interval with shortage in this interval @@ -137,7 +127,7 @@ func (l *LeakyBucket) sendWorker() { p := l.packets.PopFront() l.lock.Unlock() - written, _ := l.Base.SendPacket(&p) + written, _ := l.Base.SendPacket(p) toSendBytes -= written if toSendBytes < 0 { // overage, wait for next interval diff --git a/pkg/sfu/pacer/no_queue.go b/pkg/sfu/pacer/no_queue.go index fc3bfd4a4..86eadebb7 100644 --- a/pkg/sfu/pacer/no_queue.go +++ b/pkg/sfu/pacer/no_queue.go @@ -17,6 +17,7 @@ package pacer import ( "sync" + "github.com/frostbyte73/core" "github.com/gammazero/deque" "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" "github.com/livekit/protocol/logger" @@ -27,10 +28,10 @@ type NoQueue struct { logger logger.Logger - lock sync.RWMutex - packets deque.Deque[Packet] - wake chan struct{} - isStopped bool + lock sync.RWMutex + packets deque.Deque[*Packet] + wake chan struct{} + stop core.Fuse } func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQueue { @@ -46,27 +47,17 @@ func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQ } func (n *NoQueue) Stop() { - n.lock.Lock() - if n.isStopped { - n.lock.Unlock() - return - } - - close(n.wake) - n.isStopped = true - n.lock.Unlock() + n.stop.Break() } -func (n *NoQueue) Enqueue(p Packet) { +func (n *NoQueue) Enqueue(p *Packet) { n.lock.Lock() - defer n.lock.Unlock() - n.packets.PushBack(p) - if n.packets.Len() == 1 && !n.isStopped { - select { - case n.wake <- struct{}{}: - default: - } + n.lock.Unlock() + + select { + case n.wake <- struct{}{}: + default: } } @@ -74,12 +65,11 @@ func (n *NoQueue) sendWorker() { for { <-n.wake for { - n.lock.Lock() - if n.isStopped { - n.lock.Unlock() + if n.stop.IsBroken() { return } + n.lock.Lock() if n.packets.Len() == 0 { n.lock.Unlock() break @@ -87,7 +77,7 @@ func (n *NoQueue) sendWorker() { p := n.packets.PopFront() n.lock.Unlock() - n.Base.SendPacket(&p) + n.Base.SendPacket(p) } } } diff --git a/pkg/sfu/pacer/pacer.go b/pkg/sfu/pacer/pacer.go index 75c9cca43..f1ec44b89 100644 --- a/pkg/sfu/pacer/pacer.go +++ b/pkg/sfu/pacer/pacer.go @@ -22,14 +22,8 @@ import ( "github.com/pion/webrtc/v3" ) -type ExtensionData struct { - ID uint8 - Payload []byte -} - type Packet struct { Header *rtp.Header - Extensions []ExtensionData Payload []byte IsRTX bool AbsSendTimeExtID uint8 @@ -40,7 +34,7 @@ type Packet struct { } type Pacer interface { - Enqueue(p Packet) + Enqueue(p *Packet) Stop() SetInterval(interval time.Duration) diff --git a/pkg/sfu/pacer/pass_through.go b/pkg/sfu/pacer/pass_through.go index fba06c792..62a4867c6 100644 --- a/pkg/sfu/pacer/pass_through.go +++ b/pkg/sfu/pacer/pass_through.go @@ -32,8 +32,8 @@ func NewPassThrough(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) func (p *PassThrough) Stop() { } -func (p *PassThrough) Enqueue(pkt Packet) { - p.Base.SendPacket(&pkt) +func (p *PassThrough) Enqueue(pkt *Packet) { + p.Base.SendPacket(pkt) } // ------------------------------------------------