diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index c7ca7e30d..20617079f 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -7,6 +7,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -96,15 +97,17 @@ func (t *MediaTrack) SdpCid() string { } func (t *MediaTrack) ToProto() *livekit.TrackInfo { - info := t.MediaTrackReceiver.TrackInfo() + info := proto.Clone(t.MediaTrackReceiver.TrackInfo()).(*livekit.TrackInfo) info.Muted = t.IsMuted() info.Simulcast = t.IsSimulcast() layers := t.MediaTrackReceiver.GetVideoLayers() + t.lock.RLock() for _, layer := range layers { if int(layer.Quality) < len(t.layerSSRCs) { layer.Ssrc = t.layerSSRCs[layer.Quality] } } + t.lock.RUnlock() info.Layers = layers return info @@ -135,8 +138,11 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra } }) + isNew := false t.lock.Lock() if t.Receiver() == nil { + isNew = true + wr := sfu.NewWebRTCReceiver( receiver, track, @@ -165,12 +171,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra wr.OnStatsUpdate(func(_ *sfu.WebRTCReceiver, stat *livekit.AnalyticsStat) { t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat) }) - t.params.Telemetry.TrackPublished( - context.Background(), - t.PublisherID(), - t.PublisherIdentity(), - t.ToProto(), - ) t.buffer = buff @@ -188,15 +188,29 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra if t.IsSimulcast() { layer := sfu.RidToLayer(track.RID()) + t.lock.Lock() if int(layer) < len(t.layerSSRCs) { t.layerSSRCs[layer] = uint32(track.SSRC()) } + t.lock.Unlock() + } + + if isNew { + t.params.Telemetry.TrackPublished( + context.Background(), + t.PublisherID(), + t.PublisherIdentity(), + t.ToProto(), + ) } buff.Bind(receiver.GetParameters(), track.Codec().RTPCodecCapability) } func (t *MediaTrack) TrySetSimulcastSSRC(layer uint8, ssrc uint32) { + t.lock.Lock() + defer t.lock.Unlock() + if int(layer) < len(t.layerSSRCs) && t.layerSSRCs[layer] == 0 { t.layerSSRCs[layer] = ssrc } diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index a80cedc75..a1cb074de 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -8,6 +8,7 @@ import ( "github.com/pion/rtcp" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -31,7 +32,7 @@ type MediaTrackReceiver struct { lock sync.RWMutex receiver sfu.TrackReceiver - layerDimensions sync.Map // livekit.VideoQuality => *livekit.VideoLayer + layerDimensions map[livekit.VideoQuality]*livekit.VideoLayer // track audio fraction lost downFracLostLock sync.Mutex @@ -60,7 +61,8 @@ type MediaTrackReceiverParams struct { func NewMediaTrackReceiver(params MediaTrackReceiverParams) *MediaTrackReceiver { t := &MediaTrackReceiver{ - params: params, + params: params, + layerDimensions: make(map[livekit.VideoQuality]*livekit.VideoLayer), } t.MediaTrackSubscriptions = NewMediaTrackSubscriptions(MediaTrackSubscriptionsParams{ @@ -234,9 +236,11 @@ func (t *MediaTrackReceiver) TrackInfo() *livekit.TrackInfo { } func (t *MediaTrackReceiver) UpdateVideoLayers(layers []*livekit.VideoLayer) { + t.lock.Lock() for _, layer := range layers { - t.layerDimensions.Store(layer.Quality, layer) + t.layerDimensions[layer.Quality] = layer } + t.lock.Unlock() t.MediaTrackSubscriptions.UpdateVideoLayers() if t.onVideoLayerUpdate != nil { @@ -248,12 +252,11 @@ func (t *MediaTrackReceiver) UpdateVideoLayers(layers []*livekit.VideoLayer) { func (t *MediaTrackReceiver) GetVideoLayers() []*livekit.VideoLayer { layers := make([]*livekit.VideoLayer, 0) - t.layerDimensions.Range(func(q, val interface{}) bool { - if layer, ok := val.(*livekit.VideoLayer); ok { - layers = append(layers, layer) - } - return true - }) + t.lock.RLock() + for _, layer := range t.layerDimensions { + layers = append(layers, proto.Clone(layer).(*livekit.VideoLayer)) + } + t.lock.RUnlock() return layers } @@ -276,12 +279,11 @@ func (t *MediaTrackReceiver) GetQualityForDimension(width, height uint32) liveki // default sizes representing qualities low - high layerSizes := []uint32{180, 360, origSize} var providedSizes []uint32 - t.layerDimensions.Range(func(_, val interface{}) bool { - if layer, ok := val.(*livekit.VideoLayer); ok { - providedSizes = append(providedSizes, layer.Height) - } - return true - }) + t.lock.RLock() + for _, layer := range t.layerDimensions { + providedSizes = append(providedSizes, layer.Height) + } + t.lock.RUnlock() if len(providedSizes) > 0 { layerSizes = providedSizes // comparing height always diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 1bace48ef..982a79d26 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -865,9 +865,10 @@ func (r *Room) audioUpdateWorker() { // changedSpeakers need to include previous speakers that are no longer speaking for sid, speaker := range lastActiveMap { if nextActiveMap[sid] == nil { - speaker.Level = 0 - speaker.Active = false - changedSpeakers = append(changedSpeakers, speaker) + inactiveSpeaker := proto.Clone(speaker).(*livekit.SpeakerInfo) + inactiveSpeaker.Level = 0 + inactiveSpeaker.Active = false + changedSpeakers = append(changedSpeakers, inactiveSpeaker) } } diff --git a/pkg/sfu/downtrackspreader.go b/pkg/sfu/downtrackspreader.go index f7f1e828e..3d0e2a5f9 100644 --- a/pkg/sfu/downtrackspreader.go +++ b/pkg/sfu/downtrackspreader.go @@ -103,10 +103,10 @@ func (d *DownTrackSpreader) HasDownTrack(peerID livekit.ParticipantID) bool { func (d *DownTrackSpreader) Broadcast(layer int32, pkt *buffer.ExtPacket) { d.downTrackMu.RLock() downTracks := d.downTracks - free := d.free + numFree := len(d.free) d.downTrackMu.RUnlock() - if d.params.Threshold == 0 || len(downTracks)-len(free) < d.params.Threshold { + if d.params.Threshold == 0 || (len(downTracks)-numFree) < d.params.Threshold { // serial - not enough down tracks for parallelization to outweigh overhead for _, dt := range downTracks { if dt != nil { diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 0a54fb339..604d9a26d 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -116,7 +116,12 @@ func (s *StreamTrackerManager) AddTracker(layer int32) *StreamTracker { break } } - if !exempt || layer > s.maxExpectedLayer { + + s.lock.RLock() + maxExpectedLayer := s.maxExpectedLayer + s.lock.RUnlock() + + if !exempt || layer > maxExpectedLayer { s.removeAvailableLayer(layer) } else { s.logger.Debugw("not removing exempt layer", "layer", layer) @@ -294,7 +299,9 @@ func (s *StreamTrackerManager) addAvailableLayer(layer int32) { s.availableLayers = append(s.availableLayers, layer) sort.Slice(s.availableLayers, func(i, j int) bool { return s.availableLayers[i] < s.availableLayers[j] }) - layers := s.availableLayers + + var layers []int32 + layers = append(layers, s.availableLayers...) s.lock.Unlock() s.logger.Debugw("available layers changed - layer seen", "added", layer, "layers", layers)