From 924be2fbb7dc0efa07005259270b9dab3c3c04da Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 19 Sep 2022 08:27:51 +0530 Subject: [PATCH] Supervisor tweaks (#1017) --- pkg/rtc/participant.go | 4 +-- pkg/rtc/supervisor/publication_monitor.go | 16 ++++++++- pkg/rtc/supervisor/subscription_monitor.go | 38 +++++++++++++++++----- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 65398b3e8..c578948bf 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1564,7 +1564,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } else { p.pendingTracks[req.Cid].trackInfos = append(p.pendingTracks[req.Cid].trackInfos, ti) } - p.params.Logger.Debugw("pending track queued", "track", ti.String(), "request", req.String()) + p.params.Logger.Debugw("pending track queued", "trackID", ti.Sid, "track", ti.String(), "request", req.String()) return nil } @@ -1572,7 +1572,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} - p.params.Logger.Debugw("pending track added", "track", ti.String(), "request", req.String()) + p.params.Logger.Debugw("pending track added", "trackID", ti.Sid, "track", ti.String(), "request", req.String()) return ti } diff --git a/pkg/rtc/supervisor/publication_monitor.go b/pkg/rtc/supervisor/publication_monitor.go index 13a7248dd..a0fc171be 100644 --- a/pkg/rtc/supervisor/publication_monitor.go +++ b/pkg/rtc/supervisor/publication_monitor.go @@ -12,7 +12,7 @@ import ( ) const ( - publishWaitDuration = 10 * time.Second + publishWaitDuration = 30 * time.Second ) var ( @@ -37,6 +37,8 @@ type PublicationMonitor struct { publishedTrack types.LocalMediaTrack isMuted bool unmutedAt time.Time + + lastError error } func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor { @@ -108,6 +110,12 @@ func (p *PublicationMonitor) clearPublishedTrack(pubTrack types.LocalMediaTrack) func (p *PublicationMonitor) Check() error { p.lock.RLock() + if p.lastError != nil { + p.lock.RUnlock() + // return an error only once + return nil + } + var pub *publish if p.desiredPublishes.Len() > 0 { pub = p.desiredPublishes.Front().(*publish) @@ -123,6 +131,10 @@ func (p *PublicationMonitor) Check() error { if pub.isStart && !isMuted && !unmutedAt.IsZero() && time.Since(unmutedAt) > publishWaitDuration { // timed out waiting for publish + p.lock.Lock() + p.lastError = errPublishTimeout + p.lock.Unlock() + return errPublishTimeout } @@ -154,5 +166,7 @@ func (p *PublicationMonitor) update() { p.desiredPublishes.PushFront(pub) return } + + p.lastError = nil } } diff --git a/pkg/rtc/supervisor/subscription_monitor.go b/pkg/rtc/supervisor/subscription_monitor.go index 6a4912b19..0edf5d438 100644 --- a/pkg/rtc/supervisor/subscription_monitor.go +++ b/pkg/rtc/supervisor/subscription_monitor.go @@ -16,12 +16,13 @@ const ( ) var ( - errTransitionTimeout = errors.New("transition time out") + errSubscribeTimeout = errors.New("subscribe time out") + errUnsubscribeTimeout = errors.New("unsubscribe time out") ) type transition struct { - isSubscribed bool - at time.Time + isSubscribe bool + at time.Time } type SubscriptionMonitorParams struct { @@ -36,6 +37,8 @@ type SubscriptionMonitor struct { desiredTransitions deque.Deque subscribedTrack types.SubscribedTrack + + lastError error } func NewSubscriptionMonitor(params SubscriptionMonitorParams) *SubscriptionMonitor { @@ -57,12 +60,12 @@ func (s *SubscriptionMonitor) PostEvent(ome types.OperationMonitorEvent, omd typ } } -func (s *SubscriptionMonitor) updateSubscription(isSubscribed bool) { +func (s *SubscriptionMonitor) updateSubscription(isSubscribe bool) { s.lock.Lock() s.desiredTransitions.PushBack( &transition{ - isSubscribed: isSubscribed, - at: time.Now(), + isSubscribe: isSubscribe, + at: time.Now(), }, ) s.update() @@ -90,6 +93,12 @@ func (s *SubscriptionMonitor) clearSubscribedTrack(subTrack types.SubscribedTrac func (s *SubscriptionMonitor) Check() error { s.lock.RLock() + if s.lastError != nil { + s.lock.RUnlock() + // return an error only once + return nil + } + var tx *transition if s.desiredTransitions.Len() > 0 { tx = s.desiredTransitions.Front().(*transition) @@ -102,7 +111,18 @@ func (s *SubscriptionMonitor) Check() error { if time.Since(tx.at) > transitionWaitDuration { // timed out waiting for transition - return errTransitionTimeout + var err error + if tx.isSubscribe { + err = errSubscribeTimeout + } else { + err = errUnsubscribeTimeout + } + + s.lock.Lock() + s.lastError = err + s.lock.Unlock() + + return err } // give more time for transition to happen @@ -127,10 +147,12 @@ func (s *SubscriptionMonitor) update() { return } - if (tx.isSubscribed && s.subscribedTrack == nil) || (!tx.isSubscribed && s.subscribedTrack != nil) { + if (tx.isSubscribe && s.subscribedTrack == nil) || (!tx.isSubscribe && s.subscribedTrack != nil) { // put it back as the condition is not satisfied s.desiredTransitions.PushFront(tx) return } + + s.lastError = nil } }