better pli throttling

This commit is contained in:
David Colburn
2021-06-23 12:00:18 -07:00
parent 0ffb8a97e4
commit eda5cfbfd5
5 changed files with 34 additions and 56 deletions

3
go.mod
View File

@@ -5,7 +5,6 @@ go 1.15
require (
github.com/bep/debounce v1.2.0
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/frostbyte73/go-throttle v0.0.0-20210621200530-8018c891361d // indirect
github.com/go-logr/zapr v0.4.0
github.com/go-redis/redis/v8 v8.7.1
github.com/google/wire v0.5.0
@@ -47,4 +46,4 @@ require (
replace github.com/pion/webrtc/v3 => github.com/livekit/pion-webrtc/v3 v3.0.30
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.10.7-0.20210621205540-f74d53541f95
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.10.7-0.20210623183123-16d9c5c00864

6
go.sum
View File

@@ -89,8 +89,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frostbyte73/go-throttle v0.0.0-20210621200530-8018c891361d h1:rvSueMilKro0jF+VfxoVR42wazKPl+cUBL3rFbiBGso=
github.com/frostbyte73/go-throttle v0.0.0-20210621200530-8018c891361d/go.mod h1:jhHJXDlUaWA2g15qvWdleCmRe0Z3noWIQQvjbTZOhGY=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -232,8 +230,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.10.7-0.20210621205540-f74d53541f95 h1:4Vx5bjf5zD95MEsmIKennsFnHu4CzN4wfy+9JxKTX5Q=
github.com/livekit/ion-sfu v1.10.7-0.20210621205540-f74d53541f95/go.mod h1:0Kn3ywQ/4PneeIsshuSnCNzQrbWSgRlKrgWXiGRLUp0=
github.com/livekit/ion-sfu v1.10.7-0.20210623183123-16d9c5c00864 h1:9j7EYJ1h0nUkXLsZxEoRP8EAzMIzWVbi6KzKWLqr/qM=
github.com/livekit/ion-sfu v1.10.7-0.20210623183123-16d9c5c00864/go.mod h1:Wx6b4qGUjvSo1kGl+/fHl0ZF48g2IJOjzUFg0yCo9qY=
github.com/livekit/pion-webrtc/v3 v3.0.30 h1:2RomVfztLegWIePXcyPFJDiF5pbT64aIX4RtpXdDksU=
github.com/livekit/pion-webrtc/v3 v3.0.30/go.mod h1:XFQeLYBf++bWWA0sJqh6zF1ouWluosxwTOMOoTZGaD0=
github.com/livekit/protocol v0.5.3 h1:u4J6poLLX3zRFsJVGAz5X8KFv9+6dwffksMW/4EI+14=

View File

@@ -285,7 +285,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
})
t.params.Stats.AddPublishedTrack(t.kind.String())
}
t.receiver.AddUpTrack(track, buff, true)
t.receiver.AddUpTrack(track, buff, false)
// when RID is set, track is simulcasted
t.simulcasted = track.RID() != ""

View File

@@ -401,7 +401,6 @@ func (p *ParticipantImpl) Close() error {
if onClose != nil {
onClose(p)
}
p.pliThrottle.close()
p.publisher.Close()
p.subscriber.Close()
close(p.rtcpCh)
@@ -839,9 +838,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) {
if p.IsReady() && p.onTrackUpdated != nil {
p.onTrackUpdated(p, track)
}
if mt, ok := track.(*MediaTrack); ok {
p.pliThrottle.removeTrack(uint32(mt.ssrc))
}
track.OnClose(nil)
})
@@ -933,36 +929,35 @@ func (p *ParticipantImpl) downTracksRTCPWorker() {
func (p *ParticipantImpl) rtcpSendWorker() {
defer Recover()
write := func(pkts []rtcp.Packet) {
if err := p.publisher.pc.WriteRTCP(pkts); err != nil {
logger.Errorw("could not write RTCP to participant", err,
"participant", p.Identity())
}
}
// read from rtcpChan
for pkts := range p.rtcpCh {
if pkts == nil {
return
}
fwdPkts := make([]rtcp.Packet, 0, len(pkts))
for _, pkt := range pkts {
var mediaSSRC uint32
switch pkt.(type) {
case *rtcp.PictureLossIndication:
mediaSSRC = pkt.(*rtcp.PictureLossIndication).MediaSSRC
mediaSSRC := pkt.(*rtcp.PictureLossIndication).MediaSSRC
if p.pliThrottle.canSend(mediaSSRC) {
fwdPkts = append(fwdPkts, pkt)
}
case *rtcp.FullIntraRequest:
mediaSSRC = pkt.(*rtcp.FullIntraRequest).MediaSSRC
mediaSSRC := pkt.(*rtcp.FullIntraRequest).MediaSSRC
if p.pliThrottle.canSend(mediaSSRC) {
fwdPkts = append(fwdPkts, pkt)
}
default:
fwdPkts = append(fwdPkts, pkt)
continue
}
p.pliThrottle.add(mediaSSRC, func() { write([]rtcp.Packet{pkt}) })
}
if len(fwdPkts) > 0 {
write(fwdPkts)
if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil {
logger.Errorw("could not write RTCP to participant", err,
"participant", p.Identity())
}
}
}
}

View File

@@ -4,15 +4,14 @@ import (
"sync"
"time"
"github.com/frostbyte73/go-throttle"
"github.com/livekit/livekit-server/pkg/config"
)
type pliThrottle struct {
config config.PLIThrottleConfig
mu sync.RWMutex
throttles map[uint32]func(func())
config config.PLIThrottleConfig
mu sync.RWMutex
periods map[uint32]int64
lastSent map[uint32]int64
}
// github.com/pion/ion-sfu/pkg/sfu/simulcast.go
@@ -24,8 +23,9 @@ const (
func newPLIThrottle(conf config.PLIThrottleConfig) *pliThrottle {
return &pliThrottle{
config: conf,
throttles: make(map[uint32]func(func())),
config: conf,
periods: make(map[uint32]int64),
lastSent: make(map[uint32]int64),
}
}
@@ -45,34 +45,20 @@ func (t *pliThrottle) addTrack(ssrc uint32, rid string) {
duration = t.config.MidQuality
}
t.throttles[ssrc] = throttle.New(duration)
t.periods[ssrc] = duration.Nanoseconds()
}
func (t *pliThrottle) add(ssrc uint32, f func()) {
t.mu.RLock()
defer t.mu.RUnlock()
if trackThrottle, ok := t.throttles[ssrc]; ok {
trackThrottle(f)
}
}
func (t *pliThrottle) removeTrack(ssrc uint32) {
func (t *pliThrottle) canSend(ssrc uint32) bool {
t.mu.Lock()
defer t.mu.Unlock()
if trackThrottle, ok := t.throttles[ssrc]; ok {
trackThrottle(func() {})
delete(t.throttles, ssrc)
}
}
func (t *pliThrottle) close() {
t.mu.Lock()
defer t.mu.Unlock()
for ssrc, trackThrottle := range t.throttles {
trackThrottle(func() {})
delete(t.throttles, ssrc)
if period, ok := t.periods[ssrc]; ok {
if n := time.Now().UnixNano(); n-t.lastSent[ssrc] > period {
t.lastSent[ssrc] = n
return true
} else {
return false
}
}
return true
}