Files
livekit/pkg/sfu/buffer/buffer.go
T
2026-06-06 10:15:47 -07:00

631 lines
15 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"encoding/binary"
"errors"
"io"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"github.com/livekit/livekit-server/pkg/sfu/flexfec"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil/pkg/bucket"
"github.com/livekit/mediatransportutil/pkg/twcc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils/mono"
)
// Timing constants, expressed in nanoseconds because they are compared against
// differences of mono.UnixNano() timestamps.
const (
	// rtcpReceiverReportDelta is the minimum spacing between receiver reports (1s).
	rtcpReceiverReportDelta = 1e9

	// fecReportInterval bounds how often a buffer logs a FlexFEC recovery summary (5s).
	fecReportInterval = 5e9
)

// Initial packet buffer sizes for video and audio tracks.
const (
	InitPacketBufferSizeVideo = 300
	InitPacketBufferSizeAudio = 70
)

var errInvalidCodec = errors.New("invalid codec")
// Compile-time check that *Buffer satisfies the BufferProvider interface.
var _ BufferProvider = (*Buffer)(nil)
// pendingPacket is a raw RTP packet held until the buffer is bound (or until it
// is handed off to a primary RTX/FEC buffer), together with its arrival time.
type pendingPacket struct {
	arrivalTime int64  // mono.UnixNano() when the packet was written
	packet      []byte // private copy of the wire bytes (the caller's slice is not retained)
}
// Buffer is the publisher-side receive buffer for a single SSRC. It embeds
// BufferBase and adds publisher-facing concerns:
//   - queuing packets that arrive before Bind;
//   - feeding the TWCC responder;
//   - generating NACKs and receiver reports;
//   - unwrapping RTX;
//   - FlexFEC recovery;
//   - an optional debug uplink impairment.
type Buffer struct {
	*BufferBase

	// pPackets holds copies of packets received before Bind, or before this buffer
	// is associated with a primary RTX/FEC buffer. Read serves them, and Bind
	// replays them.
	pPackets []pendingPacket
	// lastReportAt is the arrival time (mono.UnixNano) of the last receiver report.
	// It is used to space reports at least rtcpReceiverReportDelta apart.
	lastReportAt int64
	// isBound is set once Bind succeeds; until then, packets are queued in pPackets.
	isBound bool
	// twcc receives the transport-wide sequence number read from header extension
	// twccExtID. Both must be set (non-nil / non-zero) for packets to be pushed.
	twcc      *twcc.Responder
	twccExtID uint8
	// enableAudioLossProxying gates whether audio receiver reports carry
	// lastFractionLostToReport. Non-audio reports always carry it.
	enableAudioLossProxying bool
	// Last fraction lost reported by subscribers, proxied to the publisher in receiver
	// reports (for audio only when enableAudioLossProxying is set).
	lastFractionLostToReport uint8
	// lastPacketRead is the index in pPackets of the next packet Read will return.
	lastPacketRead int
	// callbacks
	onClose         func()
	onRtcpFeedback  func([]rtcp.Packet)
	onFinalRtpStats func(*livekit.RTPStats)
	onNotifyRTX     func(uint32, uint32, string)
	// primaryBufferForRTX is set on an RTX (repair) buffer and points at the source
	// buffer that its packets are unwrapped into. rtxPktBuf is scratch space for
	// re-marshaling the unwrapped packets on the source buffer.
	primaryBufferForRTX *Buffer
	rtxPktBuf           []byte
	// FlexFEC (publisher -> SFU recovery).
	// primaryBufferForFEC is set on a repair (FEC) buffer and points at the source
	// buffer it protects. fecDecoder lives on the source buffer and reconstructs lost
	// source packets from the repair stream. fecPktBuf is scratch for re-marshaling
	// recovered packets before injecting them back into the source buffer.
	primaryBufferForFEC *Buffer
	fecDecoder          *flexfec.Decoder
	fecPktBuf           []byte
	// FlexFEC observability. Counters are cumulative for the lifetime of the source
	// buffer; the *Reported fields snapshot the last logged values so the periodic
	// summary can report deltas.
	fecPacketsReceived   uint64
	fecPacketsRecovered  uint64
	fecReportedAt        int64
	fecReportedReceived  uint64
	fecReportedRecovered uint64
	// Debug-only simulated publisher ("robot") uplink impairment; nil unless the
	// LK_UPLINK_* env vars are set. See buffer_impair.go.
	impair *uplinkImpair
}
// NewBuffer creates a publisher-side Buffer for ssrc.
//
// maxVideoPkts and maxAudioPkts are forwarded to the underlying BufferBase.
// Receiver reporting is enabled, and PLIs requested by the base are sent
// through this buffer's RTCP feedback callback. The debug uplink impairment
// is initialized last.
func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer {
	buf := new(Buffer)
	params := BufferBaseParams{
		SSRC:               ssrc,
		MaxVideoPkts:       maxVideoPkts,
		MaxAudioPkts:       maxAudioPkts,
		LoggerComponents:   []string{sutils.ComponentPub, sutils.ComponentSFU},
		SendPLI:            buf.sendPLI,
		IsReportingEnabled: true,
	}
	buf.BufferBase = NewBufferBase(params)
	buf.initUplinkImpair()
	return buf
}
// SetTWCCAndExtID installs the TWCC responder and the RTP header extension ID
// that carries the transport-wide sequence number. Packets are pushed to the
// responder only when twcc is non-nil and extID is non-zero.
func (b *Buffer) SetTWCCAndExtID(twcc *twcc.Responder, extID uint8) {
	b.Lock()
	b.twcc, b.twccExtID = twcc, extID
	b.Unlock()
}
// SetAudioLossProxying controls whether receiver reports for an audio track
// carry the subscriber-side fraction lost set via SetLastFractionLostReport.
func (b *Buffer) SetAudioLossProxying(enable bool) {
	b.Lock()
	b.enableAudioLossProxying = enable
	b.Unlock()
}
// Bind attaches the negotiated RTP parameters and codec to the buffer.
//
// Only the first successful call has an effect; later calls return nil
// immediately. An error from BufferBase.BindLocked is returned as-is, and the
// buffer stays unbound. On success, packets queued before binding are replayed
// into the buffer (marked as buffered). Any RTCP feedback they produce is
// delivered after the lock is released.
func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrates int) error {
	b.Lock()
	if b.isBound {
		b.Unlock()
		return nil
	}
	if err := b.BufferBase.BindLocked(params, codec, bitrates); err != nil {
		b.Unlock()
		return err
	}
	b.lastReportAt = mono.UnixNano()

	if len(b.pPackets) != 0 {
		b.logger.Debugw("releasing queued packets on bind", "count", len(b.pPackets))
	}
	var rtcpPackets []rtcp.Packet
	for _, pp := range b.pPackets {
		rtcpPackets = append(rtcpPackets, b.calc(pp.packet, nil, pp.arrivalTime, true, false)...)
	}
	b.pPackets = nil
	b.isBound = true
	b.Unlock()

	if len(rtcpPackets) != 0 {
		// Deliver via emitRTCPFeedback, as writeNow does for live packets. This way
		// the debug uplink delay, when configured, also applies to feedback from
		// replayed packets. Without impairment, this calls the callback directly.
		b.emitRTCPFeedback(rtcpPackets)
	}
	return nil
}
// Write is the inbound entry point for the publisher's RTP (media, RTX, and FEC
// SSRCs). Ordering is not guaranteed: newer packets may arrive before older ones.
//
// When the debug uplink impairment is configured (LK_UPLINK_* env vars), the
// packet may first be dropped (simulated loss) or deferred (simulated one-way
// delay). In both cases the full length is reported as written, so pion's read
// loop is unaffected. Otherwise the packet goes straight to writeNow.
func (b *Buffer) Write(pkt []byte) (int, error) {
	if imp := b.impair; imp != nil && (imp.dropInbound() || imp.delayInbound(b, pkt)) {
		return len(pkt), nil
	}
	return b.writeNow(pkt)
}
// writeNow processes one inbound RTP packet, bypassing the debug impairment.
//
// Routing, in order:
//   - TWCC: when a responder and extension ID are configured, the transport-wide
//     sequence number is pushed for every packet (probes included).
//   - SSRC 0 (libwebrtc probing) is ignored.
//   - On an RTX buffer, the packet is unwrapped into the primary buffer
//     (padding-only packets are skipped).
//   - On a FlexFEC repair buffer, the packet is handed to the primary's decoder.
//   - Before Bind, a copy is appended to a bounded pending queue.
//   - Otherwise the packet is processed and fed to the FlexFEC decoder (if any).
//     Resulting RTCP feedback is emitted after the lock is released.
//
// Per the io.Writer contract, n is len(pkt) whenever err is nil. A malformed
// packet returns the unmarshal error, and a closed buffer returns io.EOF.
func (b *Buffer) writeNow(pkt []byte) (n int, err error) {
	var rtpPacket rtp.Packet
	if err = rtpPacket.Unmarshal(pkt); err != nil {
		return 0, err
	}

	b.Lock()
	if b.BufferBase.IsClosed() {
		b.Unlock()
		return 0, io.EOF
	}

	now := mono.UnixNano()
	if b.twcc != nil && b.twccExtID != 0 {
		// The transport-wide sequence number is 16 bits. Ignore a malformed (shorter)
		// extension instead of reslicing past its end into neighbouring bytes.
		if ext := rtpPacket.GetExtension(b.twccExtID); len(ext) >= 2 {
			b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker)
		}
	}

	// libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since
	// the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816
	if rtpPacket.SSRC == 0 {
		b.Unlock()
		return len(pkt), nil
	}

	// handle RTX packet
	if pb := b.primaryBufferForRTX; pb != nil {
		b.Unlock()
		// skip padding only packets
		if !(rtpPacket.Padding && len(rtpPacket.Payload) == 0) {
			pb.writeRTX(&rtpPacket, now)
		}
		return len(pkt), nil
	}

	// handle FlexFEC repair packet
	if pb := b.primaryBufferForFEC; pb != nil {
		b.Unlock()
		pb.writeFEC(&rtpPacket, now)
		return len(pkt), nil
	}

	if !b.isBound {
		packet := make([]byte, len(pkt))
		copy(packet, pkt)
		if len(b.pPackets) == 0 {
			b.logger.Debugw("received first packet")
		}

		startIdx := 0
		if overflow := len(b.pPackets) - max(b.BufferBase.MaxVideoPkts(), b.BufferBase.MaxAudioPkts()); overflow > 0 {
			startIdx = overflow
			// Dropping entries from the front shifts every index down by overflow.
			// Move the Read cursor with them. Otherwise Read skips packets, and once the
			// queue is saturated and the reader has caught up, the cursor sits past the
			// tail forever and Read blocks until close.
			b.lastPacketRead = max(0, b.lastPacketRead-overflow)
		}
		b.pPackets = append(b.pPackets[startIdx:], pendingPacket{
			packet:      packet,
			arrivalTime: now,
		})
		b.BufferBase.NotifyRead()
		b.Unlock()
		return len(pkt), nil
	}

	rtcpPackets := b.calc(pkt, &rtpPacket, now, false, false)
	// Feed received source packets to the FlexFEC decoder so it can reconstruct
	// previously lost packets, and inject any recoveries back into this buffer.
	// NOTE(review): rtpPacket.Payload aliases pkt (the caller's buffer). Confirm that
	// flexfec.Decoder copies anything it retains beyond this call.
	if b.fecDecoder != nil {
		b.injectRecoveredLocked(b.fecDecoder.Decode(rtpPacket), now)
	}
	b.Unlock()

	if len(rtcpPackets) != 0 {
		b.emitRTCPFeedback(rtcpPackets)
	}
	return len(pkt), nil
}
// emitRTCPFeedback hands SFU->publisher RTCP feedback (NACK, receiver reports, ...)
// to the registered feedback callback.
//
// If the debug uplink impairment takes ownership of the packets (to defer them by
// the configured one-way delay, so a NACK->RTX recovery costs a full round trip),
// nothing is sent here. Without a callback, the packets are dropped.
func (b *Buffer) emitRTCPFeedback(rtcpPackets []rtcp.Packet) {
	if imp := b.impair; imp != nil {
		if imp.delayFeedback(b, rtcpPackets) {
			return
		}
	}

	cb := b.getOnRtcpFeedback()
	if cb == nil {
		return
	}
	cb(rtcpPackets)
}
// SetPrimaryBufferForRTX marks this buffer as carrying the RTX stream for
// primaryBuffer. From then on, writeNow unwraps this buffer's packets into
// primaryBuffer.
//
// Packets queued before the association are taken out under the lock and
// replayed after it is released. Packets that fail to parse, and padding-only
// packets, are skipped.
func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) {
	b.Lock()
	b.primaryBufferForRTX = primaryBuffer
	queued := b.pPackets
	b.pPackets = nil
	b.Unlock()

	for i := range queued {
		var pkt rtp.Packet
		if pkt.Unmarshal(queued[i].packet) != nil {
			continue
		}
		if pkt.Padding && len(pkt.Payload) == 0 {
			continue
		}
		primaryBuffer.writeRTX(&pkt, queued[i].arrivalTime)
	}
}
// SetPrimaryBufferForFEC associates this repair (FlexFEC) buffer with the source
// buffer it protects.
//
// It first installs a FlexFEC decoder on primaryBuffer, keyed by this buffer's
// SSRC. It then routes future packets to primaryBuffer and replays, outside the
// lock, any repair packets that arrived before the association was known.
// Packets that fail to parse are skipped.
func (b *Buffer) SetPrimaryBufferForFEC(primaryBuffer *Buffer) {
	// The decoder must exist before the replay below; enableFECDecoder takes the
	// primary's own lock, so it is called before taking ours.
	primaryBuffer.enableFECDecoder(b.BufferBase.SSRC())

	b.Lock()
	b.primaryBufferForFEC = primaryBuffer
	queued := b.pPackets
	b.pPackets = nil
	b.Unlock()

	for i := range queued {
		var pkt rtp.Packet
		if pkt.Unmarshal(queued[i].packet) != nil {
			continue
		}
		primaryBuffer.writeFEC(&pkt, queued[i].arrivalTime)
	}
}
// enableFECDecoder installs a FlexFEC decoder on this (source) buffer for the
// repair stream fecSSRC. If a decoder is already present, it does nothing.
// It may be called before the buffer is bound.
func (b *Buffer) enableFECDecoder(fecSSRC uint32) {
	b.Lock()
	defer b.Unlock()

	if b.fecDecoder != nil {
		return
	}
	mediaSSRC := b.BufferBase.SSRC()
	b.fecDecoder = flexfec.NewDecoder(fecSSRC, mediaSSRC)
	b.logger.Infow("flexfec decoder enabled", "fecSSRC", fecSSRC, "mediaSSRC", mediaSSRC)
}
// writeFEC is called on the source buffer with a FlexFEC repair packet.
//
// The packet is counted and fed to the decoder. Any source packets the decoder
// reconstructs are injected back into this buffer, and a periodic recovery
// summary may be logged. Without a decoder, the packet is ignored.
func (b *Buffer) writeFEC(fecPkt *rtp.Packet, arrivalTime int64) {
	b.Lock()
	defer b.Unlock()

	dec := b.fecDecoder
	if dec == nil {
		return
	}
	b.fecPacketsReceived++
	recovered := dec.Decode(*fecPkt)
	b.injectRecoveredLocked(recovered, arrivalTime)
	b.maybeLogFECStatsLocked(arrivalTime)
}
// maybeLogFECStatsLocked emits a FlexFEC recovery summary at most once per
// fecReportInterval. A summary is only logged when there has been FEC activity
// since the previous one. The logged values are the cumulative totals plus the
// deltas since the last summary. b.Lock must be held.
func (b *Buffer) maybeLogFECStatsLocked(now int64) {
	if b.fecReportedAt == 0 {
		// Open the first window. The *Reported snapshots stay at their zero values,
		// so the first summary's deltas include the packets already counted by this
		// point (including the FEC packet that triggered this call). Snapshotting the
		// counters here would silently drop them from every delta.
		b.fecReportedAt = now
		return
	}
	if now-b.fecReportedAt < fecReportInterval {
		return
	}

	receivedDelta := b.fecPacketsReceived - b.fecReportedReceived
	recoveredDelta := b.fecPacketsRecovered - b.fecReportedRecovered
	b.fecReportedAt = now
	b.fecReportedReceived = b.fecPacketsReceived
	b.fecReportedRecovered = b.fecPacketsRecovered
	if receivedDelta == 0 && recoveredDelta == 0 {
		return
	}

	b.logger.Infow(
		"flexfec recovery stats",
		"mediaSSRC", b.BufferBase.SSRC(),
		"fecPacketsReceived", b.fecPacketsReceived,
		"fecPacketsReceivedDelta", receivedDelta,
		"packetsRecovered", b.fecPacketsRecovered,
		"packetsRecoveredDelta", recoveredDelta,
	)
}
// injectRecoveredLocked feeds FlexFEC-recovered source packets back into the
// buffer. Each one is marshaled into a reusable scratch buffer and processed as
// a repair (isRTX=true), the same way unwrapped RTX packets are handled.
//
// Recoveries are dropped while the buffer is unbound. A packet that fails to
// marshal is logged and skipped. RTCP produced while processing recoveries is
// discarded. b.Lock must be held.
func (b *Buffer) injectRecoveredLocked(recovered []rtp.Packet, arrivalTime int64) {
	if !b.isBound || len(recovered) == 0 {
		return
	}
	if b.fecPktBuf == nil {
		b.fecPktBuf = make([]byte, bucket.RTPMaxPktSize)
	}

	for i := range recovered {
		rec := recovered[i]
		size, err := rec.MarshalTo(b.fecPktBuf)
		if err != nil {
			b.logger.Errorw("could not marshal flexfec recovered packet", err, "sn", rec.SequenceNumber)
			continue
		}
		b.calc(b.fecPktBuf[:size], &rec, arrivalTime, false, true)
		b.fecPacketsRecovered++
	}
}
// NotifyRTX forwards an (ssrc, repairSSRC, rsid) association to the callback
// registered via OnNotifyRTX, if any.
func (b *Buffer) NotifyRTX(ssrc uint32, repairSSRC uint32, rsid string) {
	cb := b.getOnNotifyRTX()
	if cb == nil {
		return
	}
	cb(ssrc, repairSSRC, rsid)
}
// writeRTX is called on the source buffer with a packet from its RTX stream.
// It rebuilds the original packet and processes it as a repair.
//
// The packet is ignored if the buffer is unbound. It is also ignored, with a
// log line, if its payload type is not the negotiated RTX payload type or its
// payload is too short to hold the original sequence number.
func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) {
	b.Lock()
	defer b.Unlock()

	switch {
	case !b.isBound:
		return
	case rtxPkt.PayloadType != b.rtxPayloadType:
		b.logger.Debugw("unexpected rtx payload type", "expected", b.rtxPayloadType, "actual", rtxPkt.PayloadType)
		return
	case len(rtxPkt.Payload) < 2:
		b.logger.Warnw("rtx payload too short", nil, "size", len(rtxPkt.Payload))
		return
	}

	if b.rtxPktBuf == nil {
		b.rtxPktBuf = make([]byte, bucket.RTPMaxPktSize)
	}

	// The first two payload bytes carry the original sequence number; the rest is
	// the original payload. Restore the media SSRC and payload type.
	repaired := *rtxPkt
	repaired.PayloadType = b.payloadType
	repaired.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2])
	repaired.SSRC = b.BufferBase.SSRC()
	repaired.Payload = rtxPkt.Payload[2:]

	size, err := repaired.MarshalTo(b.rtxPktBuf)
	if err != nil {
		b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.BufferBase.SSRC(), "sn", repaired.SequenceNumber)
		return
	}
	b.calc(b.rtxPktBuf[:size], &repaired, arrivalTime, false, true)
}
// Read copies the next queued (pre-bind) packet into buff.
//
// It blocks, via BufferBase.WaitRead, until a packet is available or the buffer
// is closed; a closed buffer returns io.EOF. If buff is smaller than the next
// packet, it returns bucket.ErrBufferTooSmall without consuming that packet.
func (b *Buffer) Read(buff []byte) (n int, err error) {
	b.Lock()
	defer b.Unlock()

	for !b.BufferBase.IsClosed() {
		if b.lastPacketRead < len(b.pPackets) {
			next := b.pPackets[b.lastPacketRead].packet
			if len(buff) < len(next) {
				return 0, bucket.ErrBufferTooSmall
			}
			b.lastPacketRead++
			return copy(buff, next), nil
		}
		b.BufferBase.WaitRead()
	}
	return 0, io.EOF
}
// Close closes the underlying BufferBase.
//
// If the base returns an error, it is returned unchanged and no callbacks run.
// Otherwise, final RTP stats (when the base provides them) go to the
// OnFinalRtpStats callback, followed by the OnClose callback.
func (b *Buffer) Close() error {
	stats, err := b.BufferBase.CloseWithReason("close")
	if err != nil {
		return err
	}

	if stats != nil {
		if onStats := b.getOnFinalRtpStats(); onStats != nil {
			onStats(stats)
		}
	}
	if onClose := b.getOnClose(); onClose != nil {
		onClose()
	}
	return nil
}
// OnClose registers fn to be invoked by Close after the buffer has been closed.
func (b *Buffer) OnClose(fn func()) {
	b.Lock()
	defer b.Unlock()
	b.onClose = fn
}
// getOnClose returns the callback registered via OnClose (nil if none),
// read under the read lock.
func (b *Buffer) getOnClose() (fn func()) {
	b.RLock()
	fn = b.onClose
	b.RUnlock()
	return fn
}
// sendPLI asks the publisher for a keyframe by sending a Picture Loss Indication
// through the RTCP feedback callback. Nothing is sent while the buffer's SSRC is
// still 0, or when no callback is registered.
func (b *Buffer) sendPLI() {
	mediaSSRC := b.BufferBase.SSRC()
	if mediaSSRC == 0 {
		return
	}
	b.logger.Debugw("send pli", "mediaSSRC", mediaSSRC)

	cb := b.getOnRtcpFeedback()
	if cb == nil {
		return
	}
	cb([]rtcp.Packet{
		&rtcp.PictureLossIndication{SenderSSRC: mediaSSRC, MediaSSRC: mediaSSRC},
	})
}
// calc hands one packet to BufferBase.HandleIncomingPacketLocked, then returns
// whatever RTCP (NACK and/or receiver report) is due as of arrivalTime.
// Every caller in this file holds b's lock.
func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, isRTX bool) []rtcp.Packet {
	b.BufferBase.HandleIncomingPacketLocked(rawPkt, rtpPacket, arrivalTime, isBuffered, isRTX, nil, 0)
	return b.getRTCPPackets(arrivalTime)
}
// getRTCPPackets collects the RTCP that is due now: a NACK when there are
// missing packets to request, then a receiver report when the reporting
// interval has elapsed. It returns nil when neither is due.
func (b *Buffer) getRTCPPackets(arrivalTime int64) []rtcp.Packet {
	var out []rtcp.Packet

	nack := b.getNACKPacket()
	if nack != nil {
		out = append(out, nack)
	}

	rr := b.getRTCPReceiverReport(arrivalTime)
	if rr != nil {
		out = append(out, rr)
	}
	return out
}
// getNACKPacket builds a generic NACK for the pairs reported by
// BufferBase.GetNACKPairsLocked. It returns nil when there is nothing to request.
func (b *Buffer) getNACKPacket() *rtcp.TransportLayerNack {
	nacks := b.BufferBase.GetNACKPairsLocked()
	if len(nacks) == 0 {
		return nil
	}
	ssrc := b.BufferBase.SSRC()
	return &rtcp.TransportLayerNack{SenderSSRC: ssrc, MediaSSRC: ssrc, Nacks: nacks}
}
// getRTCPReceiverReport returns a receiver report at most once per
// rtcpReceiverReportDelta; within the interval it returns nil.
//
// When the interval has elapsed, the report timestamp advances even if the
// base produces no reception report. The subscriber fraction lost is passed
// through, except for audio without loss proxying enabled, which passes 0.
func (b *Buffer) getRTCPReceiverReport(arrivalTime int64) *rtcp.ReceiverReport {
	if arrivalTime-b.lastReportAt < rtcpReceiverReportDelta {
		return nil
	}
	b.lastReportAt = arrivalTime

	var proxyLoss uint8
	if b.codecType != webrtc.RTPCodecTypeAudio || b.enableAudioLossProxying {
		proxyLoss = b.lastFractionLostToReport
	}

	report := b.BufferBase.GetRtcpReceptionReportLocked(proxyLoss)
	if report == nil {
		return nil
	}
	return &rtcp.ReceiverReport{
		SSRC:    b.BufferBase.SSRC(),
		Reports: []rtcp.ReceptionReport{*report},
	}
}
// SetLastFractionLostReport records the fraction lost observed on the
// subscriber side, for use in subsequent receiver reports to the publisher.
func (b *Buffer) SetLastFractionLostReport(lost uint8) {
	b.Lock()
	b.lastFractionLostToReport = lost
	b.Unlock()
}
// OnRtcpFeedback registers the callback that receives RTCP destined for the
// publisher (NACKs, receiver reports, PLIs).
func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
	b.Lock()
	defer b.Unlock()
	b.onRtcpFeedback = fn
}
// getOnRtcpFeedback returns the callback registered via OnRtcpFeedback (nil if
// none), read under the read lock.
func (b *Buffer) getOnRtcpFeedback() (fn func(fb []rtcp.Packet)) {
	b.RLock()
	fn = b.onRtcpFeedback
	b.RUnlock()
	return fn
}
// OnFinalRtpStats registers the callback that Close invokes with the final RTP
// stats, when the base provides any.
func (b *Buffer) OnFinalRtpStats(fn func(*livekit.RTPStats)) {
	b.Lock()
	defer b.Unlock()
	b.onFinalRtpStats = fn
}
// getOnFinalRtpStats returns the callback registered via OnFinalRtpStats (nil
// if none), read under the read lock.
func (b *Buffer) getOnFinalRtpStats() (fn func(*livekit.RTPStats)) {
	b.RLock()
	fn = b.onFinalRtpStats
	b.RUnlock()
	return fn
}
// OnNotifyRTX registers the callback that NotifyRTX forwards RTX associations to.
func (b *Buffer) OnNotifyRTX(fn func(ssrc uint32, repairSSRC uint32, rsid string)) {
	b.Lock()
	defer b.Unlock()
	b.onNotifyRTX = fn
}
// getOnNotifyRTX returns the callback registered via OnNotifyRTX (nil if none),
// read under the read lock.
func (b *Buffer) getOnNotifyRTX() (fn func(ssrc uint32, repairSSRC uint32, rsid string)) {
	b.RLock()
	fn = b.onNotifyRTX
	b.RUnlock()
	return fn
}