mirror of
https://github.com/livekit/livekit.git
synced 2026-04-25 17:42:07 +00:00
Supervisor tweaks (#1017)
This commit is contained in:
@@ -1564,7 +1564,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
|
||||
} else {
|
||||
p.pendingTracks[req.Cid].trackInfos = append(p.pendingTracks[req.Cid].trackInfos, ti)
|
||||
}
|
||||
p.params.Logger.Debugw("pending track queued", "track", ti.String(), "request", req.String())
|
||||
p.params.Logger.Debugw("pending track queued", "trackID", ti.Sid, "track", ti.String(), "request", req.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1572,7 +1572,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
|
||||
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
|
||||
|
||||
p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
|
||||
p.params.Logger.Debugw("pending track added", "track", ti.String(), "request", req.String())
|
||||
p.params.Logger.Debugw("pending track added", "trackID", ti.Sid, "track", ti.String(), "request", req.String())
|
||||
return ti
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
publishWaitDuration = 10 * time.Second
|
||||
publishWaitDuration = 30 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -37,6 +37,8 @@ type PublicationMonitor struct {
|
||||
publishedTrack types.LocalMediaTrack
|
||||
isMuted bool
|
||||
unmutedAt time.Time
|
||||
|
||||
lastError error
|
||||
}
|
||||
|
||||
func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor {
|
||||
@@ -108,6 +110,12 @@ func (p *PublicationMonitor) clearPublishedTrack(pubTrack types.LocalMediaTrack)
|
||||
|
||||
func (p *PublicationMonitor) Check() error {
|
||||
p.lock.RLock()
|
||||
if p.lastError != nil {
|
||||
p.lock.RUnlock()
|
||||
// return an error only once
|
||||
return nil
|
||||
}
|
||||
|
||||
var pub *publish
|
||||
if p.desiredPublishes.Len() > 0 {
|
||||
pub = p.desiredPublishes.Front().(*publish)
|
||||
@@ -123,6 +131,10 @@ func (p *PublicationMonitor) Check() error {
|
||||
|
||||
if pub.isStart && !isMuted && !unmutedAt.IsZero() && time.Since(unmutedAt) > publishWaitDuration {
|
||||
// timed out waiting for publish
|
||||
p.lock.Lock()
|
||||
p.lastError = errPublishTimeout
|
||||
p.lock.Unlock()
|
||||
|
||||
return errPublishTimeout
|
||||
}
|
||||
|
||||
@@ -154,5 +166,7 @@ func (p *PublicationMonitor) update() {
|
||||
p.desiredPublishes.PushFront(pub)
|
||||
return
|
||||
}
|
||||
|
||||
p.lastError = nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,13 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
errTransitionTimeout = errors.New("transition time out")
|
||||
errSubscribeTimeout = errors.New("subscribe time out")
|
||||
errUnsubscribeTimeout = errors.New("unsubscribe time out")
|
||||
)
|
||||
|
||||
type transition struct {
|
||||
isSubscribed bool
|
||||
at time.Time
|
||||
isSubscribe bool
|
||||
at time.Time
|
||||
}
|
||||
|
||||
type SubscriptionMonitorParams struct {
|
||||
@@ -36,6 +37,8 @@ type SubscriptionMonitor struct {
|
||||
desiredTransitions deque.Deque
|
||||
|
||||
subscribedTrack types.SubscribedTrack
|
||||
|
||||
lastError error
|
||||
}
|
||||
|
||||
func NewSubscriptionMonitor(params SubscriptionMonitorParams) *SubscriptionMonitor {
|
||||
@@ -57,12 +60,12 @@ func (s *SubscriptionMonitor) PostEvent(ome types.OperationMonitorEvent, omd typ
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SubscriptionMonitor) updateSubscription(isSubscribed bool) {
|
||||
func (s *SubscriptionMonitor) updateSubscription(isSubscribe bool) {
|
||||
s.lock.Lock()
|
||||
s.desiredTransitions.PushBack(
|
||||
&transition{
|
||||
isSubscribed: isSubscribed,
|
||||
at: time.Now(),
|
||||
isSubscribe: isSubscribe,
|
||||
at: time.Now(),
|
||||
},
|
||||
)
|
||||
s.update()
|
||||
@@ -90,6 +93,12 @@ func (s *SubscriptionMonitor) clearSubscribedTrack(subTrack types.SubscribedTrac
|
||||
|
||||
func (s *SubscriptionMonitor) Check() error {
|
||||
s.lock.RLock()
|
||||
if s.lastError != nil {
|
||||
s.lock.RUnlock()
|
||||
// return an error only once
|
||||
return nil
|
||||
}
|
||||
|
||||
var tx *transition
|
||||
if s.desiredTransitions.Len() > 0 {
|
||||
tx = s.desiredTransitions.Front().(*transition)
|
||||
@@ -102,7 +111,18 @@ func (s *SubscriptionMonitor) Check() error {
|
||||
|
||||
if time.Since(tx.at) > transitionWaitDuration {
|
||||
// timed out waiting for transition
|
||||
return errTransitionTimeout
|
||||
var err error
|
||||
if tx.isSubscribe {
|
||||
err = errSubscribeTimeout
|
||||
} else {
|
||||
err = errUnsubscribeTimeout
|
||||
}
|
||||
|
||||
s.lock.Lock()
|
||||
s.lastError = err
|
||||
s.lock.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// give more time for transition to happen
|
||||
@@ -127,10 +147,12 @@ func (s *SubscriptionMonitor) update() {
|
||||
return
|
||||
}
|
||||
|
||||
if (tx.isSubscribed && s.subscribedTrack == nil) || (!tx.isSubscribed && s.subscribedTrack != nil) {
|
||||
if (tx.isSubscribe && s.subscribedTrack == nil) || (!tx.isSubscribe && s.subscribedTrack != nil) {
|
||||
// put it back as the condition is not satisfied
|
||||
s.desiredTransitions.PushFront(tx)
|
||||
return
|
||||
}
|
||||
|
||||
s.lastError = nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user