Fix a couple more races (#717)

* Use grants clone

* Fix a couple more races

Use a shadow copy of the down tracks in DownTrackSpreader.
Reads always use the shadow.
On Add/Delete of a down track, a new copy is made.
Copying is done only on add/delete.
Anyone holding a reference to a shadow keeps a consistent snapshot, since Add/Delete create a new slice instead of mutating the existing one (see the sketch below).

With this, tests no longer report any races, so CI tests are now run with `-race`.

Also fix another race reported in #603.

There are a couple more races in that bug report that still need to be chased down.
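
To make the idea concrete, here is a minimal sketch of the copy-on-write pattern described above; `Registry`, `Snapshot`, and the other names are illustrative stand-ins, not the actual DownTrackSpreader API:

```go
// Minimal sketch of the copy-on-write "shadow" pattern; names are
// hypothetical, not the real DownTrackSpreader API.
package main

import (
	"fmt"
	"sync"
)

type Registry struct {
	mu     sync.RWMutex
	items  map[string]int // source of truth, mutated under lock
	shadow []int          // rebuilt on every add/remove, never mutated in place
}

func NewRegistry() *Registry {
	return &Registry{items: make(map[string]int)}
}

// Add and Remove rebuild the shadow, so previously returned snapshots stay intact.
func (r *Registry) Add(key string, v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items[key] = v
	r.reshadow()
}

func (r *Registry) Remove(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.items, key)
	r.reshadow()
}

// Snapshot returns the current shadow; callers may iterate it without the
// lock because writers replace the slice rather than mutate it.
func (r *Registry) Snapshot() []int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.shadow
}

func (r *Registry) reshadow() {
	shadow := make([]int, 0, len(r.items))
	for _, v := range r.items {
		shadow = append(shadow, v)
	}
	r.shadow = shadow
}

func main() {
	r := NewRegistry()
	r.Add("a", 1)
	snap := r.Snapshot()
	r.Remove("a")                   // builds a new shadow; snap still sees the old one
	fmt.Println(snap, r.Snapshot()) // [1] []
}
```

The trade-off is an O(n) copy on every add/delete in exchange for lock-free iteration on the read path, which fits here because membership changes are far rarer than packet broadcasts.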

* Use the env var suggested in https://lifesaver.codes/answer/runtime-race-detector-sigabrt-or-sigsegv-on-macos-monterey-49138

* Fix a staticcheck issue that did not fail locally but was reported by CI

* Use API to get down tracks
Author: Raja Subramanian
Date: 2022-05-25 16:14:07 +05:30
Committed by: GitHub
Parent: 73db950ea4
Commit: 77b8e9eecb
4 changed files with 32 additions and 54 deletions

View File

@@ -57,7 +57,7 @@ jobs:
       - name: Test
         run: |
           set -euo pipefail
-          go test -json -v ./... 2>&1 | tee /tmp/gotest.log | gotestfmt
+          MallocNanoZone=0 go test -race -json -v ./... 2>&1 | tee /tmp/gotest.log | gotestfmt

       # Upload the original go test log as an artifact for later review.
       - name: Upload test log

View File

@@ -559,6 +559,9 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
 }

 func (b *Buffer) SetLastFractionLostReport(lost uint8) {
+	b.Lock()
+	defer b.Unlock()
+
 	b.lastFractionLostToReport = lost
 }
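
The hunk above serializes writes to `lastFractionLostToReport`. As a hedged illustration of the class of bug, the stand-in below (not the real `pkg/sfu` buffer) shows the shape of unsynchronized field access that `go test -race` flags, and how holding the lock on both sides fixes it:

```go
// Hypothetical reduction of the race fixed above: a plain field written by
// one goroutine and read by another trips the race detector unless both
// sides hold the lock. Buffer here is a stand-in, not the real type.
package main

import "sync"

type Buffer struct {
	sync.Mutex
	lastFractionLostToReport uint8
}

// Without Lock/Unlock, this write races with concurrent readers.
func (b *Buffer) SetLastFractionLostReport(lost uint8) {
	b.Lock()
	defer b.Unlock()
	b.lastFractionLostToReport = lost
}

func (b *Buffer) lastFractionLost() uint8 {
	b.Lock()
	defer b.Unlock()
	return b.lastFractionLostToReport
}

func main() {
	b := &Buffer{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(n uint8) {
			defer wg.Done()
			b.SetLastFractionLostReport(n)
			_ = b.lastFractionLost()
		}(uint8(i))
	}
	wg.Wait()
}
```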

View File

@@ -19,19 +19,16 @@ type DownTrackSpreaderParams struct {
 type DownTrackSpreader struct {
 	params DownTrackSpreaderParams

-	downTrackMu sync.RWMutex
-	downTracks  []TrackSender
-	index       map[livekit.ParticipantID]int
-	free        map[int]struct{}
-	numProcs    int
+	downTrackMu      sync.RWMutex
+	downTracks       map[livekit.ParticipantID]TrackSender
+	downTracksShadow []TrackSender
+	numProcs         int
 }

 func NewDownTrackSpreader(params DownTrackSpreaderParams) *DownTrackSpreader {
 	d := &DownTrackSpreader{
 		params:     params,
-		downTracks: make([]TrackSender, 0),
-		index:      make(map[livekit.ParticipantID]int),
-		free:       make(map[int]struct{}),
+		downTracks: make(map[livekit.ParticipantID]TrackSender),
 		numProcs:   runtime.NumCPU(),
 	}
@@ -46,18 +43,17 @@ func (d *DownTrackSpreader) GetDownTracks() []TrackSender {
 	d.downTrackMu.RLock()
 	defer d.downTrackMu.RUnlock()

-	return d.downTracks
+	return d.downTracksShadow
 }

 func (d *DownTrackSpreader) ResetAndGetDownTracks() []TrackSender {
 	d.downTrackMu.Lock()
 	defer d.downTrackMu.Unlock()

-	downTracks := d.downTracks
-	d.index = make(map[livekit.ParticipantID]int)
-	d.free = make(map[int]struct{})
-	d.downTracks = make([]TrackSender, 0)
+	downTracks := d.downTracksShadow
+	d.downTracks = make(map[livekit.ParticipantID]TrackSender)
+	d.downTracksShadow = nil

 	return downTracks
 }
@@ -66,52 +62,32 @@ func (d *DownTrackSpreader) Store(ts TrackSender) {
 	d.downTrackMu.Lock()
 	defer d.downTrackMu.Unlock()

-	peerID := ts.PeerID()
-	for idx := range d.free {
-		d.index[peerID] = idx
-		delete(d.free, idx)
-		d.downTracks[idx] = ts
-		return
-	}
-
-	d.index[peerID] = len(d.downTracks)
-	d.downTracks = append(d.downTracks, ts)
+	d.downTracks[ts.PeerID()] = ts
+	d.shadowDownTracks()
 }

 func (d *DownTrackSpreader) Free(peerID livekit.ParticipantID) {
 	d.downTrackMu.Lock()
 	defer d.downTrackMu.Unlock()

-	idx, ok := d.index[peerID]
-	if !ok {
-		return
-	}
-
-	delete(d.index, peerID)
-	d.downTracks[idx] = nil
-	d.free[idx] = struct{}{}
+	delete(d.downTracks, peerID)
+	d.shadowDownTracks()
 }

 func (d *DownTrackSpreader) HasDownTrack(peerID livekit.ParticipantID) bool {
 	d.downTrackMu.RLock()
 	defer d.downTrackMu.RUnlock()

-	_, ok := d.index[peerID]
+	_, ok := d.downTracks[peerID]
 	return ok
 }

 func (d *DownTrackSpreader) Broadcast(layer int32, pkt *buffer.ExtPacket) {
-	d.downTrackMu.RLock()
-	downTracks := d.downTracks
-	numFree := len(d.free)
-	d.downTrackMu.RUnlock()
-
-	if d.params.Threshold == 0 || (len(downTracks)-numFree) < d.params.Threshold {
+	downTracks := d.GetDownTracks()
+	if d.params.Threshold == 0 || (len(downTracks)) < d.params.Threshold {
 		// serial - not enough down tracks for parallelization to outweigh overhead
 		for _, dt := range downTracks {
-			if dt != nil {
-				d.writeRTP(layer, dt, pkt)
-			}
+			d.writeRTP(layer, dt, pkt)
 		}
 	} else {
 		// parallel - enables much more efficient multi-core utilization
@@ -134,9 +110,7 @@ func (d *DownTrackSpreader) Broadcast(layer int32, pkt *buffer.ExtPacket) {
 				}

 				for i := n - step; i < n && i < end; i++ {
-					if dt := downTracks[i]; dt != nil {
-						d.writeRTP(layer, dt, pkt)
-					}
+					d.writeRTP(layer, downTracks[i], pkt)
 				}
 			}()
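
The full parallel path of `Broadcast` is not visible in this hunk; judging from the `n - step` / `end` loop, each worker appears to claim fixed-size chunks of the slice via an atomic counter. A hedged sketch of that chunk-claiming pattern, with all names assumed:

```go
// Hedged sketch of the chunked fan-out suggested by the hunk above (the full
// Broadcast body is not shown, so step/end usage is an assumption): each
// worker atomically claims step-sized index ranges until all are consumed.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

func fanOut(items []int, handle func(int)) {
	numProcs := runtime.NumCPU()
	step := 2 // chunk size per claim; tune for the workload
	end := int64(len(items))
	var cur int64

	var wg sync.WaitGroup
	for p := 0; p < numProcs; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				n := atomic.AddInt64(&cur, int64(step))
				if n-int64(step) >= end {
					return // all chunks claimed
				}
				for i := n - int64(step); i < n && i < end; i++ {
					handle(items[i])
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7}
	var sum int64
	fanOut(items, func(v int) { atomic.AddInt64(&sum, int64(v)) })
	fmt.Println(sum) // 28
}
```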
@@ -150,3 +124,10 @@ func (d *DownTrackSpreader) writeRTP(layer int32, dt TrackSender, pkt *buffer.ExtPacket) {
 		d.params.Logger.Errorw("failed writing to down track", err)
 	}
 }
+
+func (d *DownTrackSpreader) shadowDownTracks() {
+	d.downTracksShadow = make([]TrackSender, 0, len(d.downTracks))
+	for _, dt := range d.downTracks {
+		d.downTracksShadow = append(d.downTracksShadow, dt)
+	}
+}

View File

@@ -345,17 +345,13 @@ func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) {
 func (w *WebRTCReceiver) downTrackLayerChange(layers []int32) {
 	for _, dt := range w.downTrackSpreader.GetDownTracks() {
-		if dt != nil {
-			dt.UpTrackLayersChange(layers)
-		}
+		dt.UpTrackLayersChange(layers)
 	}
 }

 func (w *WebRTCReceiver) downTrackBitrateAvailabilityChange() {
 	for _, dt := range w.downTrackSpreader.GetDownTracks() {
-		if dt != nil {
-			dt.UpTrackBitrateAvailabilityChange()
-		}
+		dt.UpTrackBitrateAvailabilityChange()
 	}
 }
@@ -576,9 +572,7 @@ func (w *WebRTCReceiver) closeTracks() {
 	w.connectionStats.Close()

 	for _, dt := range w.downTrackSpreader.ResetAndGetDownTracks() {
-		if dt != nil {
-			dt.Close()
-		}
+		dt.Close()
 	}

 	if w.onCloseHandler != nil {