Enhancement: audio speakers (#44)

* refactor: active speakers

1. Observe the loudest adjusted with active ratio instead of linear average of decibel values
2. Follow RFC6465 to convert audio level from decibel to linear value.
3. Quantize audio level for stable slice comparison
4. Switch moving average algorithm from MMA to EMA to have the same center of mass with SMA
5. Minor: remove seenSids map allocation
6. Minor: minimize division arithmetic

* Update pkg/rtc/audiolevel.go

Co-authored-by: David Zhao <david@davidzhao.com>
This commit is contained in:
hn8
2021-07-10 00:46:24 +08:00
committed by GitHub
parent 1fd858a16b
commit dcfe7eaf4f
12 changed files with 103 additions and 86 deletions

View File

@@ -76,8 +76,8 @@ keys:
# customize audio level sensitivity
#audio:
# # minimum level to be considered active, 0-127, where 0 is loudest
# # defaults to 40
# active_level: 40
# # defaults to 30
# active_level: 30
# # percentile to measure, a participant is considered active if it has exceeded the
# # ActiveLevel more than MinPercentile% of the time
# # defaults to 40
@@ -85,7 +85,7 @@ keys:
# # frequency in ms to notify changes to clients, defaults to 500
# update_interval: 500
# # to prevent speaker updates from too jumpy, smooth out values over N samples
# smooth_samples: 8
# smooth_intervals: 4
# turn server
#turn:

View File

@@ -60,8 +60,8 @@ type AudioConfig struct {
// interval to update clients, in ms
UpdateInterval uint32 `yaml:"update_interval"`
// smoothing for audioLevel values sent to the client.
// audioLevel will be an average of `smooth_samples`, 0 to disable
SmoothSamples uint32 `yaml:"smooth_samples"`
// audioLevel will be an average of `smooth_intervals`, 0 to disable
SmoothIntervals uint32 `yaml:"smooth_intervals"`
}
type RedisConfig struct {
@@ -113,10 +113,10 @@ func NewConfig(confString string) (*Config, error) {
},
},
Audio: AudioConfig{
ActiveLevel: 40,
MinPercentile: 40,
UpdateInterval: 500,
SmoothSamples: 8,
ActiveLevel: 30, // -30dBov = 0.03
MinPercentile: 40,
UpdateInterval: 500,
SmoothIntervals: 4,
},
Redis: RedisConfig{},
Room: RoomConfig{

View File

@@ -1,70 +1,75 @@
package rtc
import (
"math"
"sync/atomic"
)
const (
// number of samples to compute moving average over
averageSamples = 30
silentAudioLevel = uint8(127)
// number of audio frames for observe window
observeFrames = 25 // webrtc default opus frame size 20ms, 25*20=500ms matching default UpdateInterval
silentAudioLevel = 127
)
// keeps track of audio level for a participant
type AudioLevel struct {
levelThreshold uint8
minPercentile uint8
currentLevel atomic.Value
// min samples to be considered active
minSamples uint32
currentLevel uint32
// min frames to be considered active
minActiveFrames uint32
// for Observe goroutine use
// keeps track of current running average
sum uint32
activeSamples uint32
numSamples uint32
// keeps track of current activity
observeLevel uint8
activeFrames uint32
numFrames uint32
}
func NewAudioLevel(activeLevel uint8, minPercentile uint8) *AudioLevel {
l := &AudioLevel{
levelThreshold: activeLevel,
minPercentile: minPercentile,
minSamples: uint32(minPercentile) * averageSamples / 100,
levelThreshold: activeLevel,
minActiveFrames: uint32(minPercentile) * observeFrames / 100,
currentLevel: silentAudioLevel,
observeLevel: silentAudioLevel,
}
l.currentLevel.Store(silentAudioLevel)
return l
}
// Observes a new sample, must be called from the same thread
// Observes a new frame, must be called from the same thread
func (l *AudioLevel) Observe(level uint8) {
l.numSamples++
l.numFrames++
if level <= l.levelThreshold {
l.activeSamples++
l.sum += uint32(level)
l.activeFrames++
if l.observeLevel > level {
l.observeLevel = level
}
}
if l.numSamples >= averageSamples {
if l.numFrames >= observeFrames {
// compute and reset
if l.activeSamples >= l.minSamples {
l.currentLevel.Store(uint8(l.sum / l.activeSamples))
if l.activeFrames >= l.minActiveFrames {
const invObserveFrames = 1.0 / observeFrames
level := uint32(l.observeLevel) - uint32(20*math.Log10(float64(l.activeFrames)*invObserveFrames))
atomic.StoreUint32(&l.currentLevel, level)
} else {
// nil to represent not noisy
l.currentLevel.Store(silentAudioLevel)
atomic.StoreUint32(&l.currentLevel, silentAudioLevel)
}
l.sum = 0
l.activeSamples = 0
l.numSamples = 0
l.observeLevel = silentAudioLevel
l.activeFrames = 0
l.numFrames = 0
}
}
// returns current audio level, 0 (loudest) to 127 (silent)
func (l *AudioLevel) GetLevel() (level uint8, noisy bool) {
level = l.currentLevel.Load().(uint8)
noisy = level != silentAudioLevel
return
func (l *AudioLevel) GetLevel() (uint8, bool) {
level := uint8(atomic.LoadUint32(&l.currentLevel))
active := level != silentAudioLevel
return level, active
}
// convert decibel back to linear
func ConvertAudioLevel(level uint8) float32 {
return (127 - float32(level)) / 127
const negInv20 = -1.0 / 20
return float32(math.Pow(10, float64(level)*negInv20))
}

View File

@@ -8,7 +8,7 @@ import (
)
const (
samplesPerBatch = 30
samplesPerBatch = 25
defaultActiveLevel = 30
// requires two noisy samples to count
defaultPercentile = 10
@@ -36,7 +36,7 @@ func TestAudioLevel(t *testing.T) {
t.Run("not noisy when less than percentile samples are above threshold", func(t *testing.T) {
a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile)
observeSamples(a, 35, samplesPerBatch-1)
observeSamples(a, 35, samplesPerBatch-2)
observeSamples(a, 25, 1)
observeSamples(a, 35, 1)
@@ -47,9 +47,9 @@ func TestAudioLevel(t *testing.T) {
t.Run("noisy when higher than percentile samples are above threshold", func(t *testing.T) {
a := rtc.NewAudioLevel(defaultActiveLevel, defaultPercentile)
observeSamples(a, 35, samplesPerBatch-4)
observeSamples(a, 25, 2)
observeSamples(a, 29, 2)
observeSamples(a, 35, samplesPerBatch-16)
observeSamples(a, 25, 8)
observeSamples(a, 29, 8)
level, noisy := a.GetLevel()
require.True(t, noisy)

View File

@@ -553,7 +553,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) {
}
}
func (p *ParticipantImpl) GetAudioLevel() (level uint8, noisy bool) {
func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) {
p.lock.RLock()
defer p.lock.RUnlock()
level = silentAudioLevel
@@ -562,9 +562,9 @@ func (p *ParticipantImpl) GetAudioLevel() (level uint8, noisy bool) {
if mt.audioLevel == nil {
continue
}
tl, tn := mt.audioLevel.GetLevel()
if tn {
noisy = true
tl, ta := mt.audioLevel.GetLevel()
if ta {
active = true
if tl < level {
level = tl
}

View File

@@ -50,7 +50,7 @@ func TestIsReady(t *testing.T) {
func TestICEStateChange(t *testing.T) {
t.Run("onClose gets called when ICE disconnected", func(t *testing.T) {
p := newParticipantForTest("test")
closeChan := make(chan bool, 1)
closeChan := make(chan struct{})
p.onClose = func(participant types.Participant) {
close(closeChan)
}

View File

@@ -1,6 +1,7 @@
package rtc
import (
"math"
"sort"
"sync"
"sync/atomic"
@@ -20,6 +21,7 @@ import (
const (
DefaultEmptyTimeout = 5 * 60 // 5m
DefaultRoomDepartureGrace = 20
AudioLevelQuantization = 8 // ideally power of 2 to minimize float decimal
)
type Room struct {
@@ -39,8 +41,7 @@ type Room struct {
isClosed utils.AtomicFlag
// for active speaker updates
audioConfig *config.AudioConfig
lastActiveSpeakers []*livekit.SpeakerInfo
audioConfig *config.AudioConfig
statsReporter *RoomStatsReporter
@@ -507,52 +508,65 @@ func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) {
}
func (r *Room) audioUpdateWorker() {
smoothValues := make(map[string]float32)
smoothSamples := float32(r.audioConfig.SmoothSamples)
activeThreshold := float32(0.5)
var smoothValues map[string]float32
var smoothFactor float32
var activeThreshold float32
if ss := r.audioConfig.SmoothIntervals; ss > 1 {
smoothValues = make(map[string]float32)
// exponential moving average (EMA), same center of mass with simple moving average (SMA)
smoothFactor = 2 / float32(ss+1)
activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel)
}
var lastActiveSpeakers []*livekit.SpeakerInfo
for {
if r.isClosed.Get() {
return
}
speakers := r.GetActiveSpeakers()
if smoothSamples > 1 {
seenSids := make(map[string]bool)
if smoothValues != nil {
for _, speaker := range speakers {
smoothValues[speaker.Sid] += (speaker.Level - smoothValues[speaker.Sid]) / smoothSamples
speaker.Level = smoothValues[speaker.Sid]
seenSids[speaker.Sid] = true
sid := speaker.Sid
level := smoothValues[sid]
delete(smoothValues, sid)
// exponential moving average (EMA)
level += (speaker.Level - level) * smoothFactor
speaker.Level = level
}
// ensure that previous active speakers are also included
for sid, level := range smoothValues {
if seenSids[sid] {
continue
}
level += (0 - level) / smoothSamples
smoothValues[sid] = level
delete(smoothValues, sid)
level += -level * smoothFactor
if level > activeThreshold {
speakers = append(speakers, &livekit.SpeakerInfo{
Sid: sid,
Level: level,
Active: true,
})
} else {
delete(smoothValues, sid)
}
}
// smoothValues map is drained, now repopulate it back
for _, speaker := range speakers {
smoothValues[speaker.Sid] = speaker.Level
}
sort.Slice(speakers, func(i, j int) bool {
return speakers[i].Level > speakers[j].Level
})
}
const invAudioLevelQuantization = 1.0 / AudioLevelQuantization
for _, speaker := range speakers {
speaker.Level = float32(math.Ceil(float64(speaker.Level*AudioLevelQuantization)) * invAudioLevelQuantization)
}
// see if an update is needed
if len(speakers) == len(r.lastActiveSpeakers) {
if len(speakers) == len(lastActiveSpeakers) {
for i, speaker := range speakers {
if speaker.Sid != r.lastActiveSpeakers[i].Sid || speaker.Level != r.lastActiveSpeakers[i].Level {
if speaker.Level != lastActiveSpeakers[i].Level || speaker.Sid != lastActiveSpeakers[i].Sid {
r.sendSpeakerUpdates(speakers)
break
}
@@ -561,7 +575,7 @@ func (r *Room) audioUpdateWorker() {
r.sendSpeakerUpdates(speakers)
}
r.lastActiveSpeakers = speakers
lastActiveSpeakers = speakers
time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond)
}

View File

@@ -338,7 +338,7 @@ func TestActiveSpeakers(t *testing.T) {
})
t.Run("audio level is smoothed", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothSamples: 3})
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothIntervals: 3})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
@@ -355,7 +355,7 @@ func TestActiveSpeakers(t *testing.T) {
if len(lastSpeakers) == 0 {
return false
}
if lastSpeakers[0].Level < convertedLevel/2 {
if lastSpeakers[0].Level > convertedLevel {
return true
}
return false
@@ -370,7 +370,7 @@ func TestActiveSpeakers(t *testing.T) {
if len(lastSpeakers) == 0 {
return false
}
if lastSpeakers[0].Level > convertedLevel*0.99 {
if lastSpeakers[0].Level > convertedLevel {
return true
}
return false
@@ -456,9 +456,9 @@ func TestDataChannel(t *testing.T) {
}
type testRoomOpts struct {
num int
protocol types.ProtocolVersion
audioSmoothSamples uint32
num int
protocol types.ProtocolVersion
audioSmoothIntervals uint32
}
func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
@@ -473,8 +473,8 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
},
},
&config.AudioConfig{
UpdateInterval: audioUpdateInterval,
SmoothSamples: opts.audioSmoothSamples,
UpdateInterval: audioUpdateInterval,
SmoothIntervals: opts.audioSmoothIntervals,
},
)
for i := 0; i < opts.num; i++ {

View File

@@ -51,7 +51,7 @@ type Participant interface {
SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error
SendDataPacket(packet *livekit.DataPacket) error
SetTrackMuted(trackId string, muted bool)
GetAudioLevel() (level uint8, noisy bool)
GetAudioLevel() (level uint8, active bool)
// permissions

View File

@@ -110,7 +110,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
done := make(chan bool, 1)
done := make(chan struct{})
// function exits when websocket terminates, it'll close the event reading off of response sink as well
defer func() {
logger.Infow("WS connection closed", "participant", pi.Identity, "connectionId", connId)

View File

@@ -34,7 +34,7 @@ type LivekitServer struct {
turnServer *turn.Server
currentNode routing.LocalNode
running utils.AtomicFlag
doneChan chan bool
doneChan chan struct{}
}
func NewLivekitServer(conf *config.Config,
@@ -126,7 +126,7 @@ func (s *LivekitServer) Start() error {
return err
}
s.doneChan = make(chan bool, 1)
s.doneChan = make(chan struct{})
// ensure we could listen
ln, err := net.Listen("tcp", s.httpServer.Addr)

View File

@@ -23,7 +23,6 @@ type TrackWriter struct {
track *webrtc.TrackLocalStaticSample
filePath string
mime string
done chan bool
ogg *oggreader.OggReader
ivfheader *ivfreader.IVFFileHeader
@@ -39,7 +38,6 @@ func NewTrackWriter(ctx context.Context, track *webrtc.TrackLocalStaticSample, f
track: track,
filePath: filePath,
mime: track.Codec().MimeType,
done: make(chan bool),
}
}