// Copyright 2023 LiveKit, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package buffer import ( "encoding/binary" "errors" "io" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v4" "github.com/livekit/livekit-server/pkg/sfu/flexfec" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil/pkg/bucket" "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils/mono" ) const ( rtcpReceiverReportDelta = 1e9 // fecReportInterval bounds how often a buffer logs a FlexFEC recovery summary. fecReportInterval = 5e9 InitPacketBufferSizeVideo = 300 InitPacketBufferSizeAudio = 70 ) var ( errInvalidCodec = errors.New("invalid codec") ) var _ BufferProvider = (*Buffer)(nil) type pendingPacket struct { arrivalTime int64 packet []byte } // Buffer contains all packets type Buffer struct { *BufferBase pPackets []pendingPacket lastReportAt int64 isBound bool twcc *twcc.Responder twccExtID uint8 enableAudioLossProxying bool lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only lastPacketRead int // callbacks onClose func() onRtcpFeedback func([]rtcp.Packet) onFinalRtpStats func(*livekit.RTPStats) onNotifyRTX func(uint32, uint32, string) primaryBufferForRTX *Buffer rtxPktBuf []byte // FlexFEC (publisher -> SFU recovery). // primaryBufferForFEC is set on a repair (FEC) buffer and points at the source // buffer it protects. fecDecoder lives on the source buffer and reconstructs lost // source packets from the repair stream. fecPktBuf is scratch for re-marshaling // recovered packets before injecting them back into the source buffer. primaryBufferForFEC *Buffer fecDecoder *flexfec.Decoder fecPktBuf []byte // FlexFEC observability. Counters are cumulative for the lifetime of the source // buffer; the *Reported fields snapshot the last logged values so the periodic // summary can report deltas. fecPacketsReceived uint64 fecPacketsRecovered uint64 fecReportedAt int64 fecReportedReceived uint64 fecReportedRecovered uint64 // Debug-only simulated publisher ("robot") uplink impairment; nil unless the // LK_UPLINK_* env vars are set. See buffer_impair.go. impair *uplinkImpair } func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer { b := &Buffer{} b.BufferBase = NewBufferBase(BufferBaseParams{ SSRC: ssrc, MaxVideoPkts: maxVideoPkts, MaxAudioPkts: maxAudioPkts, LoggerComponents: []string{sutils.ComponentPub, sutils.ComponentSFU}, SendPLI: b.sendPLI, IsReportingEnabled: true, }) b.initUplinkImpair() return b } func (b *Buffer) SetTWCCAndExtID(twcc *twcc.Responder, extID uint8) { b.Lock() defer b.Unlock() b.twcc = twcc b.twccExtID = extID } func (b *Buffer) SetAudioLossProxying(enable bool) { b.Lock() defer b.Unlock() b.enableAudioLossProxying = enable } func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrates int) error { b.Lock() if b.isBound { b.Unlock() return nil } if err := b.BufferBase.BindLocked(params, codec, bitrates); err != nil { b.Unlock() return err } b.lastReportAt = mono.UnixNano() if len(b.pPackets) != 0 { b.logger.Debugw("releasing queued packets on bind", "count", len(b.pPackets)) } var rtcpPackets []rtcp.Packet for _, pp := range b.pPackets { rtcpPackets = append(rtcpPackets, b.calc(pp.packet, nil, pp.arrivalTime, true, false)...) } b.pPackets = nil b.isBound = true b.Unlock() if len(rtcpPackets) != 0 { if cb := b.getOnRtcpFeedback(); cb != nil { cb(rtcpPackets) } } return nil } // Write adds an RTP Packet, ordering is not guaranteed, newer packets may arrive later // Write is the inbound entry point for the publisher's RTP (media, RTX, and FEC SSRCs). // It applies the optional debug uplink impairment (loss + one-way delay) before handing // the packet to writeNow; both are no-ops unless the LK_UPLINK_* env vars are set. func (b *Buffer) Write(pkt []byte) (int, error) { if b.impair != nil { if b.impair.dropInbound() { b.logImpairedDrop(pkt) // Packet "lost" on the simulated 5G uplink; report success so pion's read // loop is unaffected. return len(pkt), nil } if b.impair.delayInbound(b, pkt) { return len(pkt), nil } } return b.writeNow(pkt) } func (b *Buffer) writeNow(pkt []byte) (n int, err error) { var rtpPacket rtp.Packet err = rtpPacket.Unmarshal(pkt) if err != nil { return } b.logImpairedMarker(&rtpPacket) b.Lock() if b.BufferBase.IsClosed() { b.Unlock() err = io.EOF return } now := mono.UnixNano() if b.twcc != nil && b.twccExtID != 0 { if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil { b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker) } } // libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since // the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816 if rtpPacket.SSRC == 0 { b.Unlock() return } // handle RTX packet if pb := b.primaryBufferForRTX; pb != nil { b.Unlock() // skip padding only packets if rtpPacket.Padding && len(rtpPacket.Payload) == 0 { return } pb.writeRTX(&rtpPacket, now) return } // handle FlexFEC repair packet if pb := b.primaryBufferForFEC; pb != nil { b.Unlock() pb.writeFEC(&rtpPacket, now) return } if !b.isBound { packet := make([]byte, len(pkt)) copy(packet, pkt) if len(b.pPackets) == 0 { b.logger.Debugw("received first packet") } startIdx := 0 overflow := len(b.pPackets) - max(b.BufferBase.MaxVideoPkts(), b.BufferBase.MaxAudioPkts()) if overflow > 0 { startIdx = overflow } b.pPackets = append(b.pPackets[startIdx:], pendingPacket{ packet: packet, arrivalTime: now, }) b.BufferBase.NotifyRead() b.Unlock() return } rtcpPackets := b.calc(pkt, &rtpPacket, now, false, false) // feed received source packets to the FlexFEC decoder so it can reconstruct // previously lost packets, and inject any recoveries back into this buffer. if b.fecDecoder != nil { b.injectRecoveredLocked(b.fecDecoder.Decode(rtpPacket), now) } b.Unlock() if len(rtcpPackets) != 0 { b.emitRTCPFeedback(rtcpPackets) } return } // emitRTCPFeedback delivers SFU->publisher feedback (NACK/RR/etc). When the debug uplink // delay is set it defers delivery by the one-way delay so a NACK->RTX recovery costs a // full round trip; otherwise it fires immediately. func (b *Buffer) emitRTCPFeedback(rtcpPackets []rtcp.Packet) { if b.impair != nil && b.impair.delayFeedback(b, rtcpPackets) { return } if cb := b.getOnRtcpFeedback(); cb != nil { cb(rtcpPackets) } } func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) { b.Lock() b.primaryBufferForRTX = primaryBuffer pkts := b.pPackets b.pPackets = nil b.Unlock() for _, pp := range pkts { var rtpPacket rtp.Packet err := rtpPacket.Unmarshal(pp.packet) if err != nil { continue } if rtpPacket.Padding && len(rtpPacket.Payload) == 0 { continue } primaryBuffer.writeRTX(&rtpPacket, pp.arrivalTime) } } // SetPrimaryBufferForFEC associates this (repair/FEC) buffer with the source buffer it // protects. It installs a FlexFEC decoder on the source buffer and replays any FEC // packets that arrived before the association was known. func (b *Buffer) SetPrimaryBufferForFEC(primaryBuffer *Buffer) { primaryBuffer.enableFECDecoder(b.BufferBase.SSRC()) b.Lock() b.primaryBufferForFEC = primaryBuffer pkts := b.pPackets b.pPackets = nil b.Unlock() for _, pp := range pkts { var rtpPacket rtp.Packet if err := rtpPacket.Unmarshal(pp.packet); err != nil { continue } primaryBuffer.writeFEC(&rtpPacket, pp.arrivalTime) } } // enableFECDecoder installs a FlexFEC decoder on this (source) buffer for the given // repair SSRC. Safe to call before the buffer is bound. func (b *Buffer) enableFECDecoder(fecSSRC uint32) { b.Lock() defer b.Unlock() if b.fecDecoder == nil { b.fecDecoder = flexfec.NewDecoder(fecSSRC, b.BufferBase.SSRC()) b.logger.Infow("flexfec decoder enabled", "fecSSRC", fecSSRC, "mediaSSRC", b.BufferBase.SSRC()) } } // writeFEC feeds a received FlexFEC repair packet to this (source) buffer's decoder and // injects any recovered source packets back into the buffer. Called on the source buffer. func (b *Buffer) writeFEC(fecPkt *rtp.Packet, arrivalTime int64) { b.Lock() defer b.Unlock() if b.fecDecoder == nil { return } b.fecPacketsReceived++ b.injectRecoveredLocked(b.fecDecoder.Decode(*fecPkt), arrivalTime) b.maybeLogFECStatsLocked(arrivalTime) } // maybeLogFECStatsLocked emits a periodic FlexFEC recovery summary at most once per // fecReportInterval, and only when there has been FEC activity since the last summary. // b.Lock must be held. func (b *Buffer) maybeLogFECStatsLocked(now int64) { if b.fecReportedAt == 0 { b.fecReportedAt = now b.fecReportedReceived = b.fecPacketsReceived b.fecReportedRecovered = b.fecPacketsRecovered return } if now-b.fecReportedAt < fecReportInterval { return } receivedDelta := b.fecPacketsReceived - b.fecReportedReceived recoveredDelta := b.fecPacketsRecovered - b.fecReportedRecovered b.fecReportedAt = now b.fecReportedReceived = b.fecPacketsReceived b.fecReportedRecovered = b.fecPacketsRecovered if receivedDelta == 0 && recoveredDelta == 0 { return } b.logger.Infow( "flexfec recovery stats", "mediaSSRC", b.BufferBase.SSRC(), "fecPacketsReceived", b.fecPacketsReceived, "fecPacketsReceivedDelta", receivedDelta, "packetsRecovered", b.fecPacketsRecovered, "packetsRecoveredDelta", recoveredDelta, ) } // injectRecoveredLocked re-injects FlexFEC-recovered source packets into the buffer as // repaired packets (treated like RTX repairs, so they become NACK/forward eligible). // b.Lock must be held. func (b *Buffer) injectRecoveredLocked(recovered []rtp.Packet, arrivalTime int64) { if len(recovered) == 0 || !b.isBound { return } if b.fecPktBuf == nil { b.fecPktBuf = make([]byte, bucket.RTPMaxPktSize) } for i := range recovered { pkt := recovered[i] n, err := pkt.MarshalTo(b.fecPktBuf) if err != nil { b.logger.Errorw("could not marshal flexfec recovered packet", err, "sn", pkt.SequenceNumber) continue } b.calc(b.fecPktBuf[:n], &pkt, arrivalTime, false, true) b.fecPacketsRecovered++ } } func (b *Buffer) NotifyRTX(ssrc uint32, repairSSRC uint32, rsid string) { if onNotifyRTX := b.getOnNotifyRTX(); onNotifyRTX != nil { onNotifyRTX(ssrc, repairSSRC, rsid) } } func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) { b.Lock() defer b.Unlock() if !b.isBound { return } if rtxPkt.PayloadType != b.rtxPayloadType { b.logger.Debugw("unexpected rtx payload type", "expected", b.rtxPayloadType, "actual", rtxPkt.PayloadType) return } if b.rtxPktBuf == nil { b.rtxPktBuf = make([]byte, bucket.RTPMaxPktSize) } if len(rtxPkt.Payload) < 2 { b.logger.Warnw("rtx payload too short", nil, "size", len(rtxPkt.Payload)) return } repairedPkt := *rtxPkt repairedPkt.PayloadType = b.payloadType repairedPkt.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2]) repairedPkt.SSRC = b.BufferBase.SSRC() repairedPkt.Payload = rtxPkt.Payload[2:] n, err := repairedPkt.MarshalTo(b.rtxPktBuf) if err != nil { b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.BufferBase.SSRC(), "sn", repairedPkt.SequenceNumber) return } b.calc(b.rtxPktBuf[:n], &repairedPkt, arrivalTime, false, true) } func (b *Buffer) Read(buff []byte) (n int, err error) { b.Lock() for { if b.BufferBase.IsClosed() { b.Unlock() return 0, io.EOF } if b.pPackets != nil && len(b.pPackets) > b.lastPacketRead { if len(buff) < len(b.pPackets[b.lastPacketRead].packet) { b.Unlock() return 0, bucket.ErrBufferTooSmall } n = copy(buff, b.pPackets[b.lastPacketRead].packet) b.lastPacketRead++ b.Unlock() return } b.BufferBase.WaitRead() } } func (b *Buffer) Close() error { stats, err := b.BufferBase.CloseWithReason("close") if err != nil { return err } if stats != nil { if cb := b.getOnFinalRtpStats(); cb != nil { cb(stats) } } if cb := b.getOnClose(); cb != nil { cb() } return nil } func (b *Buffer) OnClose(fn func()) { b.Lock() b.onClose = fn b.Unlock() } func (b *Buffer) getOnClose() func() { b.RLock() defer b.RUnlock() return b.onClose } func (b *Buffer) sendPLI() { ssrc := b.BufferBase.SSRC() if ssrc == 0 { return } b.logger.Debugw("send pli", "mediaSSRC", ssrc) pli := []rtcp.Packet{ &rtcp.PictureLossIndication{ SenderSSRC: ssrc, MediaSSRC: ssrc, }, } if cb := b.getOnRtcpFeedback(); cb != nil { cb(pli) } } func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, isRTX bool) []rtcp.Packet { b.BufferBase.HandleIncomingPacketLocked( rawPkt, rtpPacket, arrivalTime, isBuffered, isRTX, nil, 0, ) return b.getRTCPPackets(arrivalTime) } func (b *Buffer) getRTCPPackets(arrivalTime int64) []rtcp.Packet { var pkts []rtcp.Packet if nackPkt := b.getNACKPacket(); nackPkt != nil { pkts = append(pkts, nackPkt) } if receiverReport := b.getRTCPReceiverReport(arrivalTime); receiverReport != nil { pkts = append(pkts, receiverReport) } return pkts } func (b *Buffer) getNACKPacket() *rtcp.TransportLayerNack { if nacks := b.BufferBase.GetNACKPairsLocked(); len(nacks) > 0 { ssrc := b.BufferBase.SSRC() return &rtcp.TransportLayerNack{ SenderSSRC: ssrc, MediaSSRC: ssrc, Nacks: nacks, } } return nil } func (b *Buffer) getRTCPReceiverReport(arrivalTime int64) *rtcp.ReceiverReport { if arrivalTime-b.lastReportAt < rtcpReceiverReportDelta { return nil } b.lastReportAt = arrivalTime proxyLoss := b.lastFractionLostToReport if b.codecType == webrtc.RTPCodecTypeAudio && !b.enableAudioLossProxying { proxyLoss = 0 } if receptionReport := b.BufferBase.GetRtcpReceptionReportLocked(proxyLoss); receptionReport != nil { return &rtcp.ReceiverReport{ SSRC: b.BufferBase.SSRC(), Reports: []rtcp.ReceptionReport{*receptionReport}, } } return nil } func (b *Buffer) SetLastFractionLostReport(lost uint8) { b.Lock() defer b.Unlock() b.lastFractionLostToReport = lost } func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) { b.Lock() b.onRtcpFeedback = fn b.Unlock() } func (b *Buffer) getOnRtcpFeedback() func(fb []rtcp.Packet) { b.RLock() defer b.RUnlock() return b.onRtcpFeedback } func (b *Buffer) OnFinalRtpStats(fn func(*livekit.RTPStats)) { b.Lock() b.onFinalRtpStats = fn b.Unlock() } func (b *Buffer) getOnFinalRtpStats() func(*livekit.RTPStats) { b.RLock() defer b.RUnlock() return b.onFinalRtpStats } func (b *Buffer) OnNotifyRTX(fn func(ssrc uint32, repairSSRC uint32, rsid string)) { b.Lock() b.onNotifyRTX = fn b.Unlock() } func (b *Buffer) getOnNotifyRTX() func(ssrc uint32, repairSSRC uint32, rsid string) { b.RLock() defer b.RUnlock() return b.onNotifyRTX }