mirror of
https://github.com/livekit/livekit.git
synced 2026-07-03 00:22:02 +00:00
195adeb38c
Implements FlexFEC-03 (RFC 8627) end to end in the SFU, in both directions: - Upstream: decode FlexFEC repair packets from publishers and recover lost media in the receive buffer before forwarding (pkg/sfu/flexfec/decoder.go, pkg/sfu/buffer). Recovered packets are surfaced via Prometheus counters. - Downstream: generate FlexFEC for subscribers off the downtrack, paced alongside media (pkg/sfu/downtrack_fec.go, pkg/sfu/pacer). - Negotiation: advertise and match flexfec-03 in the media engine and transport SDP, gated by new config knobs (pkg/rtc, pkg/config, config-sample.yaml). - Telemetry: livekit_fec_* metrics for sent/received/recovered packets (pkg/telemetry/prometheus/packets.go). Tests: unit coverage for the decoder, buffer recovery, downtrack generation, and transport negotiation, plus an end-to-end integration test and the test-client support it needs (test/flexfec_test.go, test/client).
623 lines
15 KiB
Go
623 lines
15 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package buffer
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"io"
|
|
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/webrtc/v4"
|
|
|
|
"github.com/livekit/livekit-server/pkg/sfu/flexfec"
|
|
sutils "github.com/livekit/livekit-server/pkg/utils"
|
|
"github.com/livekit/mediatransportutil/pkg/bucket"
|
|
"github.com/livekit/mediatransportutil/pkg/twcc"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/utils/mono"
|
|
)
|
|
|
|
const (
|
|
rtcpReceiverReportDelta = 1e9
|
|
|
|
InitPacketBufferSizeVideo = 300
|
|
InitPacketBufferSizeAudio = 70
|
|
)
|
|
|
|
var (
|
|
errInvalidCodec = errors.New("invalid codec")
|
|
)
|
|
|
|
var _ BufferProvider = (*Buffer)(nil)
|
|
|
|
type pendingPacket struct {
|
|
arrivalTime int64
|
|
packet []byte
|
|
}
|
|
|
|
// Buffer contains all packets
|
|
type Buffer struct {
|
|
*BufferBase
|
|
|
|
pPackets []pendingPacket
|
|
lastReportAt int64
|
|
isBound bool
|
|
|
|
twcc *twcc.Responder
|
|
twccExtID uint8
|
|
|
|
enableAudioLossProxying bool
|
|
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
|
|
|
|
lastPacketRead int
|
|
|
|
// callbacks
|
|
onClose func()
|
|
onRtcpFeedback func([]rtcp.Packet)
|
|
onFinalRtpStats func(*livekit.RTPStats)
|
|
onNotifyRTX func(uint32, uint32, string)
|
|
|
|
primaryBufferForRTX *Buffer
|
|
rtxPktBuf []byte
|
|
|
|
primaryBufferForFEC *Buffer
|
|
fecSSRC uint32
|
|
fecDecoder *flexfec.Decoder
|
|
fecPktBuf []byte
|
|
onFECRecovery func(recovered int, received int, discarded int, bytesReceived int)
|
|
}
|
|
|
|
func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer {
|
|
b := &Buffer{}
|
|
b.BufferBase = NewBufferBase(BufferBaseParams{
|
|
SSRC: ssrc,
|
|
MaxVideoPkts: maxVideoPkts,
|
|
MaxAudioPkts: maxAudioPkts,
|
|
LoggerComponents: []string{sutils.ComponentPub, sutils.ComponentSFU},
|
|
SendPLI: b.sendPLI,
|
|
IsReportingEnabled: true,
|
|
})
|
|
return b
|
|
}
|
|
|
|
func (b *Buffer) SetTWCCAndExtID(twcc *twcc.Responder, extID uint8) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.twcc = twcc
|
|
b.twccExtID = extID
|
|
}
|
|
|
|
func (b *Buffer) SetAudioLossProxying(enable bool) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.enableAudioLossProxying = enable
|
|
}
|
|
|
|
func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrates int) error {
|
|
b.Lock()
|
|
if b.isBound {
|
|
b.Unlock()
|
|
return nil
|
|
}
|
|
|
|
if err := b.BufferBase.BindLocked(params, codec, bitrates); err != nil {
|
|
b.Unlock()
|
|
return err
|
|
}
|
|
|
|
b.lastReportAt = mono.UnixNano()
|
|
|
|
if len(b.pPackets) != 0 {
|
|
b.logger.Debugw("releasing queued packets on bind", "count", len(b.pPackets))
|
|
}
|
|
var rtcpPackets []rtcp.Packet
|
|
for _, pp := range b.pPackets {
|
|
rtcpPackets = append(rtcpPackets, b.calc(pp.packet, nil, pp.arrivalTime, true, false)...)
|
|
}
|
|
b.pPackets = nil
|
|
|
|
b.isBound = true
|
|
b.maybeCreateFECDecoderLocked()
|
|
b.Unlock()
|
|
|
|
if len(rtcpPackets) != 0 {
|
|
if cb := b.getOnRtcpFeedback(); cb != nil {
|
|
cb(rtcpPackets)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Write adds an RTP Packet, ordering is not guaranteed, newer packets may arrive later
|
|
func (b *Buffer) Write(pkt []byte) (n int, err error) {
|
|
var rtpPacket rtp.Packet
|
|
err = rtpPacket.Unmarshal(pkt)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
b.Lock()
|
|
if b.BufferBase.IsClosed() {
|
|
b.Unlock()
|
|
err = io.EOF
|
|
return
|
|
}
|
|
|
|
now := mono.UnixNano()
|
|
if b.twcc != nil && b.twccExtID != 0 {
|
|
if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil {
|
|
b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker)
|
|
}
|
|
}
|
|
|
|
// libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since
|
|
// the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816
|
|
if rtpPacket.SSRC == 0 {
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
// handle RTX packet
|
|
if pb := b.primaryBufferForRTX; pb != nil {
|
|
b.Unlock()
|
|
|
|
// skip padding only packets
|
|
if rtpPacket.Padding && len(rtpPacket.Payload) == 0 {
|
|
return
|
|
}
|
|
|
|
pb.writeRTX(&rtpPacket, now)
|
|
return
|
|
}
|
|
|
|
// handle FlexFEC packet
|
|
if pb := b.primaryBufferForFEC; pb != nil {
|
|
b.Unlock()
|
|
|
|
// skip padding only packets
|
|
if rtpPacket.Padding && len(rtpPacket.Payload) == 0 {
|
|
return
|
|
}
|
|
|
|
pb.writeFEC(&rtpPacket, now)
|
|
return
|
|
}
|
|
|
|
if !b.isBound {
|
|
packet := make([]byte, len(pkt))
|
|
copy(packet, pkt)
|
|
|
|
if len(b.pPackets) == 0 {
|
|
b.logger.Debugw("received first packet")
|
|
}
|
|
|
|
startIdx := 0
|
|
overflow := len(b.pPackets) - max(b.BufferBase.MaxVideoPkts(), b.BufferBase.MaxAudioPkts())
|
|
if overflow > 0 {
|
|
startIdx = overflow
|
|
}
|
|
b.pPackets = append(b.pPackets[startIdx:], pendingPacket{
|
|
packet: packet,
|
|
arrivalTime: now,
|
|
})
|
|
|
|
b.BufferBase.NotifyRead()
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
rtcpPackets := b.calc(pkt, &rtpPacket, now, false, false)
|
|
if b.fecDecoder != nil {
|
|
// feed media into the FEC decoder, a media arrival can complete a
|
|
// previously unrecoverable FEC window
|
|
b.feedFECLocked(&rtpPacket, now)
|
|
}
|
|
b.Unlock()
|
|
|
|
if len(rtcpPackets) != 0 {
|
|
if cb := b.getOnRtcpFeedback(); cb != nil {
|
|
cb(rtcpPackets)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) {
|
|
b.Lock()
|
|
b.primaryBufferForRTX = primaryBuffer
|
|
pkts := b.pPackets
|
|
b.pPackets = nil
|
|
b.Unlock()
|
|
|
|
for _, pp := range pkts {
|
|
var rtpPacket rtp.Packet
|
|
err := rtpPacket.Unmarshal(pp.packet)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if rtpPacket.Padding && len(rtpPacket.Payload) == 0 {
|
|
continue
|
|
}
|
|
primaryBuffer.writeRTX(&rtpPacket, pp.arrivalTime)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) NotifyRTX(ssrc uint32, repairSSRC uint32, rsid string) {
|
|
if onNotifyRTX := b.getOnNotifyRTX(); onNotifyRTX != nil {
|
|
onNotifyRTX(ssrc, repairSSRC, rsid)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
if !b.isBound {
|
|
return
|
|
}
|
|
|
|
if rtxPkt.PayloadType != b.rtxPayloadType {
|
|
b.logger.Debugw("unexpected rtx payload type", "expected", b.rtxPayloadType, "actual", rtxPkt.PayloadType)
|
|
return
|
|
}
|
|
|
|
if b.rtxPktBuf == nil {
|
|
b.rtxPktBuf = make([]byte, bucket.RTPMaxPktSize)
|
|
}
|
|
|
|
if len(rtxPkt.Payload) < 2 {
|
|
b.logger.Warnw("rtx payload too short", nil, "size", len(rtxPkt.Payload))
|
|
return
|
|
}
|
|
|
|
repairedPkt := *rtxPkt
|
|
repairedPkt.PayloadType = b.payloadType
|
|
repairedPkt.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2])
|
|
repairedPkt.SSRC = b.BufferBase.SSRC()
|
|
repairedPkt.Payload = rtxPkt.Payload[2:]
|
|
n, err := repairedPkt.MarshalTo(b.rtxPktBuf)
|
|
if err != nil {
|
|
b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.BufferBase.SSRC(), "sn", repairedPkt.SequenceNumber)
|
|
return
|
|
}
|
|
|
|
b.calc(b.rtxPktBuf[:n], &repairedPkt, arrivalTime, false, true)
|
|
if b.fecDecoder != nil {
|
|
b.feedFECLocked(&repairedPkt, arrivalTime)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) SetPrimaryBufferForFEC(primaryBuffer *Buffer) {
|
|
b.Lock()
|
|
b.primaryBufferForFEC = primaryBuffer
|
|
pkts := b.pPackets
|
|
b.pPackets = nil
|
|
ssrc := b.BufferBase.SSRC()
|
|
b.Unlock()
|
|
|
|
// let the primary know the repair stream SSRC so its decoder starts
|
|
// filling with media before the first FEC packet shows up
|
|
primaryBuffer.setFECSSRC(ssrc)
|
|
|
|
for _, pp := range pkts {
|
|
var rtpPacket rtp.Packet
|
|
err := rtpPacket.Unmarshal(pp.packet)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if rtpPacket.Padding && len(rtpPacket.Payload) == 0 {
|
|
continue
|
|
}
|
|
primaryBuffer.writeFEC(&rtpPacket, pp.arrivalTime)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) setFECSSRC(ssrc uint32) {
|
|
b.Lock()
|
|
b.fecSSRC = ssrc
|
|
b.maybeCreateFECDecoderLocked()
|
|
b.Unlock()
|
|
}
|
|
|
|
// maybeCreateFECDecoderLocked creates the FEC decoder as soon as the repair
|
|
// stream SSRC is known and the buffer is bound with a negotiated flexfec
|
|
// payload type. Eager creation lets the decoder track media packets before
|
|
// the first FEC packet arrives, otherwise the leading protection windows
|
|
// would be unrecoverable.
|
|
func (b *Buffer) maybeCreateFECDecoderLocked() {
|
|
if b.fecDecoder != nil || b.fecSSRC == 0 || !b.isBound || b.fecPayloadType == 0 {
|
|
return
|
|
}
|
|
|
|
b.fecDecoder = flexfec.NewDecoder(b.fecSSRC, b.BufferBase.SSRC(), b.logger)
|
|
b.logger.Debugw("flexfec decoder created", "fecSSRC", b.fecSSRC, "mediaSSRC", b.BufferBase.SSRC())
|
|
}
|
|
|
|
// OnFECRecovery is called with counter deltas whenever FEC packets are
|
|
// processed: recovered media packets, FEC packets received and FEC packets
|
|
// discarded since the previous callback.
|
|
func (b *Buffer) OnFECRecovery(fn func(recovered int, received int, discarded int, bytesReceived int)) {
|
|
b.Lock()
|
|
b.onFECRecovery = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
// FECDecoderStats returns cumulative FlexFEC decode counters of the buffer.
|
|
func (b *Buffer) FECDecoderStats() flexfec.DecoderStats {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
if b.fecDecoder == nil {
|
|
return flexfec.DecoderStats{}
|
|
}
|
|
return b.fecDecoder.Stats()
|
|
}
|
|
|
|
// writeFEC handles a packet of the coupled FlexFEC repair stream, recovered
|
|
// media packets are injected into the regular packet pipeline.
|
|
func (b *Buffer) writeFEC(fecPkt *rtp.Packet, arrivalTime int64) {
|
|
b.Lock()
|
|
if !b.isBound {
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
// the FEC stream is never bound in pion, run its TWCC accounting here so
|
|
// publisher send side BWE sees the FEC packets acked. The repair stream
|
|
// shares the media m-line, extension ids match the primary stream.
|
|
if b.twcc != nil && b.twccExtID != 0 {
|
|
if ext := fecPkt.GetExtension(b.twccExtID); ext != nil {
|
|
b.twcc.Push(fecPkt.SSRC, binary.BigEndian.Uint16(ext[0:2]), arrivalTime, fecPkt.Marker)
|
|
}
|
|
}
|
|
|
|
if b.fecPayloadType == 0 || fecPkt.PayloadType != b.fecPayloadType {
|
|
b.logger.Debugw("unexpected fec payload type", "expected", b.fecPayloadType, "actual", fecPkt.PayloadType)
|
|
b.Unlock()
|
|
return
|
|
}
|
|
|
|
if b.fecDecoder == nil {
|
|
// normally created when the pair is declared, fall back to the
|
|
// observed repair stream SSRC
|
|
b.fecSSRC = fecPkt.SSRC
|
|
b.maybeCreateFECDecoderLocked()
|
|
if b.fecDecoder == nil {
|
|
b.Unlock()
|
|
return
|
|
}
|
|
}
|
|
|
|
b.feedFECLocked(fecPkt, arrivalTime)
|
|
b.Unlock()
|
|
}
|
|
|
|
// feedFECLocked runs a media or FEC packet through the FEC decoder and
|
|
// injects recovered packets into the packet pipeline. Must be called with the
|
|
// buffer lock held and a non-nil decoder.
|
|
func (b *Buffer) feedFECLocked(pkt *rtp.Packet, arrivalTime int64) {
|
|
statsBefore := b.fecDecoder.Stats()
|
|
recovered := b.fecDecoder.DecodeFec(pkt)
|
|
|
|
if b.fecPktBuf == nil {
|
|
b.fecPktBuf = make([]byte, bucket.RTPMaxPktSize)
|
|
}
|
|
for _, rp := range recovered {
|
|
n, err := rp.MarshalTo(b.fecPktBuf)
|
|
if err != nil {
|
|
b.logger.Warnw("could not marshal fec recovered packet", err, "ssrc", b.BufferBase.SSRC(), "sn", rp.SequenceNumber)
|
|
continue
|
|
}
|
|
|
|
// recovered packets flow through the regular pipeline: they are
|
|
// forwarded downstream and stop NACKs for the lost sequence numbers.
|
|
// they do not re-enter the decoder, it already has them in its window.
|
|
b.calc(b.fecPktBuf[:n], rp, arrivalTime, false, true)
|
|
}
|
|
|
|
if cb := b.onFECRecovery; cb != nil {
|
|
statsAfter := b.fecDecoder.Stats()
|
|
received := int(statsAfter.FECPacketsReceived - statsBefore.FECPacketsReceived)
|
|
discarded := int(statsAfter.FECPacketsDiscarded - statsBefore.FECPacketsDiscarded)
|
|
bytesReceived := int(statsAfter.FECBytesReceived - statsBefore.FECBytesReceived)
|
|
if len(recovered) > 0 || received > 0 || discarded > 0 {
|
|
cb(len(recovered), received, discarded, bytesReceived)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) Read(buff []byte) (n int, err error) {
|
|
b.Lock()
|
|
for {
|
|
if b.BufferBase.IsClosed() {
|
|
b.Unlock()
|
|
return 0, io.EOF
|
|
}
|
|
|
|
if b.pPackets != nil && len(b.pPackets) > b.lastPacketRead {
|
|
if len(buff) < len(b.pPackets[b.lastPacketRead].packet) {
|
|
b.Unlock()
|
|
return 0, bucket.ErrBufferTooSmall
|
|
}
|
|
|
|
n = copy(buff, b.pPackets[b.lastPacketRead].packet)
|
|
b.lastPacketRead++
|
|
b.Unlock()
|
|
return
|
|
}
|
|
b.BufferBase.WaitRead()
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) Close() error {
|
|
stats, err := b.BufferBase.CloseWithReason("close")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if stats != nil {
|
|
if cb := b.getOnFinalRtpStats(); cb != nil {
|
|
cb(stats)
|
|
}
|
|
}
|
|
|
|
if cb := b.getOnClose(); cb != nil {
|
|
cb()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) OnClose(fn func()) {
|
|
b.Lock()
|
|
b.onClose = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnClose() func() {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onClose
|
|
}
|
|
|
|
func (b *Buffer) sendPLI() {
|
|
ssrc := b.BufferBase.SSRC()
|
|
if ssrc == 0 {
|
|
return
|
|
}
|
|
|
|
b.logger.Debugw("send pli", "mediaSSRC", ssrc)
|
|
pli := []rtcp.Packet{
|
|
&rtcp.PictureLossIndication{
|
|
SenderSSRC: ssrc,
|
|
MediaSSRC: ssrc,
|
|
},
|
|
}
|
|
|
|
if cb := b.getOnRtcpFeedback(); cb != nil {
|
|
cb(pli)
|
|
}
|
|
}
|
|
|
|
func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, isRTX bool) []rtcp.Packet {
|
|
b.BufferBase.HandleIncomingPacketLocked(
|
|
rawPkt,
|
|
rtpPacket,
|
|
arrivalTime,
|
|
isBuffered,
|
|
isRTX,
|
|
nil,
|
|
0,
|
|
)
|
|
|
|
return b.getRTCPPackets(arrivalTime)
|
|
}
|
|
|
|
func (b *Buffer) getRTCPPackets(arrivalTime int64) []rtcp.Packet {
|
|
var pkts []rtcp.Packet
|
|
if nackPkt := b.getNACKPacket(); nackPkt != nil {
|
|
pkts = append(pkts, nackPkt)
|
|
}
|
|
if receiverReport := b.getRTCPReceiverReport(arrivalTime); receiverReport != nil {
|
|
pkts = append(pkts, receiverReport)
|
|
}
|
|
|
|
return pkts
|
|
}
|
|
|
|
func (b *Buffer) getNACKPacket() *rtcp.TransportLayerNack {
|
|
if nacks := b.BufferBase.GetNACKPairsLocked(); len(nacks) > 0 {
|
|
ssrc := b.BufferBase.SSRC()
|
|
return &rtcp.TransportLayerNack{
|
|
SenderSSRC: ssrc,
|
|
MediaSSRC: ssrc,
|
|
Nacks: nacks,
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) getRTCPReceiverReport(arrivalTime int64) *rtcp.ReceiverReport {
|
|
if arrivalTime-b.lastReportAt < rtcpReceiverReportDelta {
|
|
return nil
|
|
}
|
|
b.lastReportAt = arrivalTime
|
|
|
|
proxyLoss := b.lastFractionLostToReport
|
|
if b.codecType == webrtc.RTPCodecTypeAudio && !b.enableAudioLossProxying {
|
|
proxyLoss = 0
|
|
}
|
|
|
|
if receptionReport := b.BufferBase.GetRtcpReceptionReportLocked(proxyLoss); receptionReport != nil {
|
|
return &rtcp.ReceiverReport{
|
|
SSRC: b.BufferBase.SSRC(),
|
|
Reports: []rtcp.ReceptionReport{*receptionReport},
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Buffer) SetLastFractionLostReport(lost uint8) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
b.lastFractionLostToReport = lost
|
|
}
|
|
|
|
func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
|
|
b.Lock()
|
|
b.onRtcpFeedback = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnRtcpFeedback() func(fb []rtcp.Packet) {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onRtcpFeedback
|
|
}
|
|
|
|
func (b *Buffer) OnFinalRtpStats(fn func(*livekit.RTPStats)) {
|
|
b.Lock()
|
|
b.onFinalRtpStats = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnFinalRtpStats() func(*livekit.RTPStats) {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onFinalRtpStats
|
|
}
|
|
|
|
func (b *Buffer) OnNotifyRTX(fn func(ssrc uint32, repairSSRC uint32, rsid string)) {
|
|
b.Lock()
|
|
b.onNotifyRTX = fn
|
|
b.Unlock()
|
|
}
|
|
|
|
func (b *Buffer) getOnNotifyRTX() func(ssrc uint32, repairSSRC uint32, rsid string) {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
|
|
return b.onNotifyRTX
|
|
}
|