From dcfe7eaf4f7f0961eb057d9ebb3c242ae41224f9 Mon Sep 17 00:00:00 2001 From: hn8 <10730886+hn8@users.noreply.github.com> Date: Sat, 10 Jul 2021 00:46:24 +0800 Subject: [PATCH] Enhancement: audio speakers (#44) * refactor: active speakers 1. Observe the loudest adjusted with active ratio instead of linear average of decibel values 2. Follow RFC6465 to convert audio level from decibel to linear value. 3. Quantize audio level for stable slice comparison 4. Switch moving average algorithm from MMA to EMA to have the same center of mass with SMA 5. Minor: remove seenSids map allocation 6. Minor: minimize division arithmetic * Update pkg/rtc/audiolevel.go Co-authored-by: David Zhao --- config-sample.yaml | 6 +-- pkg/config/config.go | 12 ++--- pkg/rtc/audiolevel.go | 69 +++++++++++++++------------- pkg/rtc/audiolevel_test.go | 10 ++-- pkg/rtc/participant.go | 8 ++-- pkg/rtc/participant_internal_test.go | 2 +- pkg/rtc/room.go | 56 +++++++++++++--------- pkg/rtc/room_test.go | 16 +++---- pkg/rtc/types/interfaces.go | 2 +- pkg/service/rtcservice.go | 2 +- pkg/service/server.go | 4 +- test/client/trackwriter.go | 2 - 12 files changed, 103 insertions(+), 86 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index ab97753a4..17337d0cc 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -76,8 +76,8 @@ keys: # customize audio level sensitivity #audio: # # minimum level to be considered active, 0-127, where 0 is loudest -# # defaults to 40 -# active_level: 40 +# # defaults to 30 +# active_level: 30 # # percentile to measure, a participant is considered active if it has exceeded the # # ActiveLevel more than MinPercentile% of the time # # defaults to 40 @@ -85,7 +85,7 @@ keys: # # frequency in ms to notify changes to clients, defaults to 500 # update_interval: 500 # # to prevent speaker updates from too jumpy, smooth out values over N samples -# smooth_samples: 8 +# smooth_intervals: 4 # turn server #turn: diff --git a/pkg/config/config.go b/pkg/config/config.go index 5904022da..4d8352f0b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -60,8 +60,8 @@ type AudioConfig struct { // interval to update clients, in ms UpdateInterval uint32 `yaml:"update_interval"` // smoothing for audioLevel values sent to the client. - // audioLevel will be an average of `smooth_samples`, 0 to disable - SmoothSamples uint32 `yaml:"smooth_samples"` + // audioLevel will be an average of `smooth_intervals`, 0 to disable + SmoothIntervals uint32 `yaml:"smooth_intervals"` } type RedisConfig struct { @@ -113,10 +113,10 @@ func NewConfig(confString string) (*Config, error) { }, }, Audio: AudioConfig{ - ActiveLevel: 40, - MinPercentile: 40, - UpdateInterval: 500, - SmoothSamples: 8, + ActiveLevel: 30, // -30dBov = 0.03 + MinPercentile: 40, + UpdateInterval: 500, + SmoothIntervals: 4, }, Redis: RedisConfig{}, Room: RoomConfig{ diff --git a/pkg/rtc/audiolevel.go b/pkg/rtc/audiolevel.go index 18c37e8d1..bb8f5acae 100644 --- a/pkg/rtc/audiolevel.go +++ b/pkg/rtc/audiolevel.go @@ -1,70 +1,75 @@ package rtc import ( + "math" "sync/atomic" ) const ( - // number of samples to compute moving average over - averageSamples = 30 - silentAudioLevel = uint8(127) + // number of audio frames for observe window + observeFrames = 25 // webrtc default opus frame size 20ms, 25*20=500ms matching default UpdateInterval + silentAudioLevel = 127 ) // keeps track of audio level for a participant type AudioLevel struct { levelThreshold uint8 - minPercentile uint8 - currentLevel atomic.Value - // min samples to be considered active - minSamples uint32 + currentLevel uint32 + // min frames to be considered active + minActiveFrames uint32 // for Observe goroutine use - // keeps track of current running average - sum uint32 - activeSamples uint32 - numSamples uint32 + // keeps track of current activity + observeLevel uint8 + activeFrames uint32 + numFrames uint32 } func NewAudioLevel(activeLevel uint8, minPercentile uint8) *AudioLevel { l := &AudioLevel{ - levelThreshold: activeLevel, - minPercentile: minPercentile, - minSamples: uint32(minPercentile) * averageSamples / 100, + levelThreshold: activeLevel, + minActiveFrames: uint32(minPercentile) * observeFrames / 100, + currentLevel: silentAudioLevel, + observeLevel: silentAudioLevel, } - l.currentLevel.Store(silentAudioLevel) return l } -// Observes a new sample, must be called from the same thread +// Observes a new frame, must be called from the same thread func (l *AudioLevel) Observe(level uint8) { - l.numSamples++ + l.numFrames++ if level <= l.levelThreshold { - l.activeSamples++ - l.sum += uint32(level) + l.activeFrames++ + if l.observeLevel > level { + l.observeLevel = level + } } - if l.numSamples >= averageSamples { + if l.numFrames >= observeFrames { // compute and reset - if l.activeSamples >= l.minSamples { - l.currentLevel.Store(uint8(l.sum / l.activeSamples)) + if l.activeFrames >= l.minActiveFrames { + const invObserveFrames = 1.0 / observeFrames + level := uint32(l.observeLevel) - uint32(20*math.Log10(float64(l.activeFrames)*invObserveFrames)) + atomic.StoreUint32(&l.currentLevel, level) } else { - // nil to represent not noisy - l.currentLevel.Store(silentAudioLevel) + atomic.StoreUint32(&l.currentLevel, silentAudioLevel) } - l.sum = 0 - l.activeSamples = 0 - l.numSamples = 0 + l.observeLevel = silentAudioLevel + l.activeFrames = 0 + l.numFrames = 0 } } // returns current audio level, 0 (loudest) to 127 (silent) -func (l *AudioLevel) GetLevel() (level uint8, noisy bool) { - level = l.currentLevel.Load().(uint8) - noisy = level != silentAudioLevel - return +func (l *AudioLevel) GetLevel() (uint8, bool) { + level := uint8(atomic.LoadUint32(&l.currentLevel)) + active := level != silentAudioLevel + return level, active } +// convert decibel back to linear func ConvertAudioLevel(level uint8) float32 { - return (127 - float32(level)) / 127 + const negInv20 = -1.0 / 20 + return float32(math.Pow(10, float64(level)*negInv20)) } diff --git a/pkg/rtc/audiolevel_test.go b/pkg/rtc/audiolevel_test.go index c130616ca..a2d7a1b2f 100644 --- a/pkg/rtc/audiolevel_test.go +++ b/pkg/rtc/audiolevel_test.go @@ -8,7 +8,7 @@ import ( ) const ( - samplesPerBatch = 30 + samplesPerBatch = 25 defaultActiveLevel = 30 // requires two noisy samples to count defaultPercentile = 10 @@ -36,7 +36,7 @@ func TestAudioLevel(t *testing.T) { t.Run("not noisy when less than percentile samples are above threshold", func(t *testing.T) { a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile) - observeSamples(a, 35, samplesPerBatch-1) + observeSamples(a, 35, samplesPerBatch-2) observeSamples(a, 25, 1) observeSamples(a, 35, 1) @@ -47,9 +47,9 @@ func TestAudioLevel(t *testing.T) { t.Run("noisy when higher than percentile samples are above threshold", func(t *testing.T) { a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile) - observeSamples(a, 35, samplesPerBatch-4) - observeSamples(a, 25, 2) - observeSamples(a, 29, 2) + observeSamples(a, 35, samplesPerBatch-16) + observeSamples(a, 25, 8) + observeSamples(a, 29, 8) level, noisy := a.GetLevel() require.True(t, noisy) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7c2f486b2..53ef81aae 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -553,7 +553,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) { } } -func (p *ParticipantImpl) GetAudioLevel() (level uint8, noisy bool) { +func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) { p.lock.RLock() defer p.lock.RUnlock() level = silentAudioLevel @@ -562,9 +562,9 @@ func (p *ParticipantImpl) GetAudioLevel() (level uint8, noisy bool) { if mt.audioLevel == nil { continue } - tl, tn := mt.audioLevel.GetLevel() - if tn { - noisy = true + tl, ta := mt.audioLevel.GetLevel() + if ta { + active = true if tl < level { level = tl } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 1e5ac8ec5..95307b96d 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -50,7 +50,7 @@ func TestIsReady(t *testing.T) { func TestICEStateChange(t *testing.T) { t.Run("onClose gets called when ICE disconnected", func(t *testing.T) { p := newParticipantForTest("test") - closeChan := make(chan bool, 1) + closeChan := make(chan struct{}) p.onClose = func(participant types.Participant) { close(closeChan) } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 7355d94e2..42a8a1c5f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1,6 +1,7 @@ package rtc import ( + "math" "sort" "sync" "sync/atomic" @@ -20,6 +21,7 @@ import ( const ( DefaultEmptyTimeout = 5 * 60 // 5m DefaultRoomDepartureGrace = 20 + AudioLevelQuantization = 8 // ideally power of 2 to minimize float decimal ) type Room struct { @@ -39,8 +41,7 @@ type Room struct { isClosed utils.AtomicFlag // for active speaker updates - audioConfig *config.AudioConfig - lastActiveSpeakers []*livekit.SpeakerInfo + audioConfig *config.AudioConfig statsReporter *RoomStatsReporter @@ -507,52 +508,65 @@ func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { } func (r *Room) audioUpdateWorker() { - smoothValues := make(map[string]float32) - smoothSamples := float32(r.audioConfig.SmoothSamples) - activeThreshold := float32(0.5) + var smoothValues map[string]float32 + var smoothFactor float32 + var activeThreshold float32 + if ss := r.audioConfig.SmoothIntervals; ss > 1 { + smoothValues = make(map[string]float32) + // exponential moving average (EMA), same center of mass with simple moving average (SMA) + smoothFactor = 2 / float32(ss+1) + activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel) + } + var lastActiveSpeakers []*livekit.SpeakerInfo for { if r.isClosed.Get() { return } speakers := r.GetActiveSpeakers() - if smoothSamples > 1 { - seenSids := make(map[string]bool) + if smoothValues != nil { for _, speaker := range speakers { - smoothValues[speaker.Sid] += (speaker.Level - smoothValues[speaker.Sid]) / smoothSamples - speaker.Level = smoothValues[speaker.Sid] - seenSids[speaker.Sid] = true + sid := speaker.Sid + level := smoothValues[sid] + delete(smoothValues, sid) + // exponential moving average (EMA) + level += (speaker.Level - level) * smoothFactor + speaker.Level = level } // ensure that previous active speakers are also included for sid, level := range smoothValues { - if seenSids[sid] { - continue - } - level += (0 - level) / smoothSamples - smoothValues[sid] = level - + delete(smoothValues, sid) + level += -level * smoothFactor if level > activeThreshold { speakers = append(speakers, &livekit.SpeakerInfo{ Sid: sid, Level: level, Active: true, }) - } else { - delete(smoothValues, sid) } } + // smoothValues map is drained, now repopulate it back + for _, speaker := range speakers { + smoothValues[speaker.Sid] = speaker.Level + } + sort.Slice(speakers, func(i, j int) bool { return speakers[i].Level > speakers[j].Level }) } + const invAudioLevelQuantization = 1.0 / AudioLevelQuantization + for _, speaker := range speakers { + speaker.Level = float32(math.Ceil(float64(speaker.Level*AudioLevelQuantization)) * invAudioLevelQuantization) + } + // see if an update is needed - if len(speakers) == len(r.lastActiveSpeakers) { + if len(speakers) == len(lastActiveSpeakers) { for i, speaker := range speakers { - if speaker.Sid != r.lastActiveSpeakers[i].Sid || speaker.Level != r.lastActiveSpeakers[i].Level { + if speaker.Level != lastActiveSpeakers[i].Level || speaker.Sid != lastActiveSpeakers[i].Sid { r.sendSpeakerUpdates(speakers) break } @@ -561,7 +575,7 @@ func (r *Room) audioUpdateWorker() { r.sendSpeakerUpdates(speakers) } - r.lastActiveSpeakers = speakers + lastActiveSpeakers = speakers time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond) } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 0abbc0dae..5cd3448fb 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -338,7 +338,7 @@ func TestActiveSpeakers(t *testing.T) { }) t.Run("audio level is smoothed", func(t *testing.T) { - rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothSamples: 3}) + rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothIntervals: 3}) defer rm.Close() participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeParticipant) @@ -355,7 +355,7 @@ func TestActiveSpeakers(t *testing.T) { if len(lastSpeakers) == 0 { return false } - if lastSpeakers[0].Level < convertedLevel/2 { + if lastSpeakers[0].Level > convertedLevel { return true } return false @@ -370,7 +370,7 @@ func TestActiveSpeakers(t *testing.T) { if len(lastSpeakers) == 0 { return false } - if lastSpeakers[0].Level > convertedLevel*0.99 { + if lastSpeakers[0].Level > convertedLevel { return true } return false @@ -456,9 +456,9 @@ func TestDataChannel(t *testing.T) { } type testRoomOpts struct { - num int - protocol types.ProtocolVersion - audioSmoothSamples uint32 + num int + protocol types.ProtocolVersion + audioSmoothIntervals uint32 } func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { @@ -473,8 +473,8 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { }, }, &config.AudioConfig{ - UpdateInterval: audioUpdateInterval, - SmoothSamples: opts.audioSmoothSamples, + UpdateInterval: audioUpdateInterval, + SmoothIntervals: opts.audioSmoothIntervals, }, ) for i := 0; i < opts.num; i++ { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 301bf515c..954703a1f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -51,7 +51,7 @@ type Participant interface { SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error SetTrackMuted(trackId string, muted bool) - GetAudioLevel() (level uint8, noisy bool) + GetAudioLevel() (level uint8, active bool) // permissions diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ea633010f..d31ad778d 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -110,7 +110,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - done := make(chan bool, 1) + done := make(chan struct{}) // function exits when websocket terminates, it'll close the event reading off of response sink as well defer func() { logger.Infow("WS connection closed", "participant", pi.Identity, "connectionId", connId) diff --git a/pkg/service/server.go b/pkg/service/server.go index 317e80f10..eaeebc086 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -34,7 +34,7 @@ type LivekitServer struct { turnServer *turn.Server currentNode routing.LocalNode running utils.AtomicFlag - doneChan chan bool + doneChan chan struct{} } func NewLivekitServer(conf *config.Config, @@ -126,7 +126,7 @@ func (s *LivekitServer) Start() error { return err } - s.doneChan = make(chan bool, 1) + s.doneChan = make(chan struct{}) // ensure we could listen ln, err := net.Listen("tcp", s.httpServer.Addr) diff --git a/test/client/trackwriter.go b/test/client/trackwriter.go index 04d0fe7db..865421f6f 100644 --- a/test/client/trackwriter.go +++ b/test/client/trackwriter.go @@ -23,7 +23,6 @@ type TrackWriter struct { track *webrtc.TrackLocalStaticSample filePath string mime string - done chan bool ogg *oggreader.OggReader ivfheader *ivfreader.IVFFileHeader @@ -39,7 +38,6 @@ func NewTrackWriter(ctx context.Context, track *webrtc.TrackLocalStaticSample, f track: track, filePath: filePath, mime: track.Codec().MimeType, - done: make(chan bool), } }