mirror of
https://github.com/livekit/livekit.git
synced 2026-05-24 16:55:35 +00:00
add throttle config
This commit is contained in:
+2
-1
@@ -16,12 +16,13 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/livekit/protocol/auth"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/logger"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/service"
|
||||
"github.com/livekit/livekit-server/version"
|
||||
"github.com/livekit/protocol/auth"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -2,6 +2,7 @@ package config
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"github.com/urfave/cli/v2"
|
||||
@@ -37,6 +38,15 @@ type RTCConfig struct {
|
||||
|
||||
// Max bitrate for REMB
|
||||
MaxBitrate uint64 `yaml:"max_bitrate"`
|
||||
|
||||
// Throttle periods for rtcp packets
|
||||
Throttle RTCPThrottleConfig `yaml:"rtcp_throttle"`
|
||||
}
|
||||
|
||||
type RTCPThrottleConfig struct {
|
||||
LowQuality time.Duration `yaml:"low_quality"`
|
||||
MidQuality time.Duration `yaml:"mid_quality"`
|
||||
HighQuality time.Duration `yaml:"high_quality"`
|
||||
}
|
||||
|
||||
type AudioConfig struct {
|
||||
@@ -80,6 +90,11 @@ func NewConfig(confString string) (*Config, error) {
|
||||
},
|
||||
MaxBitrate: 3 * 1024 * 1024, // 3 mbps
|
||||
PacketBufferSize: 500,
|
||||
Throttle: RTCPThrottleConfig{
|
||||
LowQuality: time.Second,
|
||||
MidQuality: time.Second * 2,
|
||||
HighQuality: time.Second * 3,
|
||||
},
|
||||
},
|
||||
Audio: AudioConfig{
|
||||
ActiveLevel: 40,
|
||||
|
||||
+6
-49
@@ -7,7 +7,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/frostbyte73/go-throttle"
|
||||
"github.com/pion/ion-sfu/pkg/sfu"
|
||||
"github.com/pion/ion-sfu/pkg/twcc"
|
||||
"github.com/pion/rtcp"
|
||||
@@ -38,6 +37,7 @@ type ParticipantParams struct {
|
||||
AudioConfig config.AudioConfig
|
||||
ProtocolVersion types.ProtocolVersion
|
||||
Stats *RoomStatsReporter
|
||||
ThrottleConfig config.RTCPThrottleConfig
|
||||
}
|
||||
|
||||
type ParticipantImpl struct {
|
||||
@@ -88,12 +88,10 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
// TODO: check to ensure params are valid, id and identity can't be empty
|
||||
|
||||
p := &ParticipantImpl{
|
||||
params: params,
|
||||
id: utils.NewGuid(utils.ParticipantPrefix),
|
||||
rtcpCh: make(chan []rtcp.Packet, 50),
|
||||
rtcpThrottle: &rtcpThrottle{
|
||||
throttles: make(map[uint32]func(func())),
|
||||
},
|
||||
params: params,
|
||||
id: utils.NewGuid(utils.ParticipantPrefix),
|
||||
rtcpCh: make(chan []rtcp.Packet, 50),
|
||||
rtcpThrottle: newRtcpThrottle(params.ThrottleConfig),
|
||||
subscribedTracks: make(map[string][]types.SubscribedTrack),
|
||||
publishedTracks: make(map[string]types.PublishedTrack, 0),
|
||||
pendingTracks: make(map[string]*livekit.TrackInfo),
|
||||
@@ -741,7 +739,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
|
||||
}
|
||||
|
||||
ssrc := uint32(track.SSRC())
|
||||
p.rtcpThrottle.addTrack(ssrc)
|
||||
p.rtcpThrottle.addTrack(ssrc, track.RID())
|
||||
if p.twcc == nil {
|
||||
p.twcc = twcc.NewTransportWideCCResponder(ssrc)
|
||||
p.twcc.OnFeedback(func(pkt rtcp.RawPacket) {
|
||||
@@ -968,44 +966,3 @@ func (p *ParticipantImpl) rtcpSendWorker() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type rtcpThrottle struct {
|
||||
mu sync.RWMutex
|
||||
throttles map[uint32]func(func())
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) addTrack(ssrc uint32) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
t.throttles[ssrc] = throttle.New(time.Millisecond * 500)
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) add(ssrc uint32, f func()) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if trackThrottle, ok := t.throttles[ssrc]; ok {
|
||||
trackThrottle(f)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) removeTrack(ssrc uint32) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if trackThrottle, ok := t.throttles[ssrc]; ok {
|
||||
trackThrottle(func() {})
|
||||
delete(t.throttles, ssrc)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) close() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
for ssrc, trackThrottle := range t.throttles {
|
||||
trackThrottle(func() {})
|
||||
delete(t.throttles, ssrc)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ func newParticipantForTest(identity string) *ParticipantImpl {
|
||||
Config: rtcConf,
|
||||
Sink: &routingfakes.FakeMessageSink{},
|
||||
ProtocolVersion: 0,
|
||||
ThrottleConfig: conf.RTC.Throttle,
|
||||
})
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
package rtc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/frostbyte73/go-throttle"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
)
|
||||
|
||||
type rtcpThrottle struct {
|
||||
config config.RTCPThrottleConfig
|
||||
mu sync.RWMutex
|
||||
throttles map[uint32]func(func())
|
||||
}
|
||||
|
||||
// github.com/pion/ion-sfu/pkg/sfu/simulcast.go
|
||||
const (
|
||||
fullResolution = "f"
|
||||
halfResolution = "h"
|
||||
quarterResolution = "q"
|
||||
)
|
||||
|
||||
func newRtcpThrottle(conf config.RTCPThrottleConfig) *rtcpThrottle {
|
||||
return &rtcpThrottle{
|
||||
config: conf,
|
||||
throttles: make(map[uint32]func(func())),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) addTrack(ssrc uint32, rid string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
var duration time.Duration
|
||||
switch rid {
|
||||
case fullResolution:
|
||||
duration = t.config.HighQuality
|
||||
case halfResolution:
|
||||
duration = t.config.MidQuality
|
||||
case quarterResolution:
|
||||
duration = t.config.LowQuality
|
||||
default:
|
||||
duration = t.config.MidQuality
|
||||
}
|
||||
|
||||
t.throttles[ssrc] = throttle.New(duration)
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) add(ssrc uint32, f func()) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if trackThrottle, ok := t.throttles[ssrc]; ok {
|
||||
trackThrottle(f)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) removeTrack(ssrc uint32) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if trackThrottle, ok := t.throttles[ssrc]; ok {
|
||||
trackThrottle(func() {})
|
||||
delete(t.throttles, ssrc)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *rtcpThrottle) close() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
for ssrc, trackThrottle := range t.throttles {
|
||||
trackThrottle(func() {})
|
||||
delete(t.throttles, ssrc)
|
||||
}
|
||||
}
|
||||
@@ -261,6 +261,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
|
||||
AudioConfig: r.config.Audio,
|
||||
ProtocolVersion: pv,
|
||||
Stats: room.GetStatsReporter(),
|
||||
ThrottleConfig: r.config.RTC.Throttle,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Errorw("could not create participant", err)
|
||||
|
||||
Reference in New Issue
Block a user