diff --git a/go.mod b/go.mod index 3cb4bb11b..39fb55664 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a - github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e + github.com/livekit/protocol v1.3.3-0.20230131012249-9987dca3a3e7 github.com/livekit/psrpc v0.2.5 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 diff --git a/go.sum b/go.sum index 036f4ba29..96730d68d 100644 --- a/go.sum +++ b/go.sum @@ -234,8 +234,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE= github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e h1:T+qUuDHioL5Q5Gzjun9tB65oaC9+zWmeWlcvpG+iilc= -github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= +github.com/livekit/protocol v1.3.3-0.20230131012249-9987dca3a3e7 h1:PSSiAMb0XmBrhA8xZzpnf9q99Cl64RmCbL7KzmOMb9s= +github.com/livekit/protocol v1.3.3-0.20230131012249-9987dca3a3e7/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= github.com/livekit/psrpc v0.2.5 h1:+EZS78MGdBZxzCUwinDQ6pOeqPDURisrGtfyyqwUDSI= github.com/livekit/psrpc v0.2.5/go.mod h1:DyphtRRWvcIuCaldYg9VGpwGhu/HiKmNcysgpN6xKrM= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 3105d174c..db98a89bb 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -450,7 +450,7 @@ func (p *ParticipantImpl) OnClaimsChanged(callback func(types.LocalParticipant)) // HandleOffer an offer from remote participant, used when clients make the initial connection func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) { - p.params.Logger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER) + p.params.Logger.Infow("received offer", "transport", livekit.SignalTarget_PUBLISHER) shouldPend := false if p.MigrateState() == types.MigrateStateInit { shouldPend = true @@ -464,7 +464,7 @@ func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) { // HandleAnswer handles a client answer response, with subscriber PC, server initiates the // offer and client answers func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) { - p.params.Logger.Debugw("received answer", "transport", livekit.SignalTarget_SUBSCRIBER) + p.params.Logger.Infow("received answer", "transport", livekit.SignalTarget_SUBSCRIBER) /* from server received join request to client answer * 1. server send join response & offer @@ -478,7 +478,7 @@ func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) { } func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error { - p.params.Logger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER) + p.params.Logger.Infow("sending answer", "transport", livekit.SignalTarget_PUBLISHER) answer = p.configurePublisherAnswer(answer) if err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ @@ -1074,7 +1074,7 @@ func (p *ParticipantImpl) setIsPublisher(isPublisher bool) { // when the server has an offer for participant func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) error { - p.params.Logger.Debugw("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER) + p.params.Logger.Infow("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER) return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ Offer: ToProtoSessionDescription(offer), diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index b0ad731e7..c488a10ac 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -35,9 +35,10 @@ import ( var ( reconcileInterval = 3 * time.Second // amount of time to give up if a track or publisher isn't found - notFoundTimeout = 5 * time.Second + // giving this a lot of time because during migrations the user could take a lot of time to resume + notFoundTimeout = 20 * time.Second // amount of time to try otherwise before flagging subscription as failed - subscriptionTimeout = 10 * time.Second + subscriptionTimeout = 20 * time.Second ) type SubscriptionManagerParams struct { @@ -295,7 +296,7 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) { // from it. this is the *only* case we'd change desired state if s.durationSinceStart() > notFoundTimeout { s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true) - s.logger.Infow("unsubscribing track since track isn't available") + s.logger.Infow("unsubscribing track since track isn't available", "error", err) s.setDesired(false) m.queueReconcile(s.trackID) } @@ -507,7 +508,6 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, w go changedCB(publisherID, false) } - subTrack.OnClose(nil) go m.params.OnTrackUnsubscribed(subTrack) // trigger to decrement unsubscribed counter as long as track has been bound @@ -643,6 +643,7 @@ func (s *trackSubscription) isDesired() bool { func (s *trackSubscription) setSubscribedTrack(track types.SubscribedTrack) { s.lock.Lock() + oldTrack := s.subscribedTrack s.subscribedTrack = track s.bound = false settings := s.settings @@ -652,6 +653,9 @@ func (s *trackSubscription) setSubscribedTrack(track types.SubscribedTrack) { s.logger.Debugw("restoring subscriber settings", "settings", settings) track.UpdateSubscriberSettings(settings) } + if oldTrack != nil { + oldTrack.OnClose(nil) + } } func (s *trackSubscription) getSubscribedTrack() types.SubscribedTrack { diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index bf1f11d6c..2117ca265 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -599,7 +599,7 @@ func (t *PCTransport) AddTrack(trackLocal webrtc.TrackLocal, params types.AddTra } } - // if never negotiated with client, can't reuse transeiver for track not subscribed before migration + // if never negotiated with client, can't reuse transceiver for track not subscribed before migration if !canReuse { return t.AddTransceiverFromTrack(trackLocal, params) }