mirror of
https://github.com/livekit/livekit.git
synced 2026-05-24 19:05:36 +00:00
Fixed deadlocks occurring in Receiver writeRTP (#189)
When we RLock during write cycles, the mutex spends the majority of its time locked. As new participants join, they have to acquire the WLock before downtracks can be added. In load test scenarios (25 participants joining together), it's common to see goroutine dumps showing MediaTrack.AddSubscriber -> DownTrack.storeDownTrack trying to acquire the mutex and never being able to acquire it.
This commit is contained in:
@@ -199,7 +199,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
//
|
||||
// AddTrack will create a new transceiver or re-use an unused one
|
||||
// if the attributes match. This prevents SDP from bloating
|
||||
// because of dormant transceivers buidling up.
|
||||
// because of dormant transceivers building up.
|
||||
//
|
||||
sender, err = sub.SubscriberPC().AddTrack(downTrack)
|
||||
if err != nil {
|
||||
|
||||
@@ -472,12 +472,7 @@ func (p *ParticipantImpl) ICERestart() error {
|
||||
|
||||
// AddSubscriber subscribes op to all publishedTracks
|
||||
func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) {
|
||||
p.lock.RLock()
|
||||
tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks))
|
||||
for _, t := range p.publishedTracks {
|
||||
tracks = append(tracks, t)
|
||||
}
|
||||
p.lock.RUnlock()
|
||||
tracks := p.GetPublishedTracks()
|
||||
|
||||
if len(tracks) == 0 {
|
||||
return 0, nil
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
// Code generated by Wire. DO NOT EDIT.
|
||||
|
||||
//go:generate go run github.com/google/wire/cmd/wire
|
||||
//+build !wireinject
|
||||
//go:build !wireinject
|
||||
// +build !wireinject
|
||||
|
||||
package service
|
||||
|
||||
|
||||
+10
-7
@@ -461,8 +461,9 @@ func (w *WebRTCReceiver) DeleteDownTrack(peerID string) {
|
||||
func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) {
|
||||
if _, ok := p[0].(*rtcp.PictureLossIndication); ok {
|
||||
w.rtcpMu.Lock()
|
||||
defer w.rtcpMu.Unlock()
|
||||
if time.Now().UnixNano()-w.lastPli.get() < w.pliThrottle {
|
||||
throttled := time.Now().UnixNano()-w.lastPli.get() < w.pliThrottle
|
||||
w.rtcpMu.Unlock()
|
||||
if throttled {
|
||||
return
|
||||
}
|
||||
w.lastPli.set(time.Now().UnixNano())
|
||||
@@ -568,9 +569,12 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
}
|
||||
|
||||
w.downTrackMu.RLock()
|
||||
if w.lbThreshold == 0 || len(w.downTracks)-len(w.free) < w.lbThreshold {
|
||||
downTracks := w.downTracks
|
||||
free := w.free
|
||||
w.downTrackMu.RUnlock()
|
||||
if w.lbThreshold == 0 || len(downTracks)-len(free) < w.lbThreshold {
|
||||
// serial - not enough down tracks for parallelization to outweigh overhead
|
||||
for _, dt := range w.downTracks {
|
||||
for _, dt := range downTracks {
|
||||
if dt != nil {
|
||||
w.writeRTP(layer, dt, pkt, pli)
|
||||
}
|
||||
@@ -578,7 +582,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
} else {
|
||||
// parallel - enables much more efficient multi-core utilization
|
||||
start := uint64(0)
|
||||
end := uint64(len(w.downTracks))
|
||||
end := uint64(len(downTracks))
|
||||
|
||||
// 100µs is enough to amortize the overhead and provide sufficient load balancing.
|
||||
// WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop.
|
||||
@@ -596,7 +600,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
}
|
||||
|
||||
for i := n - step; i < n && i < end; i++ {
|
||||
if dt := w.downTracks[i]; dt != nil {
|
||||
if dt := downTracks[i]; dt != nil {
|
||||
w.writeRTP(layer, dt, pkt, pli)
|
||||
}
|
||||
}
|
||||
@@ -605,7 +609,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
w.downTrackMu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user