mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 20:35:27 +00:00
Process header extensions in line (#635)
* WIP commit * Pass audio config * Fix test compile
This commit is contained in:
+3
-32
@@ -27,9 +27,6 @@ type MediaTrack struct {
|
||||
|
||||
layerSSRCs [livekit.VideoQuality_HIGH + 1]uint32
|
||||
|
||||
audioLevelMu sync.RWMutex
|
||||
audioLevel *AudioLevel
|
||||
|
||||
*MediaTrackReceiver
|
||||
|
||||
lock sync.RWMutex
|
||||
@@ -121,24 +118,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
return
|
||||
}
|
||||
|
||||
if t.Kind() == livekit.TrackType_AUDIO {
|
||||
t.audioLevelMu.Lock()
|
||||
t.audioLevel = NewAudioLevel(t.params.AudioConfig.ActiveLevel, t.params.AudioConfig.MinPercentile, t.params.AudioConfig.UpdateInterval)
|
||||
buff.OnAudioLevel(func(level uint8, duration uint32) {
|
||||
t.audioLevelMu.RLock()
|
||||
defer t.audioLevelMu.RUnlock()
|
||||
|
||||
t.audioLevel.Observe(level, duration)
|
||||
})
|
||||
t.audioLevelMu.Unlock()
|
||||
} else if t.Kind() == livekit.TrackType_VIDEO {
|
||||
if twcc != nil {
|
||||
buff.OnTransportWideCC(func(sn uint16, timeNS int64, marker bool) {
|
||||
twcc.Push(sn, timeNS, marker)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
rtcpReader.OnPacket(func(bytes []byte) {
|
||||
pkts, err := rtcp.Unmarshal(bytes)
|
||||
if err != nil {
|
||||
@@ -164,7 +143,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
t.PublisherID(),
|
||||
t.params.TrackInfo.Source,
|
||||
t.params.Logger,
|
||||
sfu.WithPliThrottle(t.params.PLIThrottleConfig),
|
||||
twcc,
|
||||
sfu.WithPliThrottleConfig(t.params.PLIThrottleConfig),
|
||||
sfu.WithAudioConfig(t.params.AudioConfig),
|
||||
sfu.WithLoadBalanceThreshold(20),
|
||||
sfu.WithStreamTrackers(),
|
||||
)
|
||||
@@ -221,16 +202,6 @@ func (t *MediaTrack) TrySetSimulcastSSRC(layer uint8, ssrc uint32) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *MediaTrack) GetAudioLevel() (level uint8, active bool) {
|
||||
t.audioLevelMu.RLock()
|
||||
defer t.audioLevelMu.RUnlock()
|
||||
|
||||
if t.audioLevel == nil {
|
||||
return SilentAudioLevel, false
|
||||
}
|
||||
return t.audioLevel.GetLevel()
|
||||
}
|
||||
|
||||
func (t *MediaTrack) GetConnectionScore() float32 {
|
||||
receiver := t.Receiver()
|
||||
if receiver == nil {
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
)
|
||||
@@ -192,7 +193,6 @@ func (t *MediaTrackReceiver) AddOnClose(f func()) {
|
||||
// AddSubscriber subscribes sub to current mediaTrack
|
||||
func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) error {
|
||||
receiver := t.Receiver()
|
||||
|
||||
if receiver == nil {
|
||||
// cannot add, no receiver
|
||||
return errors.New("cannot subscribe without a receiver in place")
|
||||
@@ -304,6 +304,15 @@ func (t *MediaTrackReceiver) GetQualityForDimension(width, height uint32) liveki
|
||||
return quality
|
||||
}
|
||||
|
||||
func (t *MediaTrackReceiver) GetAudioLevel() (uint8, bool) {
|
||||
receiver := t.Receiver()
|
||||
if receiver == nil {
|
||||
return audio.SilentAudioLevel, false
|
||||
}
|
||||
|
||||
return receiver.GetAudioLevel()
|
||||
}
|
||||
|
||||
// handles max loss for audio streams
|
||||
func (t *MediaTrackReceiver) handleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) {
|
||||
t.downFracLostLock.Lock()
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/twcc"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
@@ -689,7 +690,7 @@ func (p *ParticipantImpl) ICERestart() error {
|
||||
//
|
||||
|
||||
func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) {
|
||||
level = SilentAudioLevel
|
||||
level = audio.SilentAudioLevel
|
||||
for _, pt := range p.GetPublishedTracks() {
|
||||
mediaTrack := pt.(types.LocalMediaTrack)
|
||||
if mediaTrack.Source() == livekit.TrackSource_MICROPHONE {
|
||||
@@ -1466,9 +1467,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
|
||||
if p.twcc == nil {
|
||||
p.twcc = twcc.NewTransportWideCCResponder(ssrc)
|
||||
p.twcc.OnFeedback(func(pkt rtcp.RawPacket) {
|
||||
if err := p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt}); err != nil {
|
||||
p.params.Logger.Errorw("could not write RTCP to participant", err)
|
||||
}
|
||||
p.rtcpCh <- []rtcp.Packet{&pkt}
|
||||
})
|
||||
}
|
||||
p.pendingTracksLock.Unlock()
|
||||
|
||||
+3
-2
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
@@ -130,7 +131,7 @@ func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo {
|
||||
}
|
||||
speakers = append(speakers, &livekit.SpeakerInfo{
|
||||
Sid: string(p.ID()),
|
||||
Level: ConvertAudioLevel(level),
|
||||
Level: audio.ConvertAudioLevel(level),
|
||||
Active: active,
|
||||
})
|
||||
}
|
||||
@@ -768,7 +769,7 @@ func (r *Room) audioUpdateWorker() {
|
||||
smoothValues = make(map[livekit.ParticipantID]float32)
|
||||
// exponential moving average (EMA), same center of mass with simple moving average (SMA)
|
||||
smoothFactor = 2 / float32(ss+1)
|
||||
activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel)
|
||||
activeThreshold = audio.ConvertAudioLevel(r.audioConfig.ActiveLevel)
|
||||
}
|
||||
|
||||
lastActiveMap := make(map[livekit.ParticipantID]*livekit.SpeakerInfo)
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes"
|
||||
"github.com/livekit/livekit-server/pkg/testutils"
|
||||
@@ -360,7 +361,7 @@ func TestActiveSpeakers(t *testing.T) {
|
||||
p := participants[0].(*typesfakes.FakeLocalParticipant)
|
||||
op := participants[1].(*typesfakes.FakeLocalParticipant)
|
||||
p.GetAudioLevelReturns(30, true)
|
||||
convertedLevel := rtc.ConvertAudioLevel(30)
|
||||
convertedLevel := audio.ConvertAudioLevel(30)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
updates := getActiveSpeakerUpdates(op)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package rtc
|
||||
package audio
|
||||
|
||||
import (
|
||||
"math"
|
||||
@@ -11,28 +11,33 @@ const (
|
||||
SilentAudioLevel = 127
|
||||
)
|
||||
|
||||
type AudioLevelParams struct {
|
||||
ActiveLevel uint8
|
||||
MinPercentile uint8
|
||||
ObserveDuration uint32
|
||||
}
|
||||
|
||||
// keeps track of audio level for a participant
|
||||
type AudioLevel struct {
|
||||
levelThreshold uint8
|
||||
currentLevel *atomic.Uint32
|
||||
params AudioLevelParams
|
||||
|
||||
currentLevel *atomic.Uint32
|
||||
// min duration to be considered active
|
||||
minActiveDuration uint32
|
||||
|
||||
// for Observe goroutine use
|
||||
// keeps track of current activity
|
||||
observeLevel uint8
|
||||
activeDuration uint32 // ms
|
||||
observedDuration uint32 // ms
|
||||
durationToObserve uint32 // ms
|
||||
observeLevel uint8
|
||||
activeDuration uint32 // ms
|
||||
observedDuration uint32 // ms
|
||||
}
|
||||
|
||||
func NewAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration uint32) *AudioLevel {
|
||||
func NewAudioLevel(params AudioLevelParams) *AudioLevel {
|
||||
l := &AudioLevel{
|
||||
levelThreshold: activeLevel,
|
||||
minActiveDuration: uint32(minPercentile) * observeDuration / 100,
|
||||
params: params,
|
||||
minActiveDuration: uint32(params.MinPercentile) * params.ObserveDuration / 100,
|
||||
currentLevel: atomic.NewUint32(SilentAudioLevel),
|
||||
observeLevel: SilentAudioLevel,
|
||||
durationToObserve: observeDuration,
|
||||
}
|
||||
return l
|
||||
}
|
||||
@@ -41,17 +46,17 @@ func NewAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration uint3
|
||||
func (l *AudioLevel) Observe(level uint8, durationMs uint32) {
|
||||
l.observedDuration += durationMs
|
||||
|
||||
if level <= l.levelThreshold {
|
||||
if level <= l.params.ActiveLevel {
|
||||
l.activeDuration += durationMs
|
||||
if l.observeLevel > level {
|
||||
l.observeLevel = level
|
||||
}
|
||||
}
|
||||
|
||||
if l.observedDuration >= l.durationToObserve {
|
||||
if l.observedDuration >= l.params.ObserveDuration {
|
||||
// compute and reset
|
||||
if l.activeDuration >= l.minActiveDuration {
|
||||
level := uint32(l.observeLevel) - uint32(20*math.Log10(float64(l.activeDuration)/float64(l.durationToObserve)))
|
||||
level := uint32(l.observeLevel) - uint32(20*math.Log10(float64(l.activeDuration)/float64(l.params.ObserveDuration)))
|
||||
l.currentLevel.Store(level)
|
||||
} else {
|
||||
l.currentLevel.Store(SilentAudioLevel)
|
||||
@@ -1,11 +1,9 @@
|
||||
package rtc_test
|
||||
package audio
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -18,7 +16,7 @@ const (
|
||||
|
||||
func TestAudioLevel(t *testing.T) {
|
||||
t.Run("initially to return not noisy, within a few samples", func(t *testing.T) {
|
||||
a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
_, noisy := a.GetLevel()
|
||||
require.False(t, noisy)
|
||||
|
||||
@@ -28,7 +26,7 @@ func TestAudioLevel(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("not noisy when all samples are below threshold", func(t *testing.T) {
|
||||
a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
|
||||
observeSamples(a, 35, 100)
|
||||
_, noisy := a.GetLevel()
|
||||
@@ -36,7 +34,7 @@ func TestAudioLevel(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("not noisy when less than percentile samples are above threshold", func(t *testing.T) {
|
||||
a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
|
||||
observeSamples(a, 35, samplesPerBatch-2)
|
||||
observeSamples(a, 25, 1)
|
||||
@@ -47,7 +45,7 @@ func TestAudioLevel(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("noisy when higher than percentile samples are above threshold", func(t *testing.T) {
|
||||
a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
|
||||
|
||||
observeSamples(a, 35, samplesPerBatch-16)
|
||||
observeSamples(a, 25, 8)
|
||||
@@ -60,7 +58,15 @@ func TestAudioLevel(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func observeSamples(a *rtc.AudioLevel, level uint8, count int) {
|
||||
func createAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration uint32) *AudioLevel {
|
||||
return NewAudioLevel(AudioLevelParams{
|
||||
ActiveLevel: activeLevel,
|
||||
MinPercentile: minPercentile,
|
||||
ObserveDuration: observeDuration,
|
||||
})
|
||||
}
|
||||
|
||||
func observeSamples(a *AudioLevel, level uint8, count int) {
|
||||
for i := 0; i < count; i++ {
|
||||
a.Observe(level, 20)
|
||||
}
|
||||
+57
-97
@@ -18,6 +18,8 @@ import (
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/twcc"
|
||||
"github.com/livekit/livekit-server/pkg/utils"
|
||||
)
|
||||
|
||||
@@ -25,17 +27,6 @@ const (
|
||||
ReportDelta = 1e9
|
||||
)
|
||||
|
||||
type twccMetadata struct {
|
||||
sn uint16
|
||||
arrivalTime int64
|
||||
marker bool
|
||||
}
|
||||
|
||||
type audioLevelMetadata struct {
|
||||
level uint8
|
||||
duration uint32
|
||||
}
|
||||
|
||||
type pendingPacket struct {
|
||||
arrivalTime int64
|
||||
packet []byte
|
||||
@@ -55,33 +46,30 @@ type ExtPacket struct {
|
||||
// Buffer contains all packets
|
||||
type Buffer struct {
|
||||
sync.RWMutex
|
||||
bucket *Bucket
|
||||
nacker *NackQueue
|
||||
videoPool *sync.Pool
|
||||
audioPool *sync.Pool
|
||||
codecType webrtc.RTPCodecType
|
||||
extPackets deque.Deque
|
||||
pPackets []pendingPacket
|
||||
closeOnce sync.Once
|
||||
mediaSSRC uint32
|
||||
clockRate uint32
|
||||
lastReport int64
|
||||
twccExt uint8
|
||||
audioExt uint8
|
||||
bound bool
|
||||
closed atomic.Bool
|
||||
mime string
|
||||
bucket *Bucket
|
||||
nacker *NackQueue
|
||||
videoPool *sync.Pool
|
||||
audioPool *sync.Pool
|
||||
codecType webrtc.RTPCodecType
|
||||
extPackets deque.Deque
|
||||
pPackets []pendingPacket
|
||||
closeOnce sync.Once
|
||||
mediaSSRC uint32
|
||||
clockRate uint32
|
||||
lastReport int64
|
||||
twccExt uint8
|
||||
audioLevelExt uint8
|
||||
bound bool
|
||||
closed atomic.Bool
|
||||
mime string
|
||||
|
||||
// supported feedbacks
|
||||
remb bool
|
||||
nack bool
|
||||
twcc bool
|
||||
audioLevel bool
|
||||
latestTSForAudioLevelInitialized bool
|
||||
latestTSForAudioLevel uint32
|
||||
|
||||
twccPending []twccMetadata
|
||||
audioLevelPending []audioLevelMetadata
|
||||
twcc *twcc.Responder
|
||||
audioLevelParams audio.AudioLevelParams
|
||||
audioLevel *audio.AudioLevel
|
||||
|
||||
lastPacketRead int
|
||||
|
||||
@@ -95,10 +83,8 @@ type Buffer struct {
|
||||
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
|
||||
|
||||
// callbacks
|
||||
onClose func()
|
||||
onAudioLevel func(level uint8, durationMs uint32)
|
||||
feedbackCB func([]rtcp.Packet)
|
||||
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
|
||||
onClose func()
|
||||
feedbackCB func([]rtcp.Packet)
|
||||
|
||||
callbacksQueue *utils.OpsQueue
|
||||
|
||||
@@ -137,6 +123,20 @@ func (b *Buffer) SetLogger(logger logger.Logger) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) SetTWCC(twcc *twcc.Responder) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
b.twcc = twcc
|
||||
}
|
||||
|
||||
func (b *Buffer) SetAudioLevelParams(audioLevelParams audio.AudioLevelParams) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
b.audioLevelParams = audioLevelParams
|
||||
}
|
||||
|
||||
func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
@@ -169,35 +169,31 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
|
||||
b.codecType = webrtc.RTPCodecType(0)
|
||||
}
|
||||
|
||||
for _, ext := range params.HeaderExtensions {
|
||||
if ext.URI == sdp.TransportCCURI {
|
||||
b.twccExt = uint8(ext.ID)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if b.codecType == webrtc.RTPCodecTypeVideo {
|
||||
for _, fb := range codec.RTCPFeedback {
|
||||
switch fb.Type {
|
||||
case webrtc.TypeRTCPFBGoogREMB:
|
||||
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBGoogREMB)
|
||||
b.logger.Warnw("REMB not supported, RTCP feedback will not be generated", nil)
|
||||
b.remb = true
|
||||
case webrtc.TypeRTCPFBTransportCC:
|
||||
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBTransportCC)
|
||||
b.twcc = true
|
||||
for _, ext := range params.HeaderExtensions {
|
||||
if ext.URI == sdp.TransportCCURI {
|
||||
b.twccExt = uint8(ext.ID)
|
||||
break
|
||||
}
|
||||
}
|
||||
case webrtc.TypeRTCPFBNACK:
|
||||
b.logger.Debugw("Setting feedback", "type", webrtc.TypeRTCPFBNACK)
|
||||
b.nacker = NewNACKQueue()
|
||||
b.nacker.SetRTT(70) // default till it is updated
|
||||
b.nack = true
|
||||
}
|
||||
}
|
||||
} else if b.codecType == webrtc.RTPCodecTypeAudio {
|
||||
for _, h := range params.HeaderExtensions {
|
||||
if h.URI == sdp.AudioLevelURI {
|
||||
b.audioLevel = true
|
||||
b.audioExt = uint8(h.ID)
|
||||
b.audioLevelExt = uint8(h.ID)
|
||||
b.audioLevel = audio.NewAudioLevel(b.audioLevelParams)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -408,37 +404,24 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowStat
|
||||
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
|
||||
// submit to TWCC even if it is a padding only packet. Clients use padding only packets as probes
|
||||
// for bandwidth estimation
|
||||
if b.twcc {
|
||||
if ext := p.GetExtension(b.twccExt); len(ext) > 1 {
|
||||
b.twccPending = append(b.twccPending, twccMetadata{
|
||||
sn: binary.BigEndian.Uint16(ext[0:2]),
|
||||
arrivalTime: arrivalTime,
|
||||
marker: p.Marker,
|
||||
})
|
||||
if len(b.twccPending) == 1 {
|
||||
b.callbacksQueue.Enqueue(b.processTWCCPending)
|
||||
}
|
||||
if b.twcc != nil && b.twccExt != 0 {
|
||||
if ext := p.GetExtension(b.twccExt); ext != nil {
|
||||
b.twcc.Push(binary.BigEndian.Uint16(ext[0:2]), arrivalTime, p.Marker)
|
||||
}
|
||||
}
|
||||
|
||||
if b.audioLevel {
|
||||
if b.audioLevelExt != 0 {
|
||||
if !b.latestTSForAudioLevelInitialized {
|
||||
b.latestTSForAudioLevelInitialized = true
|
||||
b.latestTSForAudioLevel = p.Timestamp
|
||||
}
|
||||
if e := p.GetExtension(b.audioExt); e != nil && b.onAudioLevel != nil {
|
||||
if e := p.GetExtension(b.audioLevelExt); e != nil {
|
||||
ext := rtp.AudioLevelExtension{}
|
||||
if err := ext.Unmarshal(e); err == nil {
|
||||
if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) {
|
||||
duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate)
|
||||
if duration > 0 {
|
||||
b.audioLevelPending = append(b.audioLevelPending, audioLevelMetadata{
|
||||
level: ext.Level,
|
||||
duration: uint32(duration),
|
||||
})
|
||||
if len(b.audioLevelPending) == 1 {
|
||||
b.callbacksQueue.Enqueue(b.processAudioLevelPending)
|
||||
}
|
||||
b.audioLevel.Observe(ext.Level, uint32(duration))
|
||||
}
|
||||
|
||||
b.latestTSForAudioLevel = p.Timestamp
|
||||
@@ -583,18 +566,10 @@ func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) {
|
||||
return b.bucket.GetPacket(buff, sn)
|
||||
}
|
||||
|
||||
func (b *Buffer) OnTransportWideCC(fn func(sn uint16, timeNS int64, marker bool)) {
|
||||
b.feedbackTWCC = fn
|
||||
}
|
||||
|
||||
func (b *Buffer) OnFeedback(fn func(fb []rtcp.Packet)) {
|
||||
b.feedbackCB = fn
|
||||
}
|
||||
|
||||
func (b *Buffer) OnAudioLevel(fn func(level uint8, durationMs uint32)) {
|
||||
b.onAudioLevel = fn
|
||||
}
|
||||
|
||||
// GetMediaSSRC returns the associated SSRC of the RTP stream
|
||||
func (b *Buffer) GetMediaSSRC() uint32 {
|
||||
return b.mediaSSRC
|
||||
@@ -653,28 +628,13 @@ func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) processTWCCPending() {
|
||||
b.Lock()
|
||||
pending := b.twccPending
|
||||
b.twccPending = nil
|
||||
b.Unlock()
|
||||
func (b *Buffer) GetAudioLevel() (uint8, bool) {
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
|
||||
if b.feedbackTWCC != nil {
|
||||
for _, p := range pending {
|
||||
b.feedbackTWCC(p.sn, p.arrivalTime, p.marker)
|
||||
}
|
||||
if b.audioLevel == nil {
|
||||
return audio.SilentAudioLevel, false
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) processAudioLevelPending() {
|
||||
b.Lock()
|
||||
pending := b.audioLevelPending
|
||||
b.audioLevelPending = nil
|
||||
b.Unlock()
|
||||
|
||||
if b.onAudioLevel != nil {
|
||||
for _, p := range pending {
|
||||
b.onAudioLevel(p.level, p.duration)
|
||||
}
|
||||
}
|
||||
return b.audioLevel.GetLevel()
|
||||
}
|
||||
|
||||
+44
-2
@@ -17,8 +17,10 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/twcc"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -38,6 +40,8 @@ type TrackReceiver interface {
|
||||
ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
|
||||
GetBitrateTemporalCumulative() Bitrates
|
||||
|
||||
GetAudioLevel() (uint8, bool)
|
||||
|
||||
SendPLI(layer int32)
|
||||
|
||||
SetUpTrackPaused(paused bool)
|
||||
@@ -54,6 +58,7 @@ type WebRTCReceiver struct {
|
||||
logger logger.Logger
|
||||
|
||||
pliThrottleConfig config.PLIThrottleConfig
|
||||
audioConfig config.AudioConfig
|
||||
|
||||
peerID livekit.ParticipantID
|
||||
trackID livekit.TrackID
|
||||
@@ -69,6 +74,8 @@ type WebRTCReceiver struct {
|
||||
|
||||
rtcpCh chan []rtcp.Packet
|
||||
|
||||
twcc *twcc.Responder
|
||||
|
||||
bufferMu sync.RWMutex
|
||||
buffers [DefaultMaxLayerSpatial + 1]*buffer.Buffer
|
||||
rtt uint32
|
||||
@@ -104,14 +111,22 @@ func RidToLayer(rid string) int32 {
|
||||
|
||||
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
|
||||
|
||||
// WithPliThrottle indicates minimum time(ms) between sending PLIs
|
||||
func WithPliThrottle(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts {
|
||||
// WithPliThrottleConfig indicates minimum time(ms) between sending PLIs
|
||||
func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts {
|
||||
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
||||
w.pliThrottleConfig = pliThrottleConfig
|
||||
return w
|
||||
}
|
||||
}
|
||||
|
||||
// WithAudioConfig sets up parameters for active speaker detection
|
||||
func WithAudioConfig(audioConfig config.AudioConfig) ReceiverOpts {
|
||||
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
||||
w.audioConfig = audioConfig
|
||||
return w
|
||||
}
|
||||
}
|
||||
|
||||
// WithStreamTrackers enables StreamTracker use for simulcast
|
||||
func WithStreamTrackers() ReceiverOpts {
|
||||
return func(w *WebRTCReceiver) *WebRTCReceiver {
|
||||
@@ -139,6 +154,7 @@ func NewWebRTCReceiver(
|
||||
pid livekit.ParticipantID,
|
||||
source livekit.TrackSource,
|
||||
logger logger.Logger,
|
||||
twcc *twcc.Responder,
|
||||
opts ...ReceiverOpts,
|
||||
) *WebRTCReceiver {
|
||||
w := &WebRTCReceiver{
|
||||
@@ -151,6 +167,7 @@ func NewWebRTCReceiver(
|
||||
kind: track.Kind(),
|
||||
// LK-TODO: this should be based on VideoLayers protocol message rather than RID based
|
||||
isSimulcast: len(track.RID()) > 0,
|
||||
twcc: twcc,
|
||||
downTracks: make([]TrackSender, 0),
|
||||
index: make(map[livekit.ParticipantID]int),
|
||||
free: make(map[int]struct{}),
|
||||
@@ -253,6 +270,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
|
||||
|
||||
layer := RidToLayer(track.RID())
|
||||
buff.SetLogger(logger.Logger(logr.Logger(w.logger).WithValues("layer", layer)))
|
||||
buff.SetTWCC(w.twcc)
|
||||
buff.SetAudioLevelParams(audio.AudioLevelParams{
|
||||
ActiveLevel: w.audioConfig.ActiveLevel,
|
||||
MinPercentile: w.audioConfig.MinPercentile,
|
||||
ObserveDuration: w.audioConfig.UpdateInterval,
|
||||
})
|
||||
buff.OnFeedback(w.sendRTCP)
|
||||
|
||||
var duration time.Duration
|
||||
@@ -427,6 +450,25 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
|
||||
return buffer.AggregateRTPStats(stats)
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) GetAudioLevel() (uint8, bool) {
|
||||
if w.Kind() == webrtc.RTPCodecTypeVideo {
|
||||
return audio.SilentAudioLevel, false
|
||||
}
|
||||
|
||||
w.bufferMu.RLock()
|
||||
defer w.bufferMu.RUnlock()
|
||||
|
||||
for _, buff := range w.buffers {
|
||||
if buff == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
return buff.GetAudioLevel()
|
||||
}
|
||||
|
||||
return audio.SilentAudioLevel, false
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) getQualityParams() *buffer.ConnectionQualityParams {
|
||||
w.bufferMu.RLock()
|
||||
defer w.bufferMu.RUnlock()
|
||||
|
||||
Reference in New Issue
Block a user