diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 2455fc8c5..32640524b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -132,6 +132,9 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, } + + dummyAbsSendTimeExt, _ = rtp.NewAbsSendTimeExtension(mono.Now()).Marshal() + dummyTransportCCExt, _ = rtp.TransportCCExtension{TransportSequence: 12345}.Marshal() ) // ------------------------------------------------------------------- @@ -865,32 +868,26 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } payload = payload[:len(tp.codecBytes)+n] - hdr, err := d.getTranslatedRTPHeader(extPkt, &tp) - if err != nil { - d.params.Logger.Errorw("could not get translated RTP header", err) - PacketFactory.Put(poolEntity) - return err + // translate RTP header + hdr := &rtp.Header{ + Version: extPkt.Packet.Version, + Padding: extPkt.Packet.Padding, + PayloadType: d.getTranslatedPayloadType(extPkt.Packet.PayloadType), + SequenceNumber: uint16(tp.rtp.extSequenceNumber), + Timestamp: uint32(tp.rtp.extTimestamp), + SSRC: d.ssrc, + } + if tp.marker { + hdr.Marker = tp.marker } - var extensions []pacer.ExtensionData + // add extensions if tp.ddBytes != nil { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.dependencyDescriptorExtID), - Payload: tp.ddBytes, - }, - ) + hdr.SetExtension(uint8(d.dependencyDescriptorExtID), tp.ddBytes) } if d.playoutDelayExtID != 0 && d.playoutDelay != nil { if val := d.playoutDelay.GetDelayExtension(hdr.SequenceNumber); val != nil { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.playoutDelayExtID), - Payload: val, - }, - ) + hdr.SetExtension(uint8(d.playoutDelayExtID), val) // NOTE: play out delay extension is not cached in sequencer, // i. e. they will not be added to retransmitted packet. @@ -912,20 +909,20 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { _, _, refSenderReport := d.forwarder.GetSenderReportParams() if refSenderReport != nil { actExtCopy := *extPkt.AbsCaptureTimeExt - if err = actExtCopy.Rewrite(rtpstats.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil { + if err = actExtCopy.Rewrite( + rtpstats.RTCPSenderReportPropagationDelay( + refSenderReport, + !d.params.DisableSenderReportPassThrough, + ), + ); err == nil { actBytes, err = actExtCopy.Marshal() if err == nil { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.absCaptureTimeExtID), - Payload: actBytes, - }, - ) + hdr.SetExtension(uint8(d.absCaptureTimeExtID), actBytes) } } } } + d.addDummyExtensions(hdr) if d.sequencer != nil { d.sequencer.push( @@ -942,22 +939,17 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { ) } - d.sendingPacket( - hdr, - len(payload), - &sendPacketMetadata{ - layer: layer, - packetTime: extPkt.Arrival, - extSequenceNumber: tp.rtp.extSequenceNumber, - extTimestamp: tp.rtp.extTimestamp, - isKeyFrame: extPkt.KeyFrame, - isOutOfOrder: extPkt.IsOutOfOrder, - tp: &tp, - }, - ) - d.pacer.Enqueue(pacer.Packet{ + d.updateStats(updateStatsParams{ + packetTime: extPkt.Arrival, + extSequenceNumber: tp.rtp.extSequenceNumber, + extTimestamp: tp.rtp.extTimestamp, + isOutOfOrder: extPkt.IsOutOfOrder, + headerSize: hdr.MarshalSize(), + payloadSize: len(payload), + marker: hdr.Marker, + }) + d.pacer.Enqueue(&pacer.Packet{ Header: hdr, - Extensions: extensions, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), @@ -965,6 +957,27 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { Pool: PacketFactory, PoolEntity: poolEntity, }) + + if extPkt.KeyFrame { + d.isNACKThrottled.Store(false) + d.rtpStats.UpdateKeyFrame(1) + d.params.Logger.Debugw( + "forwarded key frame", + "layer", layer, + "rtpsn", tp.rtp.extSequenceNumber, + "rtpts", tp.rtp.extTimestamp, + ) + } + + if tp.isSwitching { + d.postMaxLayerNotifierEvent("switching") + } + + if tp.isResuming { + if sal := d.getStreamAllocatorListener(); sal != nil { + sal.OnResume(d) + } + } return nil } @@ -1026,7 +1039,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa bytesSent := 0 for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ + hdr := &rtp.Header{ Version: 2, Padding: true, Marker: false, @@ -1034,33 +1047,33 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa SequenceNumber: uint16(snts[i].extSequenceNumber), Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, - CSRC: []uint32{}, } + d.addDummyExtensions(hdr) payload := make([]byte, RTPPaddingMaxPayloadSize) // last byte of padding has padding size including that byte payload[RTPPaddingMaxPayloadSize-1] = byte(RTPPaddingMaxPayloadSize) - d.sendingPacket( - &hdr, - len(payload), - &sendPacketMetadata{ - packetTime: mono.UnixNano(), - extSequenceNumber: snts[i].extSequenceNumber, - extTimestamp: snts[i].extTimestamp, - isPadding: true, - shouldDisableCounter: true, - }, - ) - d.pacer.Enqueue(pacer.Packet{ - Header: &hdr, + hdrSize := hdr.MarshalSize() + payloadSize := len(payload) + d.updateStats(updateStatsParams{ + packetTime: mono.UnixNano(), + extSequenceNumber: snts[i].extSequenceNumber, + extTimestamp: snts[i].extTimestamp, + headerSize: hdrSize, + payloadSize: payloadSize, + isPadding: true, + disableCounter: true, + }) + d.pacer.Enqueue(&pacer.Packet{ + Header: hdr, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, }) - bytesSent += hdr.MarshalSize() + len(payload) + bytesSent += hdrSize + payloadSize } // STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be updated in pacer callback @@ -1572,7 +1585,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan } for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ + hdr := &rtp.Header{ Version: 2, Padding: false, Marker: true, @@ -1580,8 +1593,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan SequenceNumber: uint16(snts[i].extSequenceNumber), Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, - CSRC: []uint32{}, } + d.addDummyExtensions(hdr) payload, err := getBlankFrame(frameEndNeeded) if err != nil { @@ -1590,13 +1603,15 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan return } - d.sendingPacket(&hdr, len(payload), &sendPacketMetadata{ + d.updateStats(updateStatsParams{ packetTime: mono.UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, + headerSize: hdr.MarshalSize(), + payloadSize: len(payload), }) - d.pacer.Enqueue(pacer.Packet{ - Header: &hdr, + d.pacer.Enqueue(&pacer.Packet{ + Header: hdr, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), @@ -1924,7 +1939,6 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)] } - var extensions []pacer.ExtensionData if d.dependencyDescriptorExtID != 0 { var ddBytes []byte if len(epm.ddBytesSlice) != 0 { @@ -1932,39 +1946,24 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { } else { ddBytes = epm.ddBytes[:epm.ddBytesSize] } - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.dependencyDescriptorExtID), - Payload: ddBytes, - }, - ) + pkt.Header.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes) } if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 { - extensions = append( - extensions, - pacer.ExtensionData{ - ID: uint8(d.absCaptureTimeExtID), - Payload: epm.actBytes, - }, - ) + pkt.Header.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes) } + d.addDummyExtensions(&pkt.Header) - d.sendingPacket( - &pkt.Header, - len(payload), - &sendPacketMetadata{ - layer: int32(epm.layer), - packetTime: mono.UnixNano(), - extSequenceNumber: epm.extSequenceNumber, - extTimestamp: epm.extTimestamp, - isRTX: true, - isOutOfOrder: true, - }, - ) - d.pacer.Enqueue(pacer.Packet{ + d.updateStats(updateStatsParams{ + packetTime: mono.UnixNano(), + extSequenceNumber: epm.extSequenceNumber, + extTimestamp: epm.extTimestamp, + isOutOfOrder: true, + headerSize: pkt.Header.MarshalSize(), + payloadSize: len(payload), + isRTX: true, + }) + d.pacer.Enqueue(&pacer.Packet{ Header: &pkt.Header, - Extensions: extensions, Payload: payload, IsRTX: true, AbsSendTimeExtID: uint8(d.absSendTimeExtID), @@ -1997,17 +1996,14 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { */ } -func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *TranslationParams) (*rtp.Header, error) { - hdr := extPkt.Packet.Header - hdr.PayloadType = d.getTranslatedPayloadType(hdr.PayloadType) - hdr.Timestamp = uint32(tp.rtp.extTimestamp) - hdr.SequenceNumber = uint16(tp.rtp.extSequenceNumber) - hdr.SSRC = d.ssrc - if tp.marker { - hdr.Marker = tp.marker +func (d *DownTrack) addDummyExtensions(hdr *rtp.Header) { + // add dummy extensions (actual ones will be filed by pacer) to get header size + if d.absSendTimeExtID != 0 { + hdr.SetExtension(uint8(d.absSendTimeExtID), dummyAbsSendTimeExt) + } + if d.transportWideExtID != 0 { + hdr.SetExtension(uint8(d.transportWideExtID), dummyTransportCCExt) } - - return &hdr, nil } func (d *DownTrack) getTranslatedPayloadType(src uint8) uint8 { @@ -2158,7 +2154,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { return } for i := 0; i < len(snts); i++ { - hdr := rtp.Header{ + hdr := &rtp.Header{ Version: 2, Padding: false, Marker: true, @@ -2166,8 +2162,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { SequenceNumber: uint16(snts[i].extSequenceNumber), Timestamp: uint32(snts[i].extTimestamp), SSRC: d.ssrc, - CSRC: []uint32{}, } + d.addDummyExtensions(hdr) payload, err := d.getOpusBlankFrame(false) if err != nil { @@ -2175,19 +2171,17 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { return } - d.sendingPacket( - &hdr, - len(payload), - &sendPacketMetadata{ - packetTime: mono.UnixNano(), - extSequenceNumber: snts[i].extSequenceNumber, - extTimestamp: snts[i].extTimestamp, - // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only - isPadding: true, - }, - ) - d.pacer.Enqueue(pacer.Packet{ - Header: &hdr, + d.updateStats(updateStatsParams{ + packetTime: mono.UnixNano(), + extSequenceNumber: snts[i].extSequenceNumber, + extTimestamp: snts[i].extTimestamp, + headerSize: hdr.MarshalSize(), + payloadSize: len(payload), + // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only + isPadding: true, + }) + d.pacer.Enqueue(&pacer.Packet{ + Header: hdr, Payload: payload, AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), @@ -2219,27 +2213,26 @@ func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *livekit.RTCPSend d.rtpStats.MaybeAdjustFirstPacketTime(publisherSRData, tsOffset) } -type sendPacketMetadata struct { - layer int32 - packetTime int64 - extSequenceNumber uint64 - extTimestamp uint64 - isKeyFrame bool - isRTX bool - isOutOfOrder bool - isPadding bool - shouldDisableCounter bool - tp *TranslationParams +type updateStatsParams struct { + packetTime int64 + extSequenceNumber uint64 + extTimestamp uint64 + isOutOfOrder bool + headerSize int + payloadSize int + marker bool + isRTX bool + isPadding bool + disableCounter bool } -func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPacketMetadata) { - hdrSize := hdr.MarshalSize() - if !spmd.shouldDisableCounter { +func (d *DownTrack) updateStats(params updateStatsParams) { + if !params.disableCounter { // STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter - size := uint32(hdrSize + payloadSize) + size := uint32(params.headerSize + params.payloadSize) d.streamAllocatorBytesCounter.Add(size) /* STREAM-ALLOCATOR-DATA - if spmd.isRTX { + if params.isRTX { d.bytesRetransmitted.Add(size) } else { d.bytesSent.Add(size) @@ -2248,36 +2241,23 @@ func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPa } // update RTPStats - paddingSize := payloadSize - if spmd.isPadding { + payloadSize := params.payloadSize + paddingSize := params.payloadSize + if params.isPadding { payloadSize = 0 } else { paddingSize = 0 } - d.rtpStats.Update(spmd.packetTime, spmd.extSequenceNumber, spmd.extTimestamp, hdr.Marker, hdrSize, payloadSize, paddingSize, spmd.isOutOfOrder) - - if spmd.isKeyFrame { - d.isNACKThrottled.Store(false) - d.rtpStats.UpdateKeyFrame(1) - d.params.Logger.Debugw( - "forwarded key frame", - "layer", spmd.layer, - "rtpsn", spmd.extSequenceNumber, - "rtpts", spmd.extTimestamp, - ) - } - - if spmd.tp != nil { - if spmd.tp.isSwitching { - d.postMaxLayerNotifierEvent("switching") - } - - if spmd.tp.isResuming { - if sal := d.getStreamAllocatorListener(); sal != nil { - sal.OnResume(d) - } - } - } + d.rtpStats.Update( + params.packetTime, + params.extSequenceNumber, + params.extTimestamp, + params.marker, + params.headerSize, + payloadSize, + paddingSize, + params.isOutOfOrder, + ) } // ------------------------------------------------------------------------------- diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index 13d083745..5ced2b987 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -51,7 +51,7 @@ func (b *Base) SendPacket(p *Packet) (int, error) { } }() - err := b.writeRTPHeaderExtensions(p) + err := b.patchRTPHeaderExtensions(p) if err != nil { b.logger.Errorw("writing rtp header extensions err", err) return 0, err @@ -69,21 +69,8 @@ func (b *Base) SendPacket(p *Packet) (int, error) { return written, nil } -// writes RTP header extensions of track -func (b *Base) writeRTPHeaderExtensions(p *Packet) error { - // clear out extensions that may have been in the forwarded header - p.Header.Extension = false - p.Header.ExtensionProfile = 0 - p.Header.Extensions = []rtp.Extension{} - - for _, ext := range p.Extensions { - if ext.ID == 0 || len(ext.Payload) == 0 { - continue - } - - p.Header.SetExtension(ext.ID, ext.Payload) - } - +// patch just abs-send-time and transport-cc extensions if applicable +func (b *Base) patchRTPHeaderExtensions(p *Packet) error { sendingAt := mono.Now() if p.AbsSendTimeExtID != 0 { sendTime := rtp.NewAbsSendTimeExtension(sendingAt) diff --git a/pkg/sfu/pacer/leaky_bucket.go b/pkg/sfu/pacer/leaky_bucket.go index 90d8fa2ea..add76fd3a 100644 --- a/pkg/sfu/pacer/leaky_bucket.go +++ b/pkg/sfu/pacer/leaky_bucket.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/frostbyte73/core" "github.com/gammazero/deque" "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" "github.com/livekit/protocol/logger" @@ -32,11 +33,11 @@ type LeakyBucket struct { logger logger.Logger - lock sync.RWMutex - packets deque.Deque[Packet] - interval time.Duration - bitrate int - isStopped bool + lock sync.RWMutex + packets deque.Deque[*Packet] + interval time.Duration + bitrate int + stop core.Fuse } func NewLeakyBucket(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE, interval time.Duration, bitrate int) *LeakyBucket { @@ -67,23 +68,13 @@ func (l *LeakyBucket) SetBitrate(bitrate int) { } func (l *LeakyBucket) Stop() { - l.lock.Lock() - if l.isStopped { - l.lock.Unlock() - return - } - - l.isStopped = true - l.lock.Unlock() + l.stop.Break() } -func (l *LeakyBucket) Enqueue(p Packet) { +func (l *LeakyBucket) Enqueue(p *Packet) { l.lock.Lock() - defer l.lock.Unlock() - - if !l.isStopped { - l.packets.PushBack(p) - } + l.packets.PushBack(p) + l.lock.Unlock() } func (l *LeakyBucket) sendWorker() { @@ -121,12 +112,11 @@ func (l *LeakyBucket) sendWorker() { } for { - l.lock.Lock() - if l.isStopped { - l.lock.Unlock() + if l.stop.IsBroken() { return } + l.lock.Lock() if l.packets.Len() == 0 { l.lock.Unlock() // allow overshoot in next interval with shortage in this interval @@ -137,7 +127,7 @@ func (l *LeakyBucket) sendWorker() { p := l.packets.PopFront() l.lock.Unlock() - written, _ := l.Base.SendPacket(&p) + written, _ := l.Base.SendPacket(p) toSendBytes -= written if toSendBytes < 0 { // overage, wait for next interval diff --git a/pkg/sfu/pacer/no_queue.go b/pkg/sfu/pacer/no_queue.go index fc3bfd4a4..86eadebb7 100644 --- a/pkg/sfu/pacer/no_queue.go +++ b/pkg/sfu/pacer/no_queue.go @@ -17,6 +17,7 @@ package pacer import ( "sync" + "github.com/frostbyte73/core" "github.com/gammazero/deque" "github.com/livekit/livekit-server/pkg/sfu/sendsidebwe" "github.com/livekit/protocol/logger" @@ -27,10 +28,10 @@ type NoQueue struct { logger logger.Logger - lock sync.RWMutex - packets deque.Deque[Packet] - wake chan struct{} - isStopped bool + lock sync.RWMutex + packets deque.Deque[*Packet] + wake chan struct{} + stop core.Fuse } func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQueue { @@ -46,27 +47,17 @@ func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQ } func (n *NoQueue) Stop() { - n.lock.Lock() - if n.isStopped { - n.lock.Unlock() - return - } - - close(n.wake) - n.isStopped = true - n.lock.Unlock() + n.stop.Break() } -func (n *NoQueue) Enqueue(p Packet) { +func (n *NoQueue) Enqueue(p *Packet) { n.lock.Lock() - defer n.lock.Unlock() - n.packets.PushBack(p) - if n.packets.Len() == 1 && !n.isStopped { - select { - case n.wake <- struct{}{}: - default: - } + n.lock.Unlock() + + select { + case n.wake <- struct{}{}: + default: } } @@ -74,12 +65,11 @@ func (n *NoQueue) sendWorker() { for { <-n.wake for { - n.lock.Lock() - if n.isStopped { - n.lock.Unlock() + if n.stop.IsBroken() { return } + n.lock.Lock() if n.packets.Len() == 0 { n.lock.Unlock() break @@ -87,7 +77,7 @@ func (n *NoQueue) sendWorker() { p := n.packets.PopFront() n.lock.Unlock() - n.Base.SendPacket(&p) + n.Base.SendPacket(p) } } } diff --git a/pkg/sfu/pacer/pacer.go b/pkg/sfu/pacer/pacer.go index 75c9cca43..f1ec44b89 100644 --- a/pkg/sfu/pacer/pacer.go +++ b/pkg/sfu/pacer/pacer.go @@ -22,14 +22,8 @@ import ( "github.com/pion/webrtc/v3" ) -type ExtensionData struct { - ID uint8 - Payload []byte -} - type Packet struct { Header *rtp.Header - Extensions []ExtensionData Payload []byte IsRTX bool AbsSendTimeExtID uint8 @@ -40,7 +34,7 @@ type Packet struct { } type Pacer interface { - Enqueue(p Packet) + Enqueue(p *Packet) Stop() SetInterval(interval time.Duration) diff --git a/pkg/sfu/pacer/pass_through.go b/pkg/sfu/pacer/pass_through.go index fba06c792..62a4867c6 100644 --- a/pkg/sfu/pacer/pass_through.go +++ b/pkg/sfu/pacer/pass_through.go @@ -32,8 +32,8 @@ func NewPassThrough(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) func (p *PassThrough) Stop() { } -func (p *PassThrough) Enqueue(pkt Packet) { - p.Base.SendPacket(&pkt) +func (p *PassThrough) Enqueue(pkt *Packet) { + p.Base.SendPacket(pkt) } // ------------------------------------------------