From d69e9f451ecffa4fd65ca578a98c36b7de3a95a3 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Mon, 21 Jun 2021 18:14:20 -0700 Subject: [PATCH] Smooth audio levels by averaging over samples. --- pkg/config/config.go | 4 +++ pkg/rtc/audiolevel.go | 2 +- pkg/rtc/room.go | 62 ++++++++++++++++++++++++++++++-------- pkg/rtc/room_test.go | 35 +++++++++++++++++++-- pkg/service/roommanager.go | 2 +- 5 files changed, 87 insertions(+), 18 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 741d4df4b..29af8a9c4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -57,6 +57,9 @@ type AudioConfig struct { MinPercentile uint8 `yaml:"min_percentile"` // interval to update clients, in ms UpdateInterval uint32 `yaml:"update_interval"` + // smoothing for audioLevel values sent to the client. + // audioLevel will be an average of `smooth_samples`, 0 to disable + SmoothSamples uint32 `yaml:"smooth_samples"` } type RedisConfig struct { @@ -100,6 +103,7 @@ func NewConfig(confString string) (*Config, error) { ActiveLevel: 40, MinPercentile: 15, UpdateInterval: 500, + SmoothSamples: 5, }, Redis: RedisConfig{}, TURN: TURNConfig{ diff --git a/pkg/rtc/audiolevel.go b/pkg/rtc/audiolevel.go index 2cb8b28c6..18c37e8d1 100644 --- a/pkg/rtc/audiolevel.go +++ b/pkg/rtc/audiolevel.go @@ -65,6 +65,6 @@ func (l *AudioLevel) GetLevel() (level uint8, noisy bool) { return } -func convertAudioLevel(level uint8) float32 { +func ConvertAudioLevel(level uint8) float32 { return (127 - float32(level)) / 127 } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 1ede889d9..6888ed94f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-logr/zapr" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/utils" "github.com/pion/ion-sfu/pkg/buffer" "google.golang.org/protobuf/proto" @@ -38,8 +39,8 @@ type Room struct { isClosed utils.AtomicFlag // for active speaker updates - audioUpdateInterval uint32 - lastActiveSpeakers []*livekit.SpeakerInfo + audioConfig *config.AudioConfig + lastActiveSpeakers []*livekit.SpeakerInfo statsReporter *RoomStatsReporter @@ -51,16 +52,16 @@ type ParticipantOptions struct { AutoSubscribe bool } -func NewRoom(room *livekit.Room, config WebRTCConfig, iceServers []*livekit.ICEServer, audioUpdateInterval uint32) *Room { +func NewRoom(room *livekit.Room, config WebRTCConfig, iceServers []*livekit.ICEServer, audioConfig *config.AudioConfig) *Room { r := &Room{ - Room: proto.Clone(room).(*livekit.Room), - config: config, - iceServers: iceServers, - audioUpdateInterval: audioUpdateInterval, - statsReporter: NewRoomStatsReporter(room.Name), - participants: make(map[string]types.Participant), - participantOpts: make(map[string]*ParticipantOptions), - bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, zapr.NewLogger(logger.GetLogger())), + Room: proto.Clone(room).(*livekit.Room), + config: config, + iceServers: iceServers, + audioConfig: audioConfig, + statsReporter: NewRoomStatsReporter(room.Name), + participants: make(map[string]types.Participant), + participantOpts: make(map[string]*ParticipantOptions), + bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, zapr.NewLogger(logger.GetLogger())), } if r.Room.EmptyTimeout == 0 { r.Room.EmptyTimeout = DefaultEmptyTimeout @@ -99,7 +100,7 @@ func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo { } speakers = append(speakers, &livekit.SpeakerInfo{ Sid: p.ID(), - Level: convertAudioLevel(level), + Level: ConvertAudioLevel(level), Active: active, }) } @@ -494,12 +495,47 @@ func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { } func (r *Room) audioUpdateWorker() { + smoothValues := make(map[string]float32) + smoothSamples := float32(r.audioConfig.SmoothSamples) + activeThreshold := float32(0.5) + for { if r.isClosed.Get() { return } speakers := r.GetActiveSpeakers() + if smoothSamples > 1 { + seenSids := make(map[string]bool) + for _, speaker := range speakers { + speaker.Level += (speaker.Level - smoothValues[speaker.Sid]) / smoothSamples + smoothValues[speaker.Sid] = speaker.Level + seenSids[speaker.Sid] = true + } + + // ensure that previous active speakers are also included + for sid, level := range smoothValues { + if seenSids[sid] { + continue + } + level += (0 - level) / smoothSamples + smoothValues[sid] = level + + if level > activeThreshold { + speakers = append(speakers, &livekit.SpeakerInfo{ + Sid: sid, + Level: level, + Active: true, + }) + } else { + delete(smoothValues, sid) + } + } + + sort.Slice(speakers, func(i, j int) bool { + return speakers[i].Level > speakers[j].Level + }) + } // see if an update is needed if len(speakers) == len(r.lastActiveSpeakers) { @@ -515,6 +551,6 @@ func (r *Room) audioUpdateWorker() { r.lastActiveSpeakers = speakers - time.Sleep(time.Duration(r.audioUpdateInterval) * time.Millisecond) + time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond) } } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 8a852c139..827840cf5 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/testutils" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/logger" @@ -335,6 +337,29 @@ func TestActiveSpeakers(t *testing.T) { require.Len(t, updates, 2) require.Empty(t, updates[1].Speakers) }) + + t.Run("audio level is smoothed", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothSamples: 3}) + participants := rm.GetParticipants() + p := participants[0].(*typesfakes.FakeParticipant) + op := participants[1].(*typesfakes.FakeParticipant) + p.GetAudioLevelReturns(30, true) + convertedLevel := rtc.ConvertAudioLevel(30) + testutils.WithTimeout(t, "checking first update is received", func() bool { + updates := getActiveSpeakerUpdates(op) + if len(updates) == 0 { + return false + } + lastSpeakers := updates[len(updates)-1].Speakers + if len(lastSpeakers) == 0 { + return false + } + if lastSpeakers[0].Level < convertedLevel/2 { + return true + } + return false + }) + }) } func TestDataChannel(t *testing.T) { @@ -399,8 +424,9 @@ func TestDataChannel(t *testing.T) { } type testRoomOpts struct { - num int - protocol types.ProtocolVersion + num int + protocol types.ProtocolVersion + audioSmoothSamples uint32 } func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { @@ -414,7 +440,10 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { }, }, }, - audioUpdateInterval, + &config.AudioConfig{ + UpdateInterval: audioUpdateInterval, + SmoothSamples: opts.audioSmoothSamples, + }, ) for i := 0; i < opts.num; i++ { identity := fmt.Sprintf("p%d", i) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index efe62d22a..0cd407041 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -304,7 +304,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { } // construct ice servers - room = rtc.NewRoom(ri, *r.rtcConfig, r.iceServersForRoom(ri), r.config.Audio.UpdateInterval) + room = rtc.NewRoom(ri, *r.rtcConfig, r.iceServersForRoom(ri), &r.config.Audio) room.OnClose(func() { if err := r.DeleteRoom(roomName); err != nil { logger.Errorw("could not delete room", err)