Additional case of subscribing to a closed track (#1465)

When the publisher stops publishing, the individual receivers would close
attached DownTracks first before notifying MediaTrackReceiver callbacks.

This means #1454 does not fix the issue entirely since there is still a window
when we can subscribe to a closing track.
This commit is contained in:
David Zhao
2023-02-23 17:07:16 -08:00
committed by GitHub
parent 3ac2a35c23
commit 34fcf9e496
5 changed files with 31 additions and 1 deletions

View File

@@ -297,10 +297,20 @@ func (t *MediaTrackReceiver) OnVideoLayerUpdate(f func(layers []*livekit.VideoLa
func (t *MediaTrackReceiver) IsOpen() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.state == mediaTrackReceiverStateOpen
if t.state != mediaTrackReceiverStateOpen {
return false
}
// If any one of the receivers has entered closed state, we would not consider the track open
for _, receiver := range t.receivers {
if receiver.IsClosed() {
return false
}
}
return true
}
func (t *MediaTrackReceiver) SetClosing() {
t.params.Logger.Infow("setting track to closing")
t.lock.Lock()
defer t.lock.Unlock()
if t.state == mediaTrackReceiverStateOpen {

View File

@@ -280,6 +280,13 @@ func (d *DummyReceiver) TrackInfo() *livekit.TrackInfo {
return nil
}
func (d *DummyReceiver) IsClosed() bool {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.IsClosed()
}
return false
}
func (d *DummyReceiver) GetPrimaryReceiverForRed() sfu.TrackReceiver {
// DummyReceiver used for video, it should not have RED codec
return d

View File

@@ -39,6 +39,7 @@ type TrackReceiver interface {
StreamID() string
Codec() webrtc.RTPCodecParameters
HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
IsClosed() bool
ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
GetLayeredBitrate() ([]int32, Bitrates)
@@ -249,6 +250,10 @@ func (w *WebRTCReceiver) GetConnectionScore() float32 {
return w.connectionStats.GetScore()
}
func (w *WebRTCReceiver) IsClosed() bool {
return w.closed.Load()
}
func (w *WebRTCReceiver) SetRTT(rtt uint32) {
w.bufferMu.Lock()
if w.rtt == rtt {

View File

@@ -88,6 +88,10 @@ func (r *RedPrimaryReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
r.downTrackSpreader.Free(subscriberID)
}
func (r *RedPrimaryReceiver) IsClosed() bool {
return r.closed.Load()
}
func (r *RedPrimaryReceiver) CanClose() bool {
return r.closed.Load() || r.downTrackSpreader.DownTrackCount() == 0
}

View File

@@ -89,6 +89,10 @@ func (r *RedReceiver) CanClose() bool {
return r.closed.Load() || r.downTrackSpreader.DownTrackCount() == 0
}
func (r *RedReceiver) IsClosed() bool {
return r.closed.Load()
}
func (r *RedReceiver) Close() {
r.closed.Store(true)
for _, dt := range r.downTrackSpreader.ResetAndGetDownTracks() {