diff --git a/go.mod b/go.mod index ce698994b..e32aacef3 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.15 require ( github.com/bep/debounce v1.2.0 github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect - github.com/frostbyte73/go-throttle v0.0.0-20210621200530-8018c891361d // indirect github.com/go-logr/zapr v0.4.0 github.com/go-redis/redis/v8 v8.7.1 github.com/google/wire v0.5.0 @@ -47,4 +46,4 @@ require ( replace github.com/pion/webrtc/v3 => github.com/livekit/pion-webrtc/v3 v3.0.30 -replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.10.7-0.20210621205540-f74d53541f95 +replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.10.7-0.20210623183123-16d9c5c00864 diff --git a/go.sum b/go.sum index cae7c473d..2c9ed399c 100644 --- a/go.sum +++ b/go.sum @@ -89,8 +89,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= -github.com/frostbyte73/go-throttle v0.0.0-20210621200530-8018c891361d h1:rvSueMilKro0jF+VfxoVR42wazKPl+cUBL3rFbiBGso= -github.com/frostbyte73/go-throttle v0.0.0-20210621200530-8018c891361d/go.mod h1:jhHJXDlUaWA2g15qvWdleCmRe0Z3noWIQQvjbTZOhGY= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -232,8 +230,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/ion-sfu v1.10.7-0.20210621205540-f74d53541f95 h1:4Vx5bjf5zD95MEsmIKennsFnHu4CzN4wfy+9JxKTX5Q= -github.com/livekit/ion-sfu v1.10.7-0.20210621205540-f74d53541f95/go.mod h1:0Kn3ywQ/4PneeIsshuSnCNzQrbWSgRlKrgWXiGRLUp0= +github.com/livekit/ion-sfu v1.10.7-0.20210623183123-16d9c5c00864 h1:9j7EYJ1h0nUkXLsZxEoRP8EAzMIzWVbi6KzKWLqr/qM= +github.com/livekit/ion-sfu v1.10.7-0.20210623183123-16d9c5c00864/go.mod h1:Wx6b4qGUjvSo1kGl+/fHl0ZF48g2IJOjzUFg0yCo9qY= github.com/livekit/pion-webrtc/v3 v3.0.30 h1:2RomVfztLegWIePXcyPFJDiF5pbT64aIX4RtpXdDksU= github.com/livekit/pion-webrtc/v3 v3.0.30/go.mod h1:XFQeLYBf++bWWA0sJqh6zF1ouWluosxwTOMOoTZGaD0= github.com/livekit/protocol v0.5.3 h1:u4J6poLLX3zRFsJVGAz5X8KFv9+6dwffksMW/4EI+14= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index a73b56421..412151107 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -285,7 +285,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra }) t.params.Stats.AddPublishedTrack(t.kind.String()) } - t.receiver.AddUpTrack(track, buff, true) + t.receiver.AddUpTrack(track, buff, false) // when RID is set, track is simulcasted t.simulcasted = track.RID() != "" diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fd8b98872..ff4bb2a93 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -401,7 +401,6 @@ func (p *ParticipantImpl) Close() error { if onClose != nil { onClose(p) } - p.pliThrottle.close() p.publisher.Close() p.subscriber.Close() close(p.rtcpCh) @@ -839,9 +838,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) { if p.IsReady() && p.onTrackUpdated != nil { p.onTrackUpdated(p, track) } - if mt, ok := track.(*MediaTrack); ok { - p.pliThrottle.removeTrack(uint32(mt.ssrc)) - } track.OnClose(nil) }) @@ -933,36 +929,35 @@ func (p *ParticipantImpl) downTracksRTCPWorker() { func (p *ParticipantImpl) rtcpSendWorker() { defer Recover() - write := func(pkts []rtcp.Packet) { - if err := p.publisher.pc.WriteRTCP(pkts); err != nil { - logger.Errorw("could not write RTCP to participant", err, - "participant", p.Identity()) - } - } - // read from rtcpChan for pkts := range p.rtcpCh { if pkts == nil { return } + fwdPkts := make([]rtcp.Packet, 0, len(pkts)) for _, pkt := range pkts { - var mediaSSRC uint32 switch pkt.(type) { case *rtcp.PictureLossIndication: - mediaSSRC = pkt.(*rtcp.PictureLossIndication).MediaSSRC + mediaSSRC := pkt.(*rtcp.PictureLossIndication).MediaSSRC + if p.pliThrottle.canSend(mediaSSRC) { + fwdPkts = append(fwdPkts, pkt) + } case *rtcp.FullIntraRequest: - mediaSSRC = pkt.(*rtcp.FullIntraRequest).MediaSSRC + mediaSSRC := pkt.(*rtcp.FullIntraRequest).MediaSSRC + if p.pliThrottle.canSend(mediaSSRC) { + fwdPkts = append(fwdPkts, pkt) + } default: fwdPkts = append(fwdPkts, pkt) - continue } - - p.pliThrottle.add(mediaSSRC, func() { write([]rtcp.Packet{pkt}) }) } if len(fwdPkts) > 0 { - write(fwdPkts) + if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil { + logger.Errorw("could not write RTCP to participant", err, + "participant", p.Identity()) + } } } } diff --git a/pkg/rtc/plithrottle.go b/pkg/rtc/plithrottle.go index e8e907f67..f97d815d6 100644 --- a/pkg/rtc/plithrottle.go +++ b/pkg/rtc/plithrottle.go @@ -4,15 +4,14 @@ import ( "sync" "time" - "github.com/frostbyte73/go-throttle" - "github.com/livekit/livekit-server/pkg/config" ) type pliThrottle struct { - config config.PLIThrottleConfig - mu sync.RWMutex - throttles map[uint32]func(func()) + config config.PLIThrottleConfig + mu sync.RWMutex + periods map[uint32]int64 + lastSent map[uint32]int64 } // github.com/pion/ion-sfu/pkg/sfu/simulcast.go @@ -24,8 +23,9 @@ const ( func newPLIThrottle(conf config.PLIThrottleConfig) *pliThrottle { return &pliThrottle{ - config: conf, - throttles: make(map[uint32]func(func())), + config: conf, + periods: make(map[uint32]int64), + lastSent: make(map[uint32]int64), } } @@ -45,34 +45,20 @@ func (t *pliThrottle) addTrack(ssrc uint32, rid string) { duration = t.config.MidQuality } - t.throttles[ssrc] = throttle.New(duration) + t.periods[ssrc] = duration.Nanoseconds() } -func (t *pliThrottle) add(ssrc uint32, f func()) { - t.mu.RLock() - defer t.mu.RUnlock() - - if trackThrottle, ok := t.throttles[ssrc]; ok { - trackThrottle(f) - } -} - -func (t *pliThrottle) removeTrack(ssrc uint32) { +func (t *pliThrottle) canSend(ssrc uint32) bool { t.mu.Lock() defer t.mu.Unlock() - if trackThrottle, ok := t.throttles[ssrc]; ok { - trackThrottle(func() {}) - delete(t.throttles, ssrc) - } -} - -func (t *pliThrottle) close() { - t.mu.Lock() - defer t.mu.Unlock() - - for ssrc, trackThrottle := range t.throttles { - trackThrottle(func() {}) - delete(t.throttles, ssrc) + if period, ok := t.periods[ssrc]; ok { + if n := time.Now().UnixNano(); n-t.lastSent[ssrc] > period { + t.lastSent[ssrc] = n + return true + } else { + return false + } } + return true }