diff --git a/.github/workflows/buildtest.yaml b/.github/workflows/buildtest.yaml index 9c3f5b740..e3cfb326f 100644 --- a/.github/workflows/buildtest.yaml +++ b/.github/workflows/buildtest.yaml @@ -57,7 +57,7 @@ jobs: - name: Test run: | set -euo pipefail - go test -json -v ./... 2>&1 | tee /tmp/gotest.log | gotestfmt + MallocNanoZone=0 go test -race -json -v ./... 2>&1 | tee /tmp/gotest.log | gotestfmt # Upload the original go test log as an artifact for later review. - name: Upload test log diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 403b5986d..4444cd3a6 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -559,6 +559,9 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { } func (b *Buffer) SetLastFractionLostReport(lost uint8) { + b.Lock() + defer b.Unlock() + b.lastFractionLostToReport = lost } diff --git a/pkg/sfu/downtrackspreader.go b/pkg/sfu/downtrackspreader.go index 3d0e2a5f9..c407e208f 100644 --- a/pkg/sfu/downtrackspreader.go +++ b/pkg/sfu/downtrackspreader.go @@ -19,19 +19,16 @@ type DownTrackSpreaderParams struct { type DownTrackSpreader struct { params DownTrackSpreaderParams - downTrackMu sync.RWMutex - downTracks []TrackSender - index map[livekit.ParticipantID]int - free map[int]struct{} - numProcs int + downTrackMu sync.RWMutex + downTracks map[livekit.ParticipantID]TrackSender + downTracksShadow []TrackSender + numProcs int } func NewDownTrackSpreader(params DownTrackSpreaderParams) *DownTrackSpreader { d := &DownTrackSpreader{ params: params, - downTracks: make([]TrackSender, 0), - index: make(map[livekit.ParticipantID]int), - free: make(map[int]struct{}), + downTracks: make(map[livekit.ParticipantID]TrackSender), numProcs: runtime.NumCPU(), } @@ -46,18 +43,17 @@ func (d *DownTrackSpreader) GetDownTracks() []TrackSender { d.downTrackMu.RLock() defer d.downTrackMu.RUnlock() - return d.downTracks + return d.downTracksShadow } func (d *DownTrackSpreader) ResetAndGetDownTracks() []TrackSender { d.downTrackMu.Lock() defer d.downTrackMu.Unlock() - downTracks := d.downTracks + downTracks := d.downTracksShadow - d.index = make(map[livekit.ParticipantID]int) - d.free = make(map[int]struct{}) - d.downTracks = make([]TrackSender, 0) + d.downTracks = make(map[livekit.ParticipantID]TrackSender) + d.downTracksShadow = nil return downTracks } @@ -66,52 +62,32 @@ func (d *DownTrackSpreader) Store(ts TrackSender) { d.downTrackMu.Lock() defer d.downTrackMu.Unlock() - peerID := ts.PeerID() - for idx := range d.free { - d.index[peerID] = idx - delete(d.free, idx) - d.downTracks[idx] = ts - return - } - - d.index[peerID] = len(d.downTracks) - d.downTracks = append(d.downTracks, ts) + d.downTracks[ts.PeerID()] = ts + d.shadowDownTracks() } func (d *DownTrackSpreader) Free(peerID livekit.ParticipantID) { d.downTrackMu.Lock() defer d.downTrackMu.Unlock() - idx, ok := d.index[peerID] - if !ok { - return - } - - delete(d.index, peerID) - d.downTracks[idx] = nil - d.free[idx] = struct{}{} + delete(d.downTracks, peerID) + d.shadowDownTracks() } func (d *DownTrackSpreader) HasDownTrack(peerID livekit.ParticipantID) bool { d.downTrackMu.RLock() defer d.downTrackMu.RUnlock() - _, ok := d.index[peerID] + _, ok := d.downTracks[peerID] return ok } func (d *DownTrackSpreader) Broadcast(layer int32, pkt *buffer.ExtPacket) { - d.downTrackMu.RLock() - downTracks := d.downTracks - numFree := len(d.free) - d.downTrackMu.RUnlock() - - if d.params.Threshold == 0 || (len(downTracks)-numFree) < d.params.Threshold { + downTracks := d.GetDownTracks() + if d.params.Threshold == 0 || (len(downTracks)) < d.params.Threshold { // serial - not enough down tracks for parallelization to outweigh overhead for _, dt := range downTracks { - if dt != nil { - d.writeRTP(layer, dt, pkt) - } + d.writeRTP(layer, dt, pkt) } } else { // parallel - enables much more efficient multi-core utilization @@ -134,9 +110,7 @@ func (d *DownTrackSpreader) Broadcast(layer int32, pkt *buffer.ExtPacket) { } for i := n - step; i < n && i < end; i++ { - if dt := downTracks[i]; dt != nil { - d.writeRTP(layer, dt, pkt) - } + d.writeRTP(layer, downTracks[i], pkt) } } }() @@ -150,3 +124,10 @@ func (d *DownTrackSpreader) writeRTP(layer int32, dt TrackSender, pkt *buffer.Ex d.params.Logger.Errorw("failed writing to down track", err) } } + +func (d *DownTrackSpreader) shadowDownTracks() { + d.downTracksShadow = make([]TrackSender, 0, len(d.downTracks)) + for _, dt := range d.downTracks { + d.downTracksShadow = append(d.downTracksShadow, dt) + } +} diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 83f459345..a40ab23b0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -345,17 +345,13 @@ func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32) { func (w *WebRTCReceiver) downTrackLayerChange(layers []int32) { for _, dt := range w.downTrackSpreader.GetDownTracks() { - if dt != nil { - dt.UpTrackLayersChange(layers) - } + dt.UpTrackLayersChange(layers) } } func (w *WebRTCReceiver) downTrackBitrateAvailabilityChange() { for _, dt := range w.downTrackSpreader.GetDownTracks() { - if dt != nil { - dt.UpTrackBitrateAvailabilityChange() - } + dt.UpTrackBitrateAvailabilityChange() } } @@ -576,9 +572,7 @@ func (w *WebRTCReceiver) closeTracks() { w.connectionStats.Close() for _, dt := range w.downTrackSpreader.ResetAndGetDownTracks() { - if dt != nil { - dt.Close() - } + dt.Close() } if w.onCloseHandler != nil {