resolve downtrack.bind/close timing issue (#833)

* resolve downtrack.bind/close timing issue

* fix test case
This commit is contained in:
cnderrauber
2022-07-15 14:09:45 +08:00
committed by GitHub
parent a2bf32797b
commit a0578db3ed
4 changed files with 13 additions and 5 deletions
+2 -2
View File
@@ -974,8 +974,8 @@ func (p *ParticipantImpl) UpdateSubscribedTrackSettings(trackID livekit.TrackID,
subTrack := p.subscribedTracks[trackID]
if subTrack == nil {
p.lock.Unlock()
p.params.Logger.Warnw("could not find subscribed track", nil, "trackID", trackID)
return errors.New("could not find subscribed track")
p.params.Logger.Infow("could not find subscribed track", "trackID", trackID)
return nil
}
p.lock.Unlock()
+2 -2
View File
@@ -46,7 +46,7 @@ func TestJoinedState(t *testing.T) {
t.Run("should be current time when a participant joins", func(t *testing.T) {
s := time.Now().Unix()
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
require.Equal(t, s, rm.FirstJoinedAt())
require.LessOrEqual(t, s, rm.FirstJoinedAt())
require.Equal(t, int64(0), rm.LastLeftAt())
})
@@ -55,7 +55,7 @@ func TestJoinedState(t *testing.T) {
p0 := rm.GetParticipants()[0]
s := time.Now().Unix()
rm.RemoveParticipant(p0.Identity(), types.ParticipantCloseReasonClientRequestLeave)
require.Equal(t, s, rm.LastLeftAt())
require.LessOrEqual(t, s, rm.LastLeftAt())
})
t.Run("LastLeftAt should not be set when there are still participants in the room", func(t *testing.T) {
+8
View File
@@ -108,6 +108,7 @@ type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
// - closed
// once closed, a DownTrack cannot be re-used.
type DownTrack struct {
bindLock sync.Mutex
logger logger.Logger
id livekit.TrackID
subscriberID livekit.ParticipantID
@@ -260,7 +261,9 @@ func NewDownTrack(
// This asserts that the code requested is supported by the remote peer.
// If so it sets up all the state (SSRC and PayloadType) to have a call
func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) {
d.bindLock.Lock()
if d.bound.Load() {
d.bindLock.Unlock()
return webrtc.RTPCodecParameters{}, ErrDownTrackAlreadyBound
}
var codec webrtc.RTPCodecParameters
@@ -273,12 +276,14 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
}
if codec.MimeType == "" {
d.bindLock.Unlock()
return webrtc.RTPCodecParameters{}, webrtc.ErrUnsupportedCodec
}
// if a downtrack is closed before bind, it already unsubscribed from client, don't do subsequent operation and return here.
if d.IsClosed() {
d.logger.Debugw("DownTrack closed before bind")
d.bindLock.Unlock()
return codec, nil
}
@@ -299,6 +304,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.onBind()
}
d.bound.Store(true)
d.bindLock.Unlock()
d.connectionStats.SetTrackSource(d.receiver.TrackSource())
d.connectionStats.Start()
@@ -642,6 +648,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
return
}
d.bindLock.Lock()
d.logger.Infow("close down track", "flushBlankFrame", flush)
if d.bound.Load() {
if d.forwarder != nil {
@@ -670,6 +677,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.receiver.DeleteDownTrack(d.subscriberID)
}
d.bindLock.Unlock()
d.connectionStats.Close()
d.rtpStats.Stop()
d.logger.Infow("rtp stats", "stats", d.rtpStats.ToString())
+1 -1
View File
@@ -372,7 +372,7 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
}
if w.downTrackSpreader.HasDownTrack(track.SubscriberID()) {
return ErrDownTrackAlreadyExist
w.logger.Infow("subscriberID already exists, replace the downtrack", "subscriberID", track.SubscriberID())
}
if w.Kind() == webrtc.RTPCodecTypeVideo {