diff --git a/config-sample.yaml b/config-sample.yaml index ab97753a4..17337d0cc 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -76,8 +76,8 @@ keys: # customize audio level sensitivity #audio: # # minimum level to be considered active, 0-127, where 0 is loudest -# # defaults to 40 -# active_level: 40 +# # defaults to 30 +# active_level: 30 # # percentile to measure, a participant is considered active if it has exceeded the # # ActiveLevel more than MinPercentile% of the time # # defaults to 40 @@ -85,7 +85,7 @@ keys: # # frequency in ms to notify changes to clients, defaults to 500 # update_interval: 500 # # to prevent speaker updates from too jumpy, smooth out values over N samples -# smooth_samples: 8 +# smooth_intervals: 4 # turn server #turn: diff --git a/pkg/config/config.go b/pkg/config/config.go index 5904022da..4d8352f0b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -60,8 +60,8 @@ type AudioConfig struct { // interval to update clients, in ms UpdateInterval uint32 `yaml:"update_interval"` // smoothing for audioLevel values sent to the client. 
- // audioLevel will be an average of `smooth_samples`, 0 to disable - SmoothSamples uint32 `yaml:"smooth_samples"` + // audioLevel will be an average of `smooth_intervals`, 0 to disable + SmoothIntervals uint32 `yaml:"smooth_intervals"` } type RedisConfig struct { @@ -113,10 +113,10 @@ func NewConfig(confString string) (*Config, error) { }, }, Audio: AudioConfig{ - ActiveLevel: 40, - MinPercentile: 40, - UpdateInterval: 500, - SmoothSamples: 8, + ActiveLevel: 30, // -30dBov = 0.03 + MinPercentile: 40, + UpdateInterval: 500, + SmoothIntervals: 4, }, Redis: RedisConfig{}, Room: RoomConfig{ diff --git a/pkg/rtc/audiolevel.go b/pkg/rtc/audiolevel.go index 18c37e8d1..bb8f5acae 100644 --- a/pkg/rtc/audiolevel.go +++ b/pkg/rtc/audiolevel.go @@ -1,70 +1,75 @@ package rtc import ( + "math" "sync/atomic" ) const ( - // number of samples to compute moving average over - averageSamples = 30 - silentAudioLevel = uint8(127) + // number of audio frames for observe window + observeFrames = 25 // webrtc default opus frame size 20ms, 25*20=500ms matching default UpdateInterval + silentAudioLevel = 127 ) // keeps track of audio level for a participant type AudioLevel struct { levelThreshold uint8 - minPercentile uint8 - currentLevel atomic.Value - // min samples to be considered active - minSamples uint32 + currentLevel uint32 + // min frames to be considered active + minActiveFrames uint32 // for Observe goroutine use - // keeps track of current running average - sum uint32 - activeSamples uint32 - numSamples uint32 + // keeps track of current activity + observeLevel uint8 + activeFrames uint32 + numFrames uint32 } func NewAudioLevel(activeLevel uint8, minPercentile uint8) *AudioLevel { l := &AudioLevel{ - levelThreshold: activeLevel, - minPercentile: minPercentile, - minSamples: uint32(minPercentile) * averageSamples / 100, + levelThreshold: activeLevel, + minActiveFrames: uint32(minPercentile) * observeFrames / 100, + currentLevel: silentAudioLevel, + observeLevel: 
silentAudioLevel, } - l.currentLevel.Store(silentAudioLevel) return l } -// Observes a new sample, must be called from the same thread +// Observes a new frame, must be called from the same thread func (l *AudioLevel) Observe(level uint8) { - l.numSamples++ + l.numFrames++ if level <= l.levelThreshold { - l.activeSamples++ - l.sum += uint32(level) + l.activeFrames++ + if l.observeLevel > level { + l.observeLevel = level + } } - if l.numSamples >= averageSamples { + if l.numFrames >= observeFrames { // compute and reset - if l.activeSamples >= l.minSamples { - l.currentLevel.Store(uint8(l.sum / l.activeSamples)) + if l.activeFrames > 0 && l.activeFrames >= l.minActiveFrames { + const invObserveFrames = 1.0 / observeFrames + level := uint32(float64(l.observeLevel) - 20*math.Log10(float64(l.activeFrames)*invObserveFrames)) // subtract in float64: converting a negative float to uint32 is implementation-defined + atomic.StoreUint32(&l.currentLevel, level) } else { - // nil to represent not noisy - l.currentLevel.Store(silentAudioLevel) + atomic.StoreUint32(&l.currentLevel, silentAudioLevel) } - l.sum = 0 - l.activeSamples = 0 - l.numSamples = 0 + l.observeLevel = silentAudioLevel + l.activeFrames = 0 + l.numFrames = 0 } } // returns current audio level, 0 (loudest) to 127 (silent) -func (l *AudioLevel) GetLevel() (level uint8, noisy bool) { - level = l.currentLevel.Load().(uint8) - noisy = level != silentAudioLevel - return +func (l *AudioLevel) GetLevel() (uint8, bool) { + level := uint8(atomic.LoadUint32(&l.currentLevel)) + active := level != silentAudioLevel + return level, active } +// convert decibel back to linear func ConvertAudioLevel(level uint8) float32 { - return (127 - float32(level)) / 127 + const negInv20 = -1.0 / 20 + return float32(math.Pow(10, float64(level)*negInv20)) } diff --git a/pkg/rtc/audiolevel_test.go b/pkg/rtc/audiolevel_test.go index c130616ca..a2d7a1b2f 100644 --- a/pkg/rtc/audiolevel_test.go +++ b/pkg/rtc/audiolevel_test.go @@ -8,7 +8,7 @@ import ( ) const ( - samplesPerBatch = 30 + samplesPerBatch = 25 defaultActiveLevel = 30 // requires two noisy 
samples to count defaultPercentile = 10 @@ -36,7 +36,7 @@ func TestAudioLevel(t *testing.T) { t.Run("not noisy when less than percentile samples are above threshold", func(t *testing.T) { a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile) - observeSamples(a, 35, samplesPerBatch-1) + observeSamples(a, 35, samplesPerBatch-2) observeSamples(a, 25, 1) observeSamples(a, 35, 1) @@ -47,9 +47,9 @@ func TestAudioLevel(t *testing.T) { t.Run("noisy when higher than percentile samples are above threshold", func(t *testing.T) { a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile) - observeSamples(a, 35, samplesPerBatch-4) - observeSamples(a, 25, 2) - observeSamples(a, 29, 2) + observeSamples(a, 35, samplesPerBatch-16) + observeSamples(a, 25, 8) + observeSamples(a, 29, 8) level, noisy := a.GetLevel() require.True(t, noisy) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7c2f486b2..53ef81aae 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -553,7 +553,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) { } } -func (p *ParticipantImpl) GetAudioLevel() (level uint8, noisy bool) { +func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) { p.lock.RLock() defer p.lock.RUnlock() level = silentAudioLevel @@ -562,9 +562,9 @@ func (p *ParticipantImpl) GetAudioLevel() (level uint8, noisy bool) { if mt.audioLevel == nil { continue } - tl, tn := mt.audioLevel.GetLevel() - if tn { - noisy = true + tl, ta := mt.audioLevel.GetLevel() + if ta { + active = true if tl < level { level = tl } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 1e5ac8ec5..95307b96d 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -50,7 +50,7 @@ func TestIsReady(t *testing.T) { func TestICEStateChange(t *testing.T) { t.Run("onClose gets called when ICE disconnected", func(t *testing.T) { p := newParticipantForTest("test") - closeChan := 
make(chan bool, 1) + closeChan := make(chan struct{}) p.onClose = func(participant types.Participant) { close(closeChan) } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 7355d94e2..42a8a1c5f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1,6 +1,7 @@ package rtc import ( + "math" "sort" "sync" "sync/atomic" @@ -20,6 +21,7 @@ import ( const ( DefaultEmptyTimeout = 5 * 60 // 5m DefaultRoomDepartureGrace = 20 + AudioLevelQuantization = 8 // ideally power of 2 to minimize float decimal ) type Room struct { @@ -39,8 +41,7 @@ type Room struct { isClosed utils.AtomicFlag // for active speaker updates - audioConfig *config.AudioConfig - lastActiveSpeakers []*livekit.SpeakerInfo + audioConfig *config.AudioConfig statsReporter *RoomStatsReporter @@ -507,52 +508,65 @@ func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { } func (r *Room) audioUpdateWorker() { - smoothValues := make(map[string]float32) - smoothSamples := float32(r.audioConfig.SmoothSamples) - activeThreshold := float32(0.5) + var smoothValues map[string]float32 + var smoothFactor float32 + var activeThreshold float32 + if ss := r.audioConfig.SmoothIntervals; ss > 1 { + smoothValues = make(map[string]float32) + // exponential moving average (EMA), same center of mass with simple moving average (SMA) + smoothFactor = 2 / float32(ss+1) + activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel) + } + var lastActiveSpeakers []*livekit.SpeakerInfo for { if r.isClosed.Get() { return } speakers := r.GetActiveSpeakers() - if smoothSamples > 1 { - seenSids := make(map[string]bool) + if smoothValues != nil { for _, speaker := range speakers { - smoothValues[speaker.Sid] += (speaker.Level - smoothValues[speaker.Sid]) / smoothSamples - speaker.Level = smoothValues[speaker.Sid] - seenSids[speaker.Sid] = true + sid := speaker.Sid + level := smoothValues[sid] + delete(smoothValues, sid) + // exponential moving average (EMA) + level += (speaker.Level - level) * smoothFactor + speaker.Level = 
level } // ensure that previous active speakers are also included for sid, level := range smoothValues { - if seenSids[sid] { - continue - } - level += (0 - level) / smoothSamples - smoothValues[sid] = level - + delete(smoothValues, sid) + level += -level * smoothFactor if level > activeThreshold { speakers = append(speakers, &livekit.SpeakerInfo{ Sid: sid, Level: level, Active: true, }) - } else { - delete(smoothValues, sid) } } + // smoothValues map is drained, now repopulate it back + for _, speaker := range speakers { + smoothValues[speaker.Sid] = speaker.Level + } + sort.Slice(speakers, func(i, j int) bool { return speakers[i].Level > speakers[j].Level }) } + const invAudioLevelQuantization = 1.0 / AudioLevelQuantization + for _, speaker := range speakers { + speaker.Level = float32(math.Ceil(float64(speaker.Level*AudioLevelQuantization)) * invAudioLevelQuantization) + } + // see if an update is needed - if len(speakers) == len(r.lastActiveSpeakers) { + if len(speakers) == len(lastActiveSpeakers) { for i, speaker := range speakers { - if speaker.Sid != r.lastActiveSpeakers[i].Sid || speaker.Level != r.lastActiveSpeakers[i].Level { + if speaker.Level != lastActiveSpeakers[i].Level || speaker.Sid != lastActiveSpeakers[i].Sid { r.sendSpeakerUpdates(speakers) break } @@ -561,7 +575,7 @@ func (r *Room) audioUpdateWorker() { r.sendSpeakerUpdates(speakers) } - r.lastActiveSpeakers = speakers + lastActiveSpeakers = speakers time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond) } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 0abbc0dae..5cd3448fb 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -338,7 +338,7 @@ func TestActiveSpeakers(t *testing.T) { }) t.Run("audio level is smoothed", func(t *testing.T) { - rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothSamples: 3}) + rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, 
audioSmoothIntervals: 3}) defer rm.Close() participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeParticipant) @@ -355,7 +355,7 @@ func TestActiveSpeakers(t *testing.T) { if len(lastSpeakers) == 0 { return false } - if lastSpeakers[0].Level < convertedLevel/2 { + if lastSpeakers[0].Level > convertedLevel { return true } return false @@ -370,7 +370,7 @@ func TestActiveSpeakers(t *testing.T) { if len(lastSpeakers) == 0 { return false } - if lastSpeakers[0].Level > convertedLevel*0.99 { + if lastSpeakers[0].Level > convertedLevel { return true } return false @@ -456,9 +456,9 @@ func TestDataChannel(t *testing.T) { } type testRoomOpts struct { - num int - protocol types.ProtocolVersion - audioSmoothSamples uint32 + num int + protocol types.ProtocolVersion + audioSmoothIntervals uint32 } func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { @@ -473,8 +473,8 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { }, }, &config.AudioConfig{ - UpdateInterval: audioUpdateInterval, - SmoothSamples: opts.audioSmoothSamples, + UpdateInterval: audioUpdateInterval, + SmoothIntervals: opts.audioSmoothIntervals, }, ) for i := 0; i < opts.num; i++ { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 301bf515c..954703a1f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -51,7 +51,7 @@ type Participant interface { SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error SetTrackMuted(trackId string, muted bool) - GetAudioLevel() (level uint8, noisy bool) + GetAudioLevel() (level uint8, active bool) // permissions diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ea633010f..d31ad778d 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -110,7 +110,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - done := make(chan bool, 1) + done := 
make(chan struct{}) // function exits when websocket terminates, it'll close the event reading off of response sink as well defer func() { logger.Infow("WS connection closed", "participant", pi.Identity, "connectionId", connId) diff --git a/pkg/service/server.go b/pkg/service/server.go index 317e80f10..eaeebc086 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -34,7 +34,7 @@ type LivekitServer struct { turnServer *turn.Server currentNode routing.LocalNode running utils.AtomicFlag - doneChan chan bool + doneChan chan struct{} } func NewLivekitServer(conf *config.Config, @@ -126,7 +126,7 @@ func (s *LivekitServer) Start() error { return err } - s.doneChan = make(chan bool, 1) + s.doneChan = make(chan struct{}) // ensure we could listen ln, err := net.Listen("tcp", s.httpServer.Addr) diff --git a/test/client/trackwriter.go b/test/client/trackwriter.go index 04d0fe7db..865421f6f 100644 --- a/test/client/trackwriter.go +++ b/test/client/trackwriter.go @@ -23,7 +23,6 @@ type TrackWriter struct { track *webrtc.TrackLocalStaticSample filePath string mime string - done chan bool ogg *oggreader.OggReader ivfheader *ivfreader.IVFFileHeader @@ -39,7 +38,6 @@ func NewTrackWriter(ctx context.Context, track *webrtc.TrackLocalStaticSample, f track: track, filePath: filePath, mime: track.Codec().MimeType, - done: make(chan bool), } }