From 52a4c8e8c682f79a9fea8f6c439f0020855cea4e Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 4 Jan 2023 09:49:53 +0800 Subject: [PATCH] WIP --- pkg/sfu/audioselection/forwarder.go | 7 ++++++- pkg/sfu/downtrack.go | 13 ++++++++----- pkg/sfu/receiver.go | 1 + 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/sfu/audioselection/forwarder.go b/pkg/sfu/audioselection/forwarder.go index 12f6ac3a4..4d1d3cbed 100644 --- a/pkg/sfu/audioselection/forwarder.go +++ b/pkg/sfu/audioselection/forwarder.go @@ -71,6 +71,11 @@ func NewSelectionForwarder(params SelectionForwarderParams) *SelectionForwarder } func (f *SelectionForwarder) Start() { + f.lock.Lock() + for _, dt := range f.downtracks { + dt.SetConnected() + } + f.lock.Unlock() go f.process() } @@ -189,7 +194,6 @@ func (f *SelectionForwarder) updateForward() { } func (f *SelectionForwarder) activeSource(source *sourceInfo) bool { - f.params.Logger.Debugw("activating source", "trackID", source.receiver.TrackID()) if len(f.idleDowntracks) == 0 { if len(f.downtracks) < f.params.ActiveDowntracks { dt := f.params.RequestDownTrack(source.receiver) @@ -205,6 +209,7 @@ func (f *SelectionForwarder) activeSource(source *sourceInfo) bool { f.idleDowntracks = f.idleDowntracks[1:] source.downtrack.ResetReceiver(source.receiver) source.receiver.AddDownTrack(source.downtrack) + f.params.Logger.Debugw("activating source", "trackID", source.receiver.TrackID(), "downtrack", source.downtrack.ID()) return true } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 3a24f03e6..064676177 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -149,6 +149,7 @@ type DownTrack struct { rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter absSendTimeID int dependencyDescriptorID int + receiverLock sync.RWMutex receiver TrackReceiver transceiver *webrtc.RTPTransceiver writeStream webrtc.TrackLocalWriter @@ -240,7 +241,9 @@ func NewDownTrack( kind: kind, codec: codecs[0].RTPCodecCapability, } + d.receiverLock.Lock() d.receiver = r + d.receiverLock.Unlock() d.forwarder = NewForwarder(d.kind, d.logger) d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ @@ -272,15 +275,15 @@ func NewDownTrack( } func (d *DownTrack) ResetReceiver(r TrackReceiver) { - d.bindLock.Lock() + d.receiverLock.Lock() d.receiver = r - d.bindLock.Unlock() + d.receiverLock.Unlock() // TODO: log stats } func (d *DownTrack) getReceiver() TrackReceiver { - d.bindLock.Lock() - defer d.bindLock.Unlock() + d.receiverLock.RLock() + defer d.receiverLock.RUnlock() return d.receiver } @@ -727,7 +730,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.bound.Store(false) d.logger.Debugw("closing sender", "kind", d.kind) - d.receiver.DeleteDownTrack(d.subscriberID) + d.getReceiver().DeleteDownTrack(d.subscriberID) if d.rtcpReader != nil { logger.Infow("downtrack close rtcp reader") diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 5118506f6..e8aa9397a 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -366,6 +366,7 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { track.TrackInfoAvailable() w.downTrackSpreader.Store(track) + w.logger.Debugw("added downtrack", "downtrack", track.ID()) return nil }