Increase subscription manager notFound timeout (#1358)

Giving it more time before triggering the nuclear option of unsubscribing
from the track.
This commit is contained in:
David Zhao
2023-01-31 00:52:12 -08:00
committed by GitHub
parent 80c647ef15
commit 2048cfbfc7
5 changed files with 16 additions and 12 deletions

2
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a
github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e
github.com/livekit/protocol v1.3.3-0.20230131012249-9987dca3a3e7
github.com/livekit/psrpc v0.2.5
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3

4
go.sum
View File

@@ -234,8 +234,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e h1:T+qUuDHioL5Q5Gzjun9tB65oaC9+zWmeWlcvpG+iilc=
github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/protocol v1.3.3-0.20230131012249-9987dca3a3e7 h1:PSSiAMb0XmBrhA8xZzpnf9q99Cl64RmCbL7KzmOMb9s=
github.com/livekit/protocol v1.3.3-0.20230131012249-9987dca3a3e7/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/psrpc v0.2.5 h1:+EZS78MGdBZxzCUwinDQ6pOeqPDURisrGtfyyqwUDSI=
github.com/livekit/psrpc v0.2.5/go.mod h1:DyphtRRWvcIuCaldYg9VGpwGhu/HiKmNcysgpN6xKrM=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=

View File

@@ -450,7 +450,7 @@ func (p *ParticipantImpl) OnClaimsChanged(callback func(types.LocalParticipant))
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) {
p.params.Logger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER)
p.params.Logger.Infow("received offer", "transport", livekit.SignalTarget_PUBLISHER)
shouldPend := false
if p.MigrateState() == types.MigrateStateInit {
shouldPend = true
@@ -464,7 +464,7 @@ func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) {
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) {
p.params.Logger.Debugw("received answer", "transport", livekit.SignalTarget_SUBSCRIBER)
p.params.Logger.Infow("received answer", "transport", livekit.SignalTarget_SUBSCRIBER)
/* from server received join request to client answer
* 1. server send join response & offer
@@ -478,7 +478,7 @@ func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) {
}
func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error {
p.params.Logger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER)
p.params.Logger.Infow("sending answer", "transport", livekit.SignalTarget_PUBLISHER)
answer = p.configurePublisherAnswer(answer)
if err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
@@ -1074,7 +1074,7 @@ func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
// when the server has an offer for participant
func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) error {
p.params.Logger.Debugw("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER)
p.params.Logger.Infow("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER)
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer),

View File

@@ -35,9 +35,10 @@ import (
var (
reconcileInterval = 3 * time.Second
// amount of time to give up if a track or publisher isn't found
notFoundTimeout = 5 * time.Second
// giving this a lot of time because during migrations the user could take a lot of time to resume
notFoundTimeout = 20 * time.Second
// amount of time to try otherwise before flagging subscription as failed
subscriptionTimeout = 10 * time.Second
subscriptionTimeout = 20 * time.Second
)
type SubscriptionManagerParams struct {
@@ -295,7 +296,7 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) {
// from it. this is the *only* case we'd change desired state
if s.durationSinceStart() > notFoundTimeout {
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
s.logger.Infow("unsubscribing track since track isn't available")
s.logger.Infow("unsubscribing track since track isn't available", "error", err)
s.setDesired(false)
m.queueReconcile(s.trackID)
}
@@ -507,7 +508,6 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, w
go changedCB(publisherID, false)
}
subTrack.OnClose(nil)
go m.params.OnTrackUnsubscribed(subTrack)
// trigger to decrement unsubscribed counter as long as track has been bound
@@ -643,6 +643,7 @@ func (s *trackSubscription) isDesired() bool {
func (s *trackSubscription) setSubscribedTrack(track types.SubscribedTrack) {
s.lock.Lock()
oldTrack := s.subscribedTrack
s.subscribedTrack = track
s.bound = false
settings := s.settings
@@ -652,6 +653,9 @@ func (s *trackSubscription) setSubscribedTrack(track types.SubscribedTrack) {
s.logger.Debugw("restoring subscriber settings", "settings", settings)
track.UpdateSubscriberSettings(settings)
}
if oldTrack != nil {
oldTrack.OnClose(nil)
}
}
func (s *trackSubscription) getSubscribedTrack() types.SubscribedTrack {

View File

@@ -599,7 +599,7 @@ func (t *PCTransport) AddTrack(trackLocal webrtc.TrackLocal, params types.AddTra
}
}
// if never negotiated with client, can't reuse transeiver for track not subscribed before migration
// if never negotiated with client, can't reuse transceiver for track not subscribed before migration
if !canReuse {
return t.AddTransceiverFromTrack(trackLocal, params)
}