Fix some races reported by go -race (#706)

* Fix some races reported by go -race

* Avoid copy
This commit is contained in:
Raja Subramanian
2022-05-23 11:51:13 +05:30
committed by GitHub
parent cc5b12709e
commit 96b095504b
5 changed files with 53 additions and 29 deletions
+21 -7
View File
@@ -7,6 +7,7 @@ import (
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -96,15 +97,17 @@ func (t *MediaTrack) SdpCid() string {
}
func (t *MediaTrack) ToProto() *livekit.TrackInfo {
info := t.MediaTrackReceiver.TrackInfo()
info := proto.Clone(t.MediaTrackReceiver.TrackInfo()).(*livekit.TrackInfo)
info.Muted = t.IsMuted()
info.Simulcast = t.IsSimulcast()
layers := t.MediaTrackReceiver.GetVideoLayers()
t.lock.RLock()
for _, layer := range layers {
if int(layer.Quality) < len(t.layerSSRCs) {
layer.Ssrc = t.layerSSRCs[layer.Quality]
}
}
t.lock.RUnlock()
info.Layers = layers
return info
@@ -135,8 +138,11 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
}
})
isNew := false
t.lock.Lock()
if t.Receiver() == nil {
isNew = true
wr := sfu.NewWebRTCReceiver(
receiver,
track,
@@ -165,12 +171,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
wr.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) {
t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat)
})
t.params.Telemetry.TrackPublished(
context.Background(),
t.PublisherID(),
t.PublisherIdentity(),
t.ToProto(),
)
t.buffer = buff
@@ -188,15 +188,29 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
if t.IsSimulcast() {
layer := sfu.RidToLayer(track.RID())
t.lock.Lock()
if int(layer) < len(t.layerSSRCs) {
t.layerSSRCs[layer] = uint32(track.SSRC())
}
t.lock.Unlock()
}
if isNew {
t.params.Telemetry.TrackPublished(
context.Background(),
t.PublisherID(),
t.PublisherIdentity(),
t.ToProto(),
)
}
buff.Bind(receiver.GetParameters(), track.Codec().RTPCodecCapability)
}
func (t *MediaTrack) TrySetSimulcastSSRC(layer uint8, ssrc uint32) {
t.lock.Lock()
defer t.lock.Unlock()
if int(layer) < len(t.layerSSRCs) && t.layerSSRCs[layer] == 0 {
t.layerSSRCs[layer] = ssrc
}
+17 -15
View File
@@ -8,6 +8,7 @@ import (
"github.com/pion/rtcp"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -31,7 +32,7 @@ type MediaTrackReceiver struct {
lock sync.RWMutex
receiver sfu.TrackReceiver
layerDimensions sync.Map // livekit.VideoQuality => *livekit.VideoLayer
layerDimensions map[livekit.VideoQuality]*livekit.VideoLayer
// track audio fraction lost
downFracLostLock sync.Mutex
@@ -60,7 +61,8 @@ type MediaTrackReceiverParams struct {
func NewMediaTrackReceiver(params MediaTrackReceiverParams) *MediaTrackReceiver {
t := &MediaTrackReceiver{
params: params,
params: params,
layerDimensions: make(map[livekit.VideoQuality]*livekit.VideoLayer),
}
t.MediaTrackSubscriptions = NewMediaTrackSubscriptions(MediaTrackSubscriptionsParams{
@@ -234,9 +236,11 @@ func (t *MediaTrackReceiver) TrackInfo() *livekit.TrackInfo {
}
func (t *MediaTrackReceiver) UpdateVideoLayers(layers []*livekit.VideoLayer) {
t.lock.Lock()
for _, layer := range layers {
t.layerDimensions.Store(layer.Quality, layer)
t.layerDimensions[layer.Quality] = layer
}
t.lock.Unlock()
t.MediaTrackSubscriptions.UpdateVideoLayers()
if t.onVideoLayerUpdate != nil {
@@ -248,12 +252,11 @@ func (t *MediaTrackReceiver) UpdateVideoLayers(layers []*livekit.VideoLayer) {
func (t *MediaTrackReceiver) GetVideoLayers() []*livekit.VideoLayer {
layers := make([]*livekit.VideoLayer, 0)
t.layerDimensions.Range(func(q, val interface{}) bool {
if layer, ok := val.(*livekit.VideoLayer); ok {
layers = append(layers, layer)
}
return true
})
t.lock.RLock()
for _, layer := range t.layerDimensions {
layers = append(layers, proto.Clone(layer).(*livekit.VideoLayer))
}
t.lock.RUnlock()
return layers
}
@@ -276,12 +279,11 @@ func (t *MediaTrackReceiver) GetQualityForDimension(width, height uint32) liveki
// default sizes representing qualities low - high
layerSizes := []uint32{180, 360, origSize}
var providedSizes []uint32
t.layerDimensions.Range(func(_, val interface{}) bool {
if layer, ok := val.(*livekit.VideoLayer); ok {
providedSizes = append(providedSizes, layer.Height)
}
return true
})
t.lock.RLock()
for _, layer := range t.layerDimensions {
providedSizes = append(providedSizes, layer.Height)
}
t.lock.RUnlock()
if len(providedSizes) > 0 {
layerSizes = providedSizes
// comparing height always
+4 -3
View File
@@ -865,9 +865,10 @@ func (r *Room) audioUpdateWorker() {
// changedSpeakers need to include previous speakers that are no longer speaking
for sid, speaker := range lastActiveMap {
if nextActiveMap[sid] == nil {
speaker.Level = 0
speaker.Active = false
changedSpeakers = append(changedSpeakers, speaker)
inactiveSpeaker := proto.Clone(speaker).(*livekit.SpeakerInfo)
inactiveSpeaker.Level = 0
inactiveSpeaker.Active = false
changedSpeakers = append(changedSpeakers, inactiveSpeaker)
}
}
+2 -2
View File
@@ -103,10 +103,10 @@ func (d *DownTrackSpreader) HasDownTrack(peerID livekit.ParticipantID) bool {
func (d *DownTrackSpreader) Broadcast(layer int32, pkt *buffer.ExtPacket) {
d.downTrackMu.RLock()
downTracks := d.downTracks
free := d.free
numFree := len(d.free)
d.downTrackMu.RUnlock()
if d.params.Threshold == 0 || len(downTracks)-len(free) < d.params.Threshold {
if d.params.Threshold == 0 || (len(downTracks)-numFree) < d.params.Threshold {
// serial - not enough down tracks for parallelization to outweigh overhead
for _, dt := range downTracks {
if dt != nil {
+9 -2
View File
@@ -116,7 +116,12 @@ func (s *StreamTrackerManager) AddTracker(layer int32) *StreamTracker {
break
}
}
if !exempt || layer > s.maxExpectedLayer {
s.lock.RLock()
maxExpectedLayer := s.maxExpectedLayer
s.lock.RUnlock()
if !exempt || layer > maxExpectedLayer {
s.removeAvailableLayer(layer)
} else {
s.logger.Debugw("not removing exempt layer", "layer", layer)
@@ -294,7 +299,9 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) {
s.availableLayers = append(s.availableLayers, layer)
sort.Slice(s.availableLayers, func(i, j int) bool { return s.availableLayers[i] < s.availableLayers[j] })
layers := s.availableLayers
var layers []int32
layers = append(layers, s.availableLayers...)
s.lock.Unlock()
s.logger.Debugw("available layers changed - layer seen", "added", layer, "layers", layers)