diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 8dbb3b8a5..163cefc0c 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -974,8 +974,8 @@ func (p *ParticipantImpl) UpdateSubscribedTrackSettings(trackID livekit.TrackID, subTrack := p.subscribedTracks[trackID] if subTrack == nil { p.lock.Unlock() - p.params.Logger.Warnw("could not find subscribed track", nil, "trackID", trackID) - return errors.New("could not find subscribed track") + p.params.Logger.Infow("could not find subscribed track", "trackID", trackID) + return nil } p.lock.Unlock() diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index b70336938..8e1e3eb82 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -46,7 +46,7 @@ func TestJoinedState(t *testing.T) { t.Run("should be current time when a participant joins", func(t *testing.T) { s := time.Now().Unix() rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) - require.Equal(t, s, rm.FirstJoinedAt()) + require.LessOrEqual(t, s, rm.FirstJoinedAt()) require.Equal(t, int64(0), rm.LastLeftAt()) }) @@ -55,7 +55,7 @@ func TestJoinedState(t *testing.T) { p0 := rm.GetParticipants()[0] s := time.Now().Unix() rm.RemoveParticipant(p0.Identity(), types.ParticipantCloseReasonClientRequestLeave) - require.Equal(t, s, rm.LastLeftAt()) + require.LessOrEqual(t, s, rm.LastLeftAt()) }) t.Run("LastLeftAt should not be set when there are still participants in the room", func(t *testing.T) { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b79214adf..fb2dff69a 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -108,6 +108,7 @@ type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) // - closed // once closed, a DownTrack cannot be re-used. type DownTrack struct { + bindLock sync.Mutex logger logger.Logger id livekit.TrackID subscriberID livekit.ParticipantID @@ -260,7 +261,9 @@ func NewDownTrack( // This asserts that the code requested is supported by the remote peer. // If so it sets up all the state (SSRC and PayloadType) to have a call func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { + d.bindLock.Lock() if d.bound.Load() { + d.bindLock.Unlock() return webrtc.RTPCodecParameters{}, ErrDownTrackAlreadyBound } var codec webrtc.RTPCodecParameters @@ -273,12 +276,14 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, } if codec.MimeType == "" { + d.bindLock.Unlock() return webrtc.RTPCodecParameters{}, webrtc.ErrUnsupportedCodec } // if a downtrack is closed before bind, it already unsubscribed from client, don't do subsequent operation and return here. if d.IsClosed() { d.logger.Debugw("DownTrack closed before bind") + d.bindLock.Unlock() return codec, nil } @@ -299,6 +304,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.onBind() } d.bound.Store(true) + d.bindLock.Unlock() d.connectionStats.SetTrackSource(d.receiver.TrackSource()) d.connectionStats.Start() @@ -642,6 +648,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { return } + d.bindLock.Lock() d.logger.Infow("close down track", "flushBlankFrame", flush) if d.bound.Load() { if d.forwarder != nil { @@ -670,6 +677,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.receiver.DeleteDownTrack(d.subscriberID) } + d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() d.logger.Infow("rtp stats", "stats", d.rtpStats.ToString()) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 6a0b9c23e..cbfe061eb 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -372,7 +372,7 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { } if w.downTrackSpreader.HasDownTrack(track.SubscriberID()) { - return ErrDownTrackAlreadyExist + w.logger.Infow("subscriberID already exists, replace the downtrack", "subscriberID", track.SubscriberID()) } if w.Kind() == webrtc.RTPCodecTypeVideo {