From eebc2f5acd3c316dd6dddd34a655f5f628d74f0e Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 21 Jun 2021 17:13:58 -0700 Subject: [PATCH] add throttle config --- cmd/server/main.go | 3 +- pkg/config/config.go | 15 ++++++ pkg/rtc/participant.go | 55 +++----------------- pkg/rtc/participant_internal_test.go | 1 + pkg/rtc/rtcpthrottle.go | 78 ++++++++++++++++++++++++++++ pkg/service/roommanager.go | 1 + 6 files changed, 103 insertions(+), 50 deletions(-) create mode 100644 pkg/rtc/rtcpthrottle.go diff --git a/cmd/server/main.go b/cmd/server/main.go index bb535f0b4..7089557e4 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,12 +16,13 @@ import ( "github.com/pkg/errors" "github.com/urfave/cli/v2" + "github.com/livekit/protocol/auth" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/version" - "github.com/livekit/protocol/auth" ) func init() { diff --git a/pkg/config/config.go b/pkg/config/config.go index ffc3fc9cf..741d4df4b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -2,6 +2,7 @@ package config import ( "os" + "time" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" @@ -37,6 +38,15 @@ type RTCConfig struct { // Max bitrate for REMB MaxBitrate uint64 `yaml:"max_bitrate"` + + // Throttle periods for rtcp packets + Throttle RTCPThrottleConfig `yaml:"rtcp_throttle"` +} + +type RTCPThrottleConfig struct { + LowQuality time.Duration `yaml:"low_quality"` + MidQuality time.Duration `yaml:"mid_quality"` + HighQuality time.Duration `yaml:"high_quality"` } type AudioConfig struct { @@ -80,6 +90,11 @@ func NewConfig(confString string) (*Config, error) { }, MaxBitrate: 3 * 1024 * 1024, // 3 mbps PacketBufferSize: 500, + Throttle: RTCPThrottleConfig{ + LowQuality: time.Second, + MidQuality: time.Second * 2, + HighQuality: time.Second * 3, + }, }, Audio: AudioConfig{ ActiveLevel: 40, diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 41eab98a1..90dd5c91c 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/frostbyte73/go-throttle" "github.com/pion/ion-sfu/pkg/sfu" "github.com/pion/ion-sfu/pkg/twcc" "github.com/pion/rtcp" @@ -38,6 +37,7 @@ type ParticipantParams struct { AudioConfig config.AudioConfig ProtocolVersion types.ProtocolVersion Stats *RoomStatsReporter + ThrottleConfig config.RTCPThrottleConfig } type ParticipantImpl struct { @@ -88,12 +88,10 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { // TODO: check to ensure params are valid, id and identity can't be empty p := &ParticipantImpl{ - params: params, - id: utils.NewGuid(utils.ParticipantPrefix), - rtcpCh: make(chan []rtcp.Packet, 50), - rtcpThrottle: &rtcpThrottle{ - throttles: make(map[uint32]func(func())), - }, + params: params, + id: utils.NewGuid(utils.ParticipantPrefix), + rtcpCh: make(chan []rtcp.Packet, 50), + rtcpThrottle: newRtcpThrottle(params.ThrottleConfig), subscribedTracks: make(map[string][]types.SubscribedTrack), publishedTracks: make(map[string]types.PublishedTrack, 0), pendingTracks: make(map[string]*livekit.TrackInfo), @@ -741,7 +739,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w } ssrc := uint32(track.SSRC()) - p.rtcpThrottle.addTrack(ssrc) + p.rtcpThrottle.addTrack(ssrc, track.RID()) if p.twcc == nil { p.twcc = twcc.NewTransportWideCCResponder(ssrc) p.twcc.OnFeedback(func(pkt rtcp.RawPacket) { @@ -968,44 +966,3 @@ func (p *ParticipantImpl) rtcpSendWorker() { } } } - -type rtcpThrottle struct { - mu sync.RWMutex - throttles map[uint32]func(func()) -} - -func (t *rtcpThrottle) addTrack(ssrc uint32) { - t.mu.Lock() - defer t.mu.Unlock() - - t.throttles[ssrc] = throttle.New(time.Millisecond * 500) -} - -func (t *rtcpThrottle) add(ssrc uint32, f func()) { - t.mu.RLock() - defer t.mu.RUnlock() - - if trackThrottle, ok := t.throttles[ssrc]; ok { - trackThrottle(f) - } -} - -func (t *rtcpThrottle) removeTrack(ssrc uint32) { - t.mu.Lock() - defer t.mu.Unlock() - - if trackThrottle, ok := t.throttles[ssrc]; ok { - trackThrottle(func() {}) - delete(t.throttles, ssrc) - } -} - -func (t *rtcpThrottle) close() { - t.mu.Lock() - defer t.mu.Unlock() - - for ssrc, trackThrottle := range t.throttles { - trackThrottle(func() {}) - delete(t.throttles, ssrc) - } -} diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 0558b9abb..c4150acc9 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -173,6 +173,7 @@ func newParticipantForTest(identity string) *ParticipantImpl { Config: rtcConf, Sink: &routingfakes.FakeMessageSink{}, ProtocolVersion: 0, + ThrottleConfig: conf.RTC.Throttle, }) return p } diff --git a/pkg/rtc/rtcpthrottle.go b/pkg/rtc/rtcpthrottle.go new file mode 100644 index 000000000..9ca644a29 --- /dev/null +++ b/pkg/rtc/rtcpthrottle.go @@ -0,0 +1,78 @@ +package rtc + +import ( + "sync" + "time" + + "github.com/frostbyte73/go-throttle" + + "github.com/livekit/livekit-server/pkg/config" +) + +type rtcpThrottle struct { + config config.RTCPThrottleConfig + mu sync.RWMutex + throttles map[uint32]func(func()) +} + +// github.com/pion/ion-sfu/pkg/sfu/simulcast.go +const ( + fullResolution = "f" + halfResolution = "h" + quarterResolution = "q" +) + +func newRtcpThrottle(conf config.RTCPThrottleConfig) *rtcpThrottle { + return &rtcpThrottle{ + config: conf, + throttles: make(map[uint32]func(func())), + } +} + +func (t *rtcpThrottle) addTrack(ssrc uint32, rid string) { + t.mu.Lock() + defer t.mu.Unlock() + + var duration time.Duration + switch rid { + case fullResolution: + duration = t.config.HighQuality + case halfResolution: + duration = t.config.MidQuality + case quarterResolution: + duration = t.config.LowQuality + default: + duration = t.config.MidQuality + } + + t.throttles[ssrc] = throttle.New(duration) +} + +func (t *rtcpThrottle) add(ssrc uint32, f func()) { + t.mu.RLock() + defer t.mu.RUnlock() + + if trackThrottle, ok := t.throttles[ssrc]; ok { + trackThrottle(f) + } +} + +func (t *rtcpThrottle) removeTrack(ssrc uint32) { + t.mu.Lock() + defer t.mu.Unlock() + + if trackThrottle, ok := t.throttles[ssrc]; ok { + trackThrottle(func() {}) + delete(t.throttles, ssrc) + } +} + +func (t *rtcpThrottle) close() { + t.mu.Lock() + defer t.mu.Unlock() + + for ssrc, trackThrottle := range t.throttles { + trackThrottle(func() {}) + delete(t.throttles, ssrc) + } +} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 09ff82c54..efe62d22a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -261,6 +261,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, AudioConfig: r.config.Audio, ProtocolVersion: pv, Stats: room.GetStatsReporter(), + ThrottleConfig: r.config.RTC.Throttle, }) if err != nil { logger.Errorw("could not create participant", err)