From 07c43e097259600571fb70d020ea1f47bfb3d7ca Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 15 Sep 2022 11:16:37 +0530 Subject: [PATCH] Supervisor beginnings (#1005) * Remove VP9 from media engine set up. * Remove vp9 from config sample * Supervisor beginnings Eventual goal is to have a reconciler which moves state from actual -> desired. First step along the way is to observe/monitor. The first step even in that is an initial implementation to get feedback on the direction. This PR is a start in that direction - Concept of a supervisor at local participant level - This supervisor will be responsible for periodically monitor actual vs desired (this is the one which will eventually trigger other things to reconcile, but for now it just logs on error) - A new interface `OperationMonitor` which requires two methods o Check() returns an error based on actual vs desired state. o IsIdle() returns bool. Returns true if the monitor is idle. - The supervisor maintains a list of monitors and does periodic check. In the above framework, starting with list of subscriptions/unsubscriptions. There is a new module `SubscriptionMonitor` which checks subscription transitions. A subscription transition is queued on subscribe/unsubscribe. The transition can be satisfied when a subscribedTrack is added OR removed. Error condition is when a transition is not satisfied for 10 seconds. Idle is when the transition queue is empty and subscribedTrack is nil, i. e. the last transition would have been unsubscribe and subscribed track removed (unsubscribe satisfied). The idea is individual monitors can check on different things. Some more things that I am thinking about are - PublishedTrackMonitor - started when an add track happens, satisfied when OnTrack happens, error if `OnTrack` does not fire for a while and track is not muted, idle when there is nothing pending. - PublishedTrackStreamingMonitor - to ensure that a published track is receiving media at the server (accounting for dynacast, mute, etc) - SubscribedTrackStreamingMonitor - to ensure down track is sending data unless muted. * Remove debug * Protect against early casting errors * Adding PublicationMonitor --- pkg/rtc/participant.go | 33 +++- pkg/rtc/signalhandler.go | 12 +- pkg/rtc/supervisor/participant_supervisor.go | 166 ++++++++++++++++++ pkg/rtc/supervisor/publication_monitor.go | 148 ++++++++++++++++ pkg/rtc/supervisor/subscription_monitor.go | 128 ++++++++++++++ pkg/rtc/transport_test.go | 21 ++- pkg/rtc/types/interfaces.go | 5 + .../typesfakes/fake_local_participant.go | 113 ------------ pkg/service/rtcservice.go | 2 +- 9 files changed, 503 insertions(+), 125 deletions(-) create mode 100644 pkg/rtc/supervisor/participant_supervisor.go create mode 100644 pkg/rtc/supervisor/publication_monitor.go create mode 100644 pkg/rtc/supervisor/subscription_monitor.go diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 0c0e5f9e5..aeeab20e3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -18,6 +18,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/supervisor" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" @@ -153,6 +154,8 @@ type ParticipantImpl struct { subscriptionInProgress map[livekit.TrackID]bool subscriptionRequestsQueue map[livekit.TrackID][]SubscribeRequest trackPublisherVersion map[livekit.TrackID]uint32 + + supervisor *supervisor.ParticipantSupervisor } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -179,6 +182,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { subscriptionInProgress: make(map[livekit.TrackID]bool), subscriptionRequestsQueue: make(map[livekit.TrackID][]SubscribeRequest), trackPublisherVersion: make(map[livekit.TrackID]uint32), + supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), } p.version.Store(params.InitialVersion) p.migrateState.Store(types.MigrateStateInit) @@ -623,7 +627,12 @@ func (p *ParticipantImpl) SetMigrateInfo( ) { p.pendingTracksLock.Lock() for _, t := range mediaTracks { - p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{t.GetTrack()}, migrated: true} + ti := t.GetTrack() + + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + + p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} } p.pendingTracksLock.Unlock() @@ -654,6 +663,8 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea }) } + p.supervisor.Stop() + p.UpTrackManager.Close(!sendLeave) p.pendingTracksLock.Lock() @@ -963,6 +974,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { onSubscribedTo := p.onSubscribedTo p.subscribedTracks[subTrack.ID()] = subTrack + p.supervisor.SetSubscribedTrack(subTrack.ID(), subTrack) settings := p.subscribedTracksSettings[subTrack.ID()] p.lock.Unlock() @@ -1004,6 +1016,7 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) p.trackPublisherVersion[subTrack.ID()] = subTrack.PublisherVersion() delete(p.subscribedTracks, subTrack.ID()) + p.supervisor.ClearSubscribedTrack(subTrack.ID(), subTrack) // remove from subscribed map numRemaining := 0 @@ -1015,7 +1028,7 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) // // NOTE - // subscribedTrackSettings should not be deleted on removal as it is needed if corresponding publisher migrated + // subscribedTracksSettings should not be deleted on removal as it is needed if corresponding publisher migrated // LK-TODO: find a way to clean these up // @@ -1534,6 +1547,9 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.pendingTracks[req.Cid] == nil { p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} } else { @@ -1543,6 +1559,9 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l return nil } + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} p.params.Logger.Debugw("pending track added", "track", ti.String(), "request", req.String()) return ti @@ -1570,6 +1589,8 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro } func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { + p.supervisor.SetPublicationMute(trackID, muted) + track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted) isPending := false @@ -1722,7 +1743,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv }) mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) + // add to published and clean up pending + p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) p.UpTrackManager.AddPublishedTrack(mt) p.pendingTracks[signalCid].trackInfos = p.pendingTracks[signalCid].trackInfos[1:] @@ -1731,6 +1754,8 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv } mt.AddOnClose(func() { + p.supervisor.ClearPublishedTrack(livekit.TrackID(ti.Sid), mt) + // re-use track sid p.pendingTracksLock.Lock() if pti := p.pendingTracks[signalCid]; pti != nil { @@ -2032,6 +2057,8 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(sub types.LocalParticipant) error) { p.params.Logger.Infow("queuing subscribe", "trackID", trackID) + p.supervisor.UpdateSubscription(trackID, true) + p.lock.Lock() p.subscriptionRequestsQueue[trackID] = append(p.subscriptionRequestsQueue[trackID], SubscribeRequest{ requestType: SubscribeRequestTypeAdd, @@ -2045,6 +2072,8 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func( func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) { p.params.Logger.Infow("queuing unsubscribe", "trackID", trackID) + p.supervisor.UpdateSubscription(trackID, false) + p.lock.Lock() p.subscriptionRequestsQueue[trackID] = append(p.subscriptionRequestsQueue[trackID], SubscribeRequest{ requestType: SubscribeRequestTypeRemove, diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index a638f40b0..63ff435cc 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -11,9 +11,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant switch msg := req.Message.(type) { case *livekit.SignalRequest_Offer: participant.HandleOffer(FromProtoSessionDescription(msg.Offer)) - case *livekit.SignalRequest_AddTrack: - pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid) - participant.AddTrack(msg.AddTrack) case *livekit.SignalRequest_Answer: participant.HandleAnswer(FromProtoSessionDescription(msg.Answer)) case *livekit.SignalRequest_Trickle: @@ -23,6 +20,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant return nil } participant.AddICECandidate(candidateInit, msg.Trickle.Target) + case *livekit.SignalRequest_AddTrack: + pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid) + participant.AddTrack(msg.AddTrack) case *livekit.SignalRequest_Mute: participant.SetTrackMuted(livekit.TrackID(msg.Mute.Sid), msg.Mute.Muted, false) case *livekit.SignalRequest_Subscription: @@ -60,6 +60,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant pLogger.Infow("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting) } + case *livekit.SignalRequest_Leave: + pLogger.Infow("client leaving room") + room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_UpdateLayers: err := room.UpdateVideoLayers(participant, msg.UpdateLayers) if err != nil { @@ -67,9 +70,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant "update", msg.UpdateLayers) return nil } - case *livekit.SignalRequest_Leave: - pLogger.Infow("client leaving room") - room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_SubscriptionPermission: err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission) if err != nil { diff --git a/pkg/rtc/supervisor/participant_supervisor.go b/pkg/rtc/supervisor/participant_supervisor.go new file mode 100644 index 000000000..687c34d21 --- /dev/null +++ b/pkg/rtc/supervisor/participant_supervisor.go @@ -0,0 +1,166 @@ +package supervisor + +import ( + "sync" + "time" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "go.uber.org/atomic" +) + +const ( + monitorInterval = 3 * time.Second +) + +type ParticipantSupervisorParams struct { + Logger logger.Logger +} + +type ParticipantSupervisor struct { + params ParticipantSupervisorParams + + lock sync.RWMutex + publications map[livekit.TrackID]types.OperationMonitor + subscriptions map[livekit.TrackID]types.OperationMonitor + + isStopped atomic.Bool +} + +func NewParticipantSupervisor(params ParticipantSupervisorParams) *ParticipantSupervisor { + p := &ParticipantSupervisor{ + params: params, + publications: make(map[livekit.TrackID]types.OperationMonitor), + subscriptions: make(map[livekit.TrackID]types.OperationMonitor), + } + + go p.checkState() + + return p +} + +func (p *ParticipantSupervisor) Stop() { + p.isStopped.Store(true) +} + +func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) { + p.lock.Lock() + pm, ok := p.publications[trackID] + if !ok { + pm = NewPublicationMonitor(PublicationMonitorParams{TrackID: trackID, Logger: p.params.Logger}) + p.publications[trackID] = pm + } + pm.(*PublicationMonitor).AddPending() + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) SetPublicationMute(trackID livekit.TrackID, isMuted bool) { + p.lock.Lock() + pm, ok := p.publications[trackID] + if ok { + pm.(*PublicationMonitor).SetMute(isMuted) + } + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) SetPublishedTrack(trackID livekit.TrackID, pubTrack types.LocalMediaTrack) { + p.lock.RLock() + pm, ok := p.publications[trackID] + if ok { + pm.(*PublicationMonitor).SetPublishedTrack(pubTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) ClearPublishedTrack(trackID livekit.TrackID, pubTrack types.LocalMediaTrack) { + p.lock.RLock() + pm, ok := p.publications[trackID] + if ok { + pm.(*PublicationMonitor).ClearPublishedTrack(pubTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) UpdateSubscription(trackID livekit.TrackID, isSubscribed bool) { + p.lock.Lock() + sm, ok := p.subscriptions[trackID] + if !ok { + sm = NewSubscriptionMonitor(SubscriptionMonitorParams{TrackID: trackID, Logger: p.params.Logger}) + p.subscriptions[trackID] = sm + } + sm.(*SubscriptionMonitor).UpdateSubscription(isSubscribed) + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) SetSubscribedTrack(trackID livekit.TrackID, subTrack types.SubscribedTrack) { + p.lock.RLock() + sm, ok := p.subscriptions[trackID] + if ok { + sm.(*SubscriptionMonitor).SetSubscribedTrack(subTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) ClearSubscribedTrack(trackID livekit.TrackID, subTrack types.SubscribedTrack) { + p.lock.RLock() + sm, ok := p.subscriptions[trackID] + if ok { + sm.(*SubscriptionMonitor).ClearSubscribedTrack(subTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) checkState() { + ticker := time.NewTicker(monitorInterval) + defer ticker.Stop() + + for !p.isStopped.Load() { + <-ticker.C + + p.checkPublications() + p.checkSubscriptions() + } +} + +func (p *ParticipantSupervisor) checkPublications() { + var removablePublications []livekit.TrackID + p.lock.RLock() + for trackID, pm := range p.publications { + if err := pm.Check(); err != nil { + p.params.Logger.Errorw("supervisor error on publication", err, "trackID", trackID) + } else { + if pm.IsIdle() { + removablePublications = append(removablePublications, trackID) + } + } + } + p.lock.RUnlock() + + p.lock.Lock() + for _, trackID := range removablePublications { + delete(p.publications, trackID) + } + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) checkSubscriptions() { + var removableSubscriptions []livekit.TrackID + p.lock.RLock() + for trackID, sm := range p.subscriptions { + if err := sm.Check(); err != nil { + p.params.Logger.Errorw("supervisor error on subscription", err, "trackID", trackID) + } else { + if sm.IsIdle() { + removableSubscriptions = append(removableSubscriptions, trackID) + } + } + } + p.lock.RUnlock() + + p.lock.Lock() + for _, trackID := range removableSubscriptions { + delete(p.subscriptions, trackID) + } + p.lock.Unlock() +} diff --git a/pkg/rtc/supervisor/publication_monitor.go b/pkg/rtc/supervisor/publication_monitor.go new file mode 100644 index 000000000..2f99b0470 --- /dev/null +++ b/pkg/rtc/supervisor/publication_monitor.go @@ -0,0 +1,148 @@ +package supervisor + +import ( + "errors" + "sync" + "time" + + "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + publishWaitDuration = 10 * time.Second +) + +var ( + errPublishTimeout = errors.New("publish time out") +) + +type publish struct { + isStart bool +} + +type PublicationMonitorParams struct { + TrackID livekit.TrackID + Logger logger.Logger +} + +type PublicationMonitor struct { + params PublicationMonitorParams + + lock sync.RWMutex + desiredPublishes deque.Deque + + publishedTrack types.LocalMediaTrack + isMuted bool + unmutedAt time.Time +} + +func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor { + p := &PublicationMonitor{ + params: params, + } + p.desiredPublishes.SetMinCapacity(2) + return p +} + +func (p *PublicationMonitor) AddPending() { + p.lock.Lock() + p.desiredPublishes.PushBack( + &publish{ + isStart: true, + }, + ) + + // synthesize an end + p.desiredPublishes.PushBack( + &publish{ + isStart: false, + }, + ) + p.update() + p.lock.Unlock() +} + +func (p *PublicationMonitor) SetMute(isMuted bool) { + p.lock.Lock() + p.isMuted = isMuted + if !p.isMuted { + p.unmutedAt = time.Now() + } + p.lock.Unlock() +} + +func (p *PublicationMonitor) SetPublishedTrack(pubTrack types.LocalMediaTrack) { + p.lock.Lock() + p.publishedTrack = pubTrack + p.update() + p.lock.Unlock() +} + +func (p *PublicationMonitor) ClearPublishedTrack(pubTrack types.LocalMediaTrack) { + p.lock.Lock() + if p.publishedTrack == pubTrack { + p.publishedTrack = nil + } else { + p.params.Logger.Errorw("mismatched published track on clear", nil, "trackID", p.params.TrackID) + } + + p.update() + p.lock.Unlock() +} + +func (p *PublicationMonitor) Check() error { + p.lock.RLock() + var pub *publish + if p.desiredPublishes.Len() > 0 { + pub = p.desiredPublishes.Front().(*publish) + } + + isMuted := p.isMuted + unmutedAt := p.unmutedAt + p.lock.RUnlock() + + if pub == nil { + return nil + } + + if pub.isStart && !isMuted && !unmutedAt.IsZero() && time.Since(unmutedAt) > publishWaitDuration { + // timed out waiting for publish + return errPublishTimeout + } + + // give more time for publish to happen + // NOTE: synthesized end events do not have a start time, so do not check them for time out + return nil +} + +func (p *PublicationMonitor) IsIdle() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.desiredPublishes.Len() == 0 && p.publishedTrack == nil +} + +func (p *PublicationMonitor) update() { + var pub *publish + if p.desiredPublishes.Len() > 0 { + pub = p.desiredPublishes.PopFront().(*publish) + } + + if pub == nil { + return + } + + switch { + case pub.isStart && p.publishedTrack != nil: + return + case !pub.isStart && p.publishedTrack == nil: + return + default: + // put it back as the condition is not satisfied + p.desiredPublishes.PushFront(pub) + return + } +} diff --git a/pkg/rtc/supervisor/subscription_monitor.go b/pkg/rtc/supervisor/subscription_monitor.go new file mode 100644 index 000000000..35fb6c872 --- /dev/null +++ b/pkg/rtc/supervisor/subscription_monitor.go @@ -0,0 +1,128 @@ +package supervisor + +import ( + "errors" + "sync" + "time" + + "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + transitionWaitDuration = 10 * time.Second +) + +var ( + errTransitionTimeout = errors.New("transition time out") +) + +type transition struct { + isSubscribed bool + at time.Time +} + +type SubscriptionMonitorParams struct { + TrackID livekit.TrackID + Logger logger.Logger +} + +type SubscriptionMonitor struct { + params SubscriptionMonitorParams + + lock sync.RWMutex + desiredTransitions deque.Deque + + subscribedTrack types.SubscribedTrack +} + +func NewSubscriptionMonitor(params SubscriptionMonitorParams) *SubscriptionMonitor { + s := &SubscriptionMonitor{ + params: params, + } + s.desiredTransitions.SetMinCapacity(2) + return s +} + +func (s *SubscriptionMonitor) UpdateSubscription(isSubscribed bool) { + s.lock.Lock() + s.desiredTransitions.PushBack( + &transition{ + isSubscribed: isSubscribed, + at: time.Now(), + }, + ) + s.update() + s.lock.Unlock() +} + +func (s *SubscriptionMonitor) SetSubscribedTrack(subTrack types.SubscribedTrack) { + s.lock.Lock() + s.subscribedTrack = subTrack + s.update() + s.lock.Unlock() +} + +func (s *SubscriptionMonitor) ClearSubscribedTrack(subTrack types.SubscribedTrack) { + s.lock.Lock() + if s.subscribedTrack == subTrack { + s.subscribedTrack = nil + } else { + s.params.Logger.Errorw("mismatched subscribed track on clear", nil, "trackID", s.params.TrackID) + } + + s.update() + s.lock.Unlock() +} + +func (s *SubscriptionMonitor) Check() error { + s.lock.RLock() + var tx *transition + if s.desiredTransitions.Len() > 0 { + tx = s.desiredTransitions.Front().(*transition) + } + s.lock.RUnlock() + + if tx == nil { + return nil + } + + if time.Since(tx.at) > transitionWaitDuration { + // timed out waiting for transition + return errTransitionTimeout + } + + // give more time for transition to happen + return nil +} + +func (s *SubscriptionMonitor) IsIdle() bool { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.desiredTransitions.Len() == 0 && s.subscribedTrack == nil +} + +func (s *SubscriptionMonitor) update() { + var tx *transition + if s.desiredTransitions.Len() > 0 { + tx = s.desiredTransitions.PopFront().(*transition) + } + + if tx == nil { + return + } + + switch { + case tx.isSubscribed && s.subscribedTrack != nil: + return + case !tx.isSubscribed && s.subscribedTrack == nil: + return + default: + // put it back as the condition is not satisfied + s.desiredTransitions.PushFront(tx) + return + } +} diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index d30d75dba..ef8d66d21 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -100,20 +100,35 @@ func TestNegotiationTiming(t *testing.T) { // initial offer transportA.Negotiate(true) require.Eventually(t, func() bool { - return negotiationState.Load().(NegotiationState) == NegotiationStateRemote + state, ok := negotiationState.Load().(NegotiationState) + if !ok { + return false + } + + return state == NegotiationStateRemote }, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRemote") // second try, should've flipped transport status to retry transportA.Negotiate(true) require.Eventually(t, func() bool { - return negotiationState.Load().(NegotiationState) == NegotiationStateRetry + state, ok := negotiationState.Load().(NegotiationState) + if !ok { + return false + } + + return state == NegotiationStateRetry }, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRetry") // third try, should've stayed at retry transportA.Negotiate(true) time.Sleep(100 * time.Millisecond) // some time to process the negotiate event require.Eventually(t, func() bool { - return negotiationState.Load().(NegotiationState) == NegotiationStateRetry + state, ok := negotiationState.Load().(NegotiationState) + if !ok { + return false + } + + return state == NegotiationStateRetry }, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRetry") time.Sleep(5 * time.Millisecond) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index ffa656b8b..32bc5d389 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -419,3 +419,8 @@ type SubscribedTrack interface { // selects appropriate video layer according to subscriber preferences UpdateVideoLayer() } + +type OperationMonitor interface { + Check() error + IsIdle() bool +} diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 312e77b7a..9528ec7a4 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -21,11 +21,6 @@ type FakeLocalParticipant struct { arg1 webrtc.ICECandidateInit arg2 livekit.SignalTarget } - AddNegotiationPendingStub func(livekit.ParticipantID) - addNegotiationPendingMutex sync.RWMutex - addNegotiationPendingArgsForCall []struct { - arg1 livekit.ParticipantID - } AddSubscribedTrackStub func(types.SubscribedTrack) addSubscribedTrackMutex sync.RWMutex addSubscribedTrackArgsForCall []struct { @@ -342,17 +337,6 @@ type FakeLocalParticipant struct { identityReturnsOnCall map[int]struct { result1 livekit.ParticipantIdentity } - IsNegotiationPendingStub func(livekit.ParticipantID) bool - isNegotiationPendingMutex sync.RWMutex - isNegotiationPendingArgsForCall []struct { - arg1 livekit.ParticipantID - } - isNegotiationPendingReturns struct { - result1 bool - } - isNegotiationPendingReturnsOnCall map[int]struct { - result1 bool - } IsPublisherStub func() bool isPublisherMutex sync.RWMutex isPublisherArgsForCall []struct { @@ -796,38 +780,6 @@ func (fake *FakeLocalParticipant) AddICECandidateArgsForCall(i int) (webrtc.ICEC return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeLocalParticipant) AddNegotiationPending(arg1 livekit.ParticipantID) { - fake.addNegotiationPendingMutex.Lock() - fake.addNegotiationPendingArgsForCall = append(fake.addNegotiationPendingArgsForCall, struct { - arg1 livekit.ParticipantID - }{arg1}) - stub := fake.AddNegotiationPendingStub - fake.recordInvocation("AddNegotiationPending", []interface{}{arg1}) - fake.addNegotiationPendingMutex.Unlock() - if stub != nil { - fake.AddNegotiationPendingStub(arg1) - } -} - -func (fake *FakeLocalParticipant) AddNegotiationPendingCallCount() int { - fake.addNegotiationPendingMutex.RLock() - defer fake.addNegotiationPendingMutex.RUnlock() - return len(fake.addNegotiationPendingArgsForCall) -} - -func (fake *FakeLocalParticipant) AddNegotiationPendingCalls(stub func(livekit.ParticipantID)) { - fake.addNegotiationPendingMutex.Lock() - defer fake.addNegotiationPendingMutex.Unlock() - fake.AddNegotiationPendingStub = stub -} - -func (fake *FakeLocalParticipant) AddNegotiationPendingArgsForCall(i int) livekit.ParticipantID { - fake.addNegotiationPendingMutex.RLock() - defer fake.addNegotiationPendingMutex.RUnlock() - argsForCall := fake.addNegotiationPendingArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeLocalParticipant) AddSubscribedTrack(arg1 types.SubscribedTrack) { fake.addSubscribedTrackMutex.Lock() fake.addSubscribedTrackArgsForCall = append(fake.addSubscribedTrackArgsForCall, struct { @@ -2488,67 +2440,6 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P }{result1} } -func (fake *FakeLocalParticipant) IsNegotiationPending(arg1 livekit.ParticipantID) bool { - fake.isNegotiationPendingMutex.Lock() - ret, specificReturn := fake.isNegotiationPendingReturnsOnCall[len(fake.isNegotiationPendingArgsForCall)] - fake.isNegotiationPendingArgsForCall = append(fake.isNegotiationPendingArgsForCall, struct { - arg1 livekit.ParticipantID - }{arg1}) - stub := fake.IsNegotiationPendingStub - fakeReturns := fake.isNegotiationPendingReturns - fake.recordInvocation("IsNegotiationPending", []interface{}{arg1}) - fake.isNegotiationPendingMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingCallCount() int { - fake.isNegotiationPendingMutex.RLock() - defer fake.isNegotiationPendingMutex.RUnlock() - return len(fake.isNegotiationPendingArgsForCall) -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingCalls(stub func(livekit.ParticipantID) bool) { - fake.isNegotiationPendingMutex.Lock() - defer fake.isNegotiationPendingMutex.Unlock() - fake.IsNegotiationPendingStub = stub -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingArgsForCall(i int) livekit.ParticipantID { - fake.isNegotiationPendingMutex.RLock() - defer fake.isNegotiationPendingMutex.RUnlock() - argsForCall := fake.isNegotiationPendingArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingReturns(result1 bool) { - fake.isNegotiationPendingMutex.Lock() - defer fake.isNegotiationPendingMutex.Unlock() - fake.IsNegotiationPendingStub = nil - fake.isNegotiationPendingReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingReturnsOnCall(i int, result1 bool) { - fake.isNegotiationPendingMutex.Lock() - defer fake.isNegotiationPendingMutex.Unlock() - fake.IsNegotiationPendingStub = nil - if fake.isNegotiationPendingReturnsOnCall == nil { - fake.isNegotiationPendingReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.isNegotiationPendingReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - func (fake *FakeLocalParticipant) IsPublisher() bool { fake.isPublisherMutex.Lock() ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)] @@ -4814,8 +4705,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.addICECandidateMutex.RLock() defer fake.addICECandidateMutex.RUnlock() - fake.addNegotiationPendingMutex.RLock() - defer fake.addNegotiationPendingMutex.RUnlock() fake.addSubscribedTrackMutex.RLock() defer fake.addSubscribedTrackMutex.RUnlock() fake.addSubscriberMutex.RLock() @@ -4884,8 +4773,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.identityMutex.RLock() defer fake.identityMutex.RUnlock() - fake.isNegotiationPendingMutex.RLock() - defer fake.isNegotiationPendingMutex.RUnlock() fake.isPublisherMutex.RLock() defer fake.isPublisherMutex.RUnlock() fake.isReadyMutex.RLock() diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index bdb1aedcd..7919c8ce7 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -288,7 +288,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok { _ = sigConn.WriteResponse(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Pong{ - Pong: 1, + Pong: time.Now().UnixNano(), }, }) continue