diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 6d41fa99f..c7ca7e30d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -27,9 +27,6 @@ type MediaTrack struct { layerSSRCs [livekit.VideoQuality_HIGH + 1]uint32 - audioLevelMu sync.RWMutex - audioLevel *AudioLevel - *MediaTrackReceiver lock sync.RWMutex @@ -121,24 +118,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra return } - if t.Kind() == livekit.TrackType_AUDIO { - t.audioLevelMu.Lock() - t.audioLevel = NewAudioLevel(t.params.AudioConfig.ActiveLevel, t.params.AudioConfig.MinPercentile, t.params.AudioConfig.UpdateInterval) - buff.OnAudioLevel(func(level uint8, duration uint32) { - t.audioLevelMu.RLock() - defer t.audioLevelMu.RUnlock() - - t.audioLevel.Observe(level, duration) - }) - t.audioLevelMu.Unlock() - } else if t.Kind() == livekit.TrackType_VIDEO { - if twcc != nil { - buff.OnTransportWideCC(func(sn uint16, timeNS int64, marker bool) { - twcc.Push(sn, timeNS, marker) - }) - } - } - rtcpReader.OnPacket(func(bytes []byte) { pkts, err := rtcp.Unmarshal(bytes) if err != nil { @@ -164,7 +143,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.PublisherID(), t.params.TrackInfo.Source, t.params.Logger, - sfu.WithPliThrottle(t.params.PLIThrottleConfig), + twcc, + sfu.WithPliThrottleConfig(t.params.PLIThrottleConfig), + sfu.WithAudioConfig(t.params.AudioConfig), sfu.WithLoadBalanceThreshold(20), sfu.WithStreamTrackers(), ) @@ -221,16 +202,6 @@ func (t *MediaTrack) TrySetSimulcastSSRC(layer uint8, ssrc uint32) { } } -func (t *MediaTrack) GetAudioLevel() (level uint8, active bool) { - t.audioLevelMu.RLock() - defer t.audioLevelMu.RUnlock() - - if t.audioLevel == nil { - return SilentAudioLevel, false - } - return t.audioLevel.GetLevel() -} - func (t *MediaTrack) GetConnectionScore() float32 { receiver := t.Receiver() if receiver == nil { diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index daceda756..44b904752 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -15,6 +15,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry" ) @@ -192,7 +193,6 @@ func (t *MediaTrackReceiver) AddOnClose(f func()) { // AddSubscriber subscribes sub to current mediaTrack func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) error { receiver := t.Receiver() - if receiver == nil { // cannot add, no receiver return errors.New("cannot subscribe without a receiver in place") @@ -304,6 +304,15 @@ func (t *MediaTrackReceiver) GetQualityForDimension(width, height uint32) liveki return quality } +func (t *MediaTrackReceiver) GetAudioLevel() (uint8, bool) { + receiver := t.Receiver() + if receiver == nil { + return audio.SilentAudioLevel, false + } + + return receiver.GetAudioLevel() +} + // handles max loss for audio streams func (t *MediaTrackReceiver) handleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) { t.downFracLostLock.Lock() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 384b4813b..c4322b819 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -23,6 +23,7 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/livekit-server/pkg/telemetry" @@ -689,7 +690,7 @@ func (p *ParticipantImpl) ICERestart() error { // func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) { - level = SilentAudioLevel + level = audio.SilentAudioLevel for _, pt := range p.GetPublishedTracks() { mediaTrack := pt.(types.LocalMediaTrack) if mediaTrack.Source() == livekit.TrackSource_MICROPHONE { @@ -1466,9 +1467,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei if p.twcc == nil { p.twcc = twcc.NewTransportWideCCResponder(ssrc) p.twcc.OnFeedback(func(pkt rtcp.RawPacket) { - if err := p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt}); err != nil { - p.params.Logger.Errorw("could not write RTCP to participant", err) - } + p.rtcpCh <- []rtcp.Packet{&pkt} }) } p.pendingTracksLock.Unlock() diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 12308cb9a..661610b63 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -16,6 +16,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" @@ -130,7 +131,7 @@ func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo { } speakers = append(speakers, &livekit.SpeakerInfo{ Sid: string(p.ID()), - Level: ConvertAudioLevel(level), + Level: audio.ConvertAudioLevel(level), Active: active, }) } @@ -768,7 +769,7 @@ func (r *Room) audioUpdateWorker() { smoothValues = make(map[livekit.ParticipantID]float32) // exponential moving average (EMA), same center of mass with simple moving average (SMA) smoothFactor = 2 / float32(ss+1) - activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel) + activeThreshold = audio.ConvertAudioLevel(r.audioConfig.ActiveLevel) } lastActiveMap := make(map[livekit.ParticipantID]*livekit.SpeakerInfo) diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index e06c9c7d0..223015ff1 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -16,6 +16,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" + "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" "github.com/livekit/livekit-server/pkg/testutils" @@ -360,7 +361,7 @@ func TestActiveSpeakers(t *testing.T) { p := participants[0].(*typesfakes.FakeLocalParticipant) op := participants[1].(*typesfakes.FakeLocalParticipant) p.GetAudioLevelReturns(30, true) - convertedLevel := rtc.ConvertAudioLevel(30) + convertedLevel := audio.ConvertAudioLevel(30) testutils.WithTimeout(t, func() string { updates := getActiveSpeakerUpdates(op) diff --git a/pkg/rtc/audiolevel.go b/pkg/sfu/audio/audiolevel.go similarity index 71% rename from pkg/rtc/audiolevel.go rename to pkg/sfu/audio/audiolevel.go index c839ede68..986113782 100644 --- a/pkg/rtc/audiolevel.go +++ b/pkg/sfu/audio/audiolevel.go @@ -1,4 +1,4 @@ -package rtc +package audio import ( "math" @@ -11,28 +11,33 @@ const ( SilentAudioLevel = 127 ) +type AudioLevelParams struct { + ActiveLevel uint8 + MinPercentile uint8 + ObserveDuration uint32 +} + // keeps track of audio level for a participant type AudioLevel struct { - levelThreshold uint8 - currentLevel *atomic.Uint32 + params AudioLevelParams + + currentLevel *atomic.Uint32 // min duration to be considered active minActiveDuration uint32 // for Observe goroutine use // keeps track of current activity - observeLevel uint8 - activeDuration uint32 // ms - observedDuration uint32 // ms - durationToObserve uint32 // ms + observeLevel uint8 + activeDuration uint32 // ms + observedDuration uint32 // ms } -func NewAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration uint32) *AudioLevel { +func NewAudioLevel(params AudioLevelParams) *AudioLevel { l := &AudioLevel{ - levelThreshold: activeLevel, - minActiveDuration: uint32(minPercentile) * observeDuration / 100, + params: params, + minActiveDuration: uint32(params.MinPercentile) * params.ObserveDuration / 100, currentLevel: atomic.NewUint32(SilentAudioLevel), observeLevel: SilentAudioLevel, - durationToObserve: observeDuration, } return l } @@ -41,17 +46,17 @@ func NewAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration uint3 func (l *AudioLevel) Observe(level uint8, durationMs uint32) { l.observedDuration += durationMs - if level <= l.levelThreshold { + if level <= l.params.ActiveLevel { l.activeDuration += durationMs if l.observeLevel > level { l.observeLevel = level } } - if l.observedDuration >= l.durationToObserve { + if l.observedDuration >= l.params.ObserveDuration { // compute and reset if l.activeDuration >= l.minActiveDuration { - level := uint32(l.observeLevel) - uint32(20*math.Log10(float64(l.activeDuration)/float64(l.durationToObserve))) + level := uint32(l.observeLevel) - uint32(20*math.Log10(float64(l.activeDuration)/float64(l.params.ObserveDuration))) l.currentLevel.Store(level) } else { l.currentLevel.Store(SilentAudioLevel) diff --git a/pkg/rtc/audiolevel_test.go b/pkg/sfu/audio/audiolevel_test.go similarity index 66% rename from pkg/rtc/audiolevel_test.go rename to pkg/sfu/audio/audiolevel_test.go index aa56711a7..8c6da378e 100644 --- a/pkg/rtc/audiolevel_test.go +++ b/pkg/sfu/audio/audiolevel_test.go @@ -1,11 +1,9 @@ -package rtc_test +package audio import ( "testing" "github.com/stretchr/testify/require" - - "github.com/livekit/livekit-server/pkg/rtc" ) const ( @@ -18,7 +16,7 @@ const ( func TestAudioLevel(t *testing.T) { t.Run("initially to return not noisy, within a few samples", func(t *testing.T) { - a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) + a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) _, noisy := a.GetLevel() require.False(t, noisy) @@ -28,7 +26,7 @@ func TestAudioLevel(t *testing.T) { }) t.Run("not noisy when all samples are below threshold", func(t *testing.T) { - a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) + a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) observeSamples(a, 35, 100) _, noisy := a.GetLevel() @@ -36,7 +34,7 @@ func TestAudioLevel(t *testing.T) { }) t.Run("not noisy when less than percentile samples are above threshold", func(t *testing.T) { - a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) + a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) observeSamples(a, 35, samplesPerBatch-2) observeSamples(a, 25, 1) @@ -47,7 +45,7 @@ func TestAudioLevel(t *testing.T) { }) t.Run("noisy when higher than percentile samples are above threshold", func(t *testing.T) { - a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) + a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) observeSamples(a, 35, samplesPerBatch-16) observeSamples(a, 25, 8) @@ -60,7 +58,15 @@ func TestAudioLevel(t *testing.T) { }) } -func observeSamples(a *rtc.AudioLevel, level uint8, count int) { +func createAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration uint32) *AudioLevel { + return NewAudioLevel(AudioLevelParams{ + ActiveLevel: activeLevel, + MinPercentile: minPercentile, + ObserveDuration: observeDuration, + }) +} + +func observeSamples(a *AudioLevel, level uint8, count int) { for i := 0; i < count; i++ { a.Observe(level, 20) } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index e5384a13a..a8b0f088f 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -18,6 +18,8 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/livekit-server/pkg/sfu/audio" + "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/livekit-server/pkg/utils" ) @@ -25,17 +27,6 @@ const ( ReportDelta = 1e9 ) -type twccMetadata struct { - sn uint16 - arrivalTime int64 - marker bool -} - -type audioLevelMetadata struct { - level uint8 - duration uint32 -} - type pendingPacket struct { arrivalTime int64 packet []byte @@ -55,33 +46,30 @@ type ExtPacket struct { // Buffer contains all packets type Buffer struct { sync.RWMutex - bucket *Bucket - nacker *NackQueue - videoPool *sync.Pool - audioPool *sync.Pool - codecType webrtc.RTPCodecType - extPackets deque.Deque - pPackets []pendingPacket - closeOnce sync.Once - mediaSSRC uint32 - clockRate uint32 - lastReport int64 - twccExt uint8 - audioExt uint8 - bound bool - closed atomic.Bool - mime string + bucket *Bucket + nacker *NackQueue + videoPool *sync.Pool + audioPool *sync.Pool + codecType webrtc.RTPCodecType + extPackets deque.Deque + pPackets []pendingPacket + closeOnce sync.Once + mediaSSRC uint32 + clockRate uint32 + lastReport int64 + twccExt uint8 + audioLevelExt uint8 + bound bool + closed atomic.Bool + mime string // supported feedbacks - remb bool - nack bool - twcc bool - audioLevel bool latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 - twccPending []twccMetadata - audioLevelPending []audioLevelMetadata + twcc *twcc.Responder + audioLevelParams audio.AudioLevelParams + audioLevel *audio.AudioLevel lastPacketRead int @@ -95,10 +83,8 @@ type Buffer struct { lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only // callbacks - onClose func() - onAudioLevel func(level uint8, durationMs uint32) - feedbackCB func([]rtcp.Packet) - feedbackTWCC func(sn uint16, timeNS int64, marker bool) + onClose func() + feedbackCB func([]rtcp.Packet) callbacksQueue *utils.OpsQueue @@ -137,6 +123,20 @@ func (b *Buffer) SetLogger(logger logger.Logger) { } } +func (b *Buffer) SetTWCC(twcc *twcc.Responder) { + b.Lock() + defer b.Unlock() + + b.twcc = twcc +} + +func (b *Buffer) SetAudioLevelParams(audioLevelParams audio.AudioLevelParams) { + b.Lock() + defer b.Unlock() + + b.audioLevelParams = audioLevelParams +} + func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability) { b.Lock() defer b.Unlock() @@ -169,35 +169,31 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.codecType = webrtc.RTPCodecType(0) } - for _, ext := range params.HeaderExtensions { - if ext.URI == sdp.TransportCCURI { - b.twccExt = uint8(ext.ID) - break - } - } - if b.codecType == webrtc.RTPCodecTypeVideo { for _, fb := range codec.RTCPFeedback { switch fb.Type { case webrtc.TypeRTCPFBGoogREMB: b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBGoogREMB) b.logger.Warnw("REMB not supported, RTCP feedback will not be generated", nil) - b.remb = true case webrtc.TypeRTCPFBTransportCC: b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBTransportCC) - b.twcc = true + for _, ext := range params.HeaderExtensions { + if ext.URI == sdp.TransportCCURI { + b.twccExt = uint8(ext.ID) + break + } + } case webrtc.TypeRTCPFBNACK: b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBNACK) b.nacker = NewNACKQueue() b.nacker.SetRTT(70) // default till it is updated - b.nack = true } } } else if b.codecType == webrtc.RTPCodecTypeAudio { for _, h := range params.HeaderExtensions { if h.URI == sdp.AudioLevelURI { - b.audioLevel = true - b.audioExt = uint8(h.ID) + b.audioLevelExt = uint8(h.ID) + b.audioLevel = audio.NewAudioLevel(b.audioLevelParams) } } } @@ -408,37 +404,24 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowStat func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { // submit to TWCC even if it is a padding only packet. Clients use padding only packets as probes // for bandwidth estimation - if b.twcc { - if ext := p.GetExtension(b.twccExt); len(ext) > 1 { - b.twccPending = append(b.twccPending, twccMetadata{ - sn: binary.BigEndian.Uint16(ext[0:2]), - arrivalTime: arrivalTime, - marker: p.Marker, - }) - if len(b.twccPending) == 1 { - b.callbacksQueue.Enqueue(b.processTWCCPending) - } + if b.twcc != nil && b.twccExt != 0 { + if ext := p.GetExtension(b.twccExt); ext != nil { + b.twcc.Push(binary.BigEndian.Uint16(ext[0:2]), arrivalTime, p.Marker) } } - if b.audioLevel { + if b.audioLevelExt != 0 { if !b.latestTSForAudioLevelInitialized { b.latestTSForAudioLevelInitialized = true b.latestTSForAudioLevel = p.Timestamp } - if e := p.GetExtension(b.audioExt); e != nil && b.onAudioLevel != nil { + if e := p.GetExtension(b.audioLevelExt); e != nil { ext := rtp.AudioLevelExtension{} if err := ext.Unmarshal(e); err == nil { if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) { duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate) if duration > 0 { - b.audioLevelPending = append(b.audioLevelPending, audioLevelMetadata{ - level: ext.Level, - duration: uint32(duration), - }) - if len(b.audioLevelPending) == 1 { - b.callbacksQueue.Enqueue(b.processAudioLevelPending) - } + b.audioLevel.Observe(ext.Level, uint32(duration)) } b.latestTSForAudioLevel = p.Timestamp @@ -583,18 +566,10 @@ func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) { return b.bucket.GetPacket(buff, sn) } -func (b *Buffer) OnTransportWideCC(fn func(sn uint16, timeNS int64, marker bool)) { - b.feedbackTWCC = fn -} - func (b *Buffer) OnFeedback(fn func(fb []rtcp.Packet)) { b.feedbackCB = fn } -func (b *Buffer) OnAudioLevel(fn func(level uint8, durationMs uint32)) { - b.onAudioLevel = fn -} - // GetMediaSSRC returns the associated SSRC of the RTP stream func (b *Buffer) GetMediaSSRC() uint32 { return b.mediaSSRC @@ -653,28 +628,13 @@ func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers { } } -func (b *Buffer) processTWCCPending() { - b.Lock() - pending := b.twccPending - b.twccPending = nil - b.Unlock() +func (b *Buffer) GetAudioLevel() (uint8, bool) { + b.RLock() + defer b.RUnlock() - if b.feedbackTWCC != nil { - for _, p := range pending { - b.feedbackTWCC(p.sn, p.arrivalTime, p.marker) - } + if b.audioLevel == nil { + return audio.SilentAudioLevel, false } -} -func (b *Buffer) processAudioLevelPending() { - b.Lock() - pending := b.audioLevelPending - b.audioLevelPending = nil - b.Unlock() - - if b.onAudioLevel != nil { - for _, p := range pending { - b.onAudioLevel(p.level, p.duration) - } - } + return b.audioLevel.GetLevel() } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index cb8e08981..798b74cab 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -17,8 +17,10 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/sfu/audio" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" + "github.com/livekit/livekit-server/pkg/sfu/twcc" ) var ( @@ -38,6 +40,8 @@ type TrackReceiver interface { ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) GetBitrateTemporalCumulative() Bitrates + GetAudioLevel() (uint8, bool) + SendPLI(layer int32) SetUpTrackPaused(paused bool) @@ -54,6 +58,7 @@ type WebRTCReceiver struct { logger logger.Logger pliThrottleConfig config.PLIThrottleConfig + audioConfig config.AudioConfig peerID livekit.ParticipantID trackID livekit.TrackID @@ -69,6 +74,8 @@ type WebRTCReceiver struct { rtcpCh chan []rtcp.Packet + twcc *twcc.Responder + bufferMu sync.RWMutex buffers [DefaultMaxLayerSpatial + 1]*buffer.Buffer rtt uint32 @@ -104,14 +111,22 @@ func RidToLayer(rid string) int32 { type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver -// WithPliThrottle indicates minimum time(ms) between sending PLIs -func WithPliThrottle(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts { +// WithPliThrottleConfig indicates minimum time(ms) between sending PLIs +func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts { return func(w *WebRTCReceiver) *WebRTCReceiver { w.pliThrottleConfig = pliThrottleConfig return w } } +// WithAudioConfig sets up parameters for active speaker detection +func WithAudioConfig(audioConfig config.AudioConfig) ReceiverOpts { + return func(w *WebRTCReceiver) *WebRTCReceiver { + w.audioConfig = audioConfig + return w + } +} + // WithStreamTrackers enables StreamTracker use for simulcast func WithStreamTrackers() ReceiverOpts { return func(w *WebRTCReceiver) *WebRTCReceiver { @@ -139,6 +154,7 @@ func NewWebRTCReceiver( pid livekit.ParticipantID, source livekit.TrackSource, logger logger.Logger, + twcc *twcc.Responder, opts ...ReceiverOpts, ) *WebRTCReceiver { w := &WebRTCReceiver{ @@ -151,6 +167,7 @@ func NewWebRTCReceiver( kind: track.Kind(), // LK-TODO: this should be based on VideoLayers protocol message rather than RID based isSimulcast: len(track.RID()) > 0, + twcc: twcc, downTracks: make([]TrackSender, 0), index: make(map[livekit.ParticipantID]int), free: make(map[int]struct{}), @@ -253,6 +270,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff layer := RidToLayer(track.RID()) buff.SetLogger(logger.Logger(logr.Logger(w.logger).WithValues("layer", layer))) + buff.SetTWCC(w.twcc) + buff.SetAudioLevelParams(audio.AudioLevelParams{ + ActiveLevel: w.audioConfig.ActiveLevel, + MinPercentile: w.audioConfig.MinPercentile, + ObserveDuration: w.audioConfig.UpdateInterval, + }) buff.OnFeedback(w.sendRTCP) var duration time.Duration @@ -427,6 +450,25 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats { return buffer.AggregateRTPStats(stats) } +func (w *WebRTCReceiver) GetAudioLevel() (uint8, bool) { + if w.Kind() == webrtc.RTPCodecTypeVideo { + return audio.SilentAudioLevel, false + } + + w.bufferMu.RLock() + defer w.bufferMu.RUnlock() + + for _, buff := range w.buffers { + if buff == nil { + continue + } + + return buff.GetAudioLevel() + } + + return audio.SilentAudioLevel, false +} + func (w *WebRTCReceiver) getQualityParams() *buffer.ConnectionQualityParams { w.bufferMu.RLock() defer w.bufferMu.RUnlock()