From ceae58ac203488b176423c7bbfbec3eaa221104d Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 13 Nov 2021 22:59:53 -0800 Subject: [PATCH] Fixed deadlocks occurring in Receiver writeRTP (#189) When we RLock during write cycles, the mutex spends the majority of its time staying locked. As new participants join, they have to acquire the WLock before their downtracks can be added. In load test scenarios (25 participants joining together), it's common to see goroutine dumps showing MediaTrack.AddSubscriber -> DownTrack.storeDownTrack trying to acquire the mutex and never being able to acquire it. --- pkg/rtc/mediatrack.go | 2 +- pkg/rtc/participant.go | 7 +------ pkg/service/wire_gen.go | 3 ++- pkg/sfu/receiver.go | 17 ++++++++++------- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 4514972e4..66a7ec613 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -199,7 +199,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { // // AddTrack will create a new transceiver or re-use an unused one // if the attributes match. This prevents SDP from bloating - // because of dormant transceivers buidling up. + // because of dormant transceivers building up. 
// sender, err = sub.SubscriberPC().AddTrack(downTrack) if err != nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 19142314a..5d68beef8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -472,12 +472,7 @@ func (p *ParticipantImpl) ICERestart() error { // AddSubscriber subscribes op to all publishedTracks func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) { - p.lock.RLock() - tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks)) - for _, t := range p.publishedTracks { - tracks = append(tracks, t) - } - p.lock.RUnlock() + tracks := p.GetPublishedTracks() if len(tracks) == 0 { return 0, nil diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 6384229dc..21ccc3e39 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,7 +1,8 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//+build !wireinject +//go:build !wireinject +// +build !wireinject package service diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d9a33b314..d0d80e685 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -461,8 +461,9 @@ func (w *WebRTCReceiver) DeleteDownTrack(peerID string) { func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) { if _, ok := p[0].(*rtcp.PictureLossIndication); ok { w.rtcpMu.Lock() - defer w.rtcpMu.Unlock() - if time.Now().UnixNano()-w.lastPli.get() < w.pliThrottle { + throttled := time.Now().UnixNano()-w.lastPli.get() < w.pliThrottle + w.rtcpMu.Unlock() + if throttled { return } w.lastPli.set(time.Now().UnixNano()) @@ -568,9 +569,12 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } w.downTrackMu.RLock() - if w.lbThreshold == 0 || len(w.downTracks)-len(w.free) < w.lbThreshold { + downTracks := w.downTracks + free := w.free + w.downTrackMu.RUnlock() + if w.lbThreshold == 0 || len(downTracks)-len(free) < w.lbThreshold { // serial - not enough down tracks for parallelization to outweigh overhead - for 
_, dt := range w.downTracks { + for _, dt := range downTracks { if dt != nil { w.writeRTP(layer, dt, pkt, pli) } @@ -578,7 +582,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } else { // parallel - enables much more efficient multi-core utilization start := uint64(0) - end := uint64(len(w.downTracks)) + end := uint64(len(downTracks)) // 100µs is enough to amortize the overhead and provide sufficient load balancing. // WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop. @@ -596,7 +600,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } for i := n - step; i < n && i < end; i++ { - if dt := w.downTracks[i]; dt != nil { + if dt := downTracks[i]; dt != nil { w.writeRTP(layer, dt, pkt, pli) } } @@ -605,7 +609,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } wg.Wait() } - w.downTrackMu.RUnlock() } }