diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 0c0e5f9e5..aeeab20e3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -18,6 +18,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/supervisor" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" @@ -153,6 +154,8 @@ type ParticipantImpl struct { subscriptionInProgress map[livekit.TrackID]bool subscriptionRequestsQueue map[livekit.TrackID][]SubscribeRequest trackPublisherVersion map[livekit.TrackID]uint32 + + supervisor *supervisor.ParticipantSupervisor } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -179,6 +182,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { subscriptionInProgress: make(map[livekit.TrackID]bool), subscriptionRequestsQueue: make(map[livekit.TrackID][]SubscribeRequest), trackPublisherVersion: make(map[livekit.TrackID]uint32), + supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), } p.version.Store(params.InitialVersion) p.migrateState.Store(types.MigrateStateInit) @@ -623,7 +627,12 @@ func (p *ParticipantImpl) SetMigrateInfo( ) { p.pendingTracksLock.Lock() for _, t := range mediaTracks { - p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{t.GetTrack()}, migrated: true} + ti := t.GetTrack() + + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + + p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} } p.pendingTracksLock.Unlock() @@ -654,6 +663,8 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea }) } + p.supervisor.Stop() + p.UpTrackManager.Close(!sendLeave) p.pendingTracksLock.Lock() @@ -963,6 +974,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { onSubscribedTo := p.onSubscribedTo p.subscribedTracks[subTrack.ID()] = subTrack + p.supervisor.SetSubscribedTrack(subTrack.ID(), subTrack) settings := p.subscribedTracksSettings[subTrack.ID()] p.lock.Unlock() @@ -1004,6 +1016,7 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) p.trackPublisherVersion[subTrack.ID()] = subTrack.PublisherVersion() delete(p.subscribedTracks, subTrack.ID()) + p.supervisor.ClearSubscribedTrack(subTrack.ID(), subTrack) // remove from subscribed map numRemaining := 0 @@ -1015,7 +1028,7 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) // // NOTE - // subscribedTrackSettings should not be deleted on removal as it is needed if corresponding publisher migrated + // subscribedTracksSettings should not be deleted on removal as it is needed if corresponding publisher migrated // LK-TODO: find a way to clean these up // @@ -1534,6 +1547,9 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.pendingTracks[req.Cid] == nil { p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} } else { @@ -1543,6 +1559,9 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l return nil } + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} p.params.Logger.Debugw("pending track added", "track", ti.String(), "request", req.String()) return ti @@ -1570,6 +1589,8 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro } func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { + p.supervisor.SetPublicationMute(trackID, muted) + track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted) isPending := false @@ -1722,7 +1743,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv }) mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) + // add to published and clean up pending + p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) p.UpTrackManager.AddPublishedTrack(mt) p.pendingTracks[signalCid].trackInfos = p.pendingTracks[signalCid].trackInfos[1:] @@ -1731,6 +1754,8 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv } mt.AddOnClose(func() { + p.supervisor.ClearPublishedTrack(livekit.TrackID(ti.Sid), mt) + // re-use track sid p.pendingTracksLock.Lock() if pti := p.pendingTracks[signalCid]; pti != nil { @@ -2032,6 +2057,8 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(sub types.LocalParticipant) error) { p.params.Logger.Infow("queuing subscribe", "trackID", trackID) + p.supervisor.UpdateSubscription(trackID, true) + p.lock.Lock() p.subscriptionRequestsQueue[trackID] = append(p.subscriptionRequestsQueue[trackID], SubscribeRequest{ requestType: SubscribeRequestTypeAdd, @@ -2045,6 +2072,8 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func( func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) { p.params.Logger.Infow("queuing unsubscribe", "trackID", trackID) + p.supervisor.UpdateSubscription(trackID, false) + p.lock.Lock() p.subscriptionRequestsQueue[trackID] = append(p.subscriptionRequestsQueue[trackID], SubscribeRequest{ requestType: SubscribeRequestTypeRemove, diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index a638f40b0..63ff435cc 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -11,9 +11,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant switch msg := req.Message.(type) { case *livekit.SignalRequest_Offer: participant.HandleOffer(FromProtoSessionDescription(msg.Offer)) - case *livekit.SignalRequest_AddTrack: - pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid) - participant.AddTrack(msg.AddTrack) case *livekit.SignalRequest_Answer: participant.HandleAnswer(FromProtoSessionDescription(msg.Answer)) case *livekit.SignalRequest_Trickle: @@ -23,6 +20,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant return nil } participant.AddICECandidate(candidateInit, msg.Trickle.Target) + case *livekit.SignalRequest_AddTrack: + pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid) + participant.AddTrack(msg.AddTrack) case *livekit.SignalRequest_Mute: participant.SetTrackMuted(livekit.TrackID(msg.Mute.Sid), msg.Mute.Muted, false) case *livekit.SignalRequest_Subscription: @@ -60,6 +60,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant pLogger.Infow("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting) } + case *livekit.SignalRequest_Leave: + pLogger.Infow("client leaving room") + room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_UpdateLayers: err := room.UpdateVideoLayers(participant, msg.UpdateLayers) if err != nil { @@ -67,9 +70,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant "update", msg.UpdateLayers) return nil } - case *livekit.SignalRequest_Leave: - pLogger.Infow("client leaving room") - room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave) case *livekit.SignalRequest_SubscriptionPermission: err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission) if err != nil { diff --git a/pkg/rtc/supervisor/participant_supervisor.go b/pkg/rtc/supervisor/participant_supervisor.go new file mode 100644 index 000000000..687c34d21 --- /dev/null +++ b/pkg/rtc/supervisor/participant_supervisor.go @@ -0,0 +1,166 @@ +package supervisor + +import ( + "sync" + "time" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "go.uber.org/atomic" +) + +const ( + monitorInterval = 3 * time.Second +) + +type ParticipantSupervisorParams struct { + Logger logger.Logger +} + +type ParticipantSupervisor struct { + params ParticipantSupervisorParams + + lock sync.RWMutex + publications map[livekit.TrackID]types.OperationMonitor + subscriptions map[livekit.TrackID]types.OperationMonitor + + isStopped atomic.Bool +} + +func NewParticipantSupervisor(params ParticipantSupervisorParams) *ParticipantSupervisor { + p := &ParticipantSupervisor{ + params: params, + publications: make(map[livekit.TrackID]types.OperationMonitor), + subscriptions: make(map[livekit.TrackID]types.OperationMonitor), + } + + go p.checkState() + + return p +} + +func (p *ParticipantSupervisor) Stop() { + p.isStopped.Store(true) +} + +func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) { + p.lock.Lock() + pm, ok := p.publications[trackID] + if !ok { + pm = NewPublicationMonitor(PublicationMonitorParams{TrackID: trackID, Logger: p.params.Logger}) + p.publications[trackID] = pm + } + pm.(*PublicationMonitor).AddPending() + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) SetPublicationMute(trackID livekit.TrackID, isMuted bool) { + p.lock.Lock() + pm, ok := p.publications[trackID] + if ok { + pm.(*PublicationMonitor).SetMute(isMuted) + } + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) SetPublishedTrack(trackID livekit.TrackID, pubTrack types.LocalMediaTrack) { + p.lock.RLock() + pm, ok := p.publications[trackID] + if ok { + pm.(*PublicationMonitor).SetPublishedTrack(pubTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) ClearPublishedTrack(trackID livekit.TrackID, pubTrack types.LocalMediaTrack) { + p.lock.RLock() + pm, ok := p.publications[trackID] + if ok { + pm.(*PublicationMonitor).ClearPublishedTrack(pubTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) UpdateSubscription(trackID livekit.TrackID, isSubscribed bool) { + p.lock.Lock() + sm, ok := p.subscriptions[trackID] + if !ok { + sm = NewSubscriptionMonitor(SubscriptionMonitorParams{TrackID: trackID, Logger: p.params.Logger}) + p.subscriptions[trackID] = sm + } + sm.(*SubscriptionMonitor).UpdateSubscription(isSubscribed) + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) SetSubscribedTrack(trackID livekit.TrackID, subTrack types.SubscribedTrack) { + p.lock.RLock() + sm, ok := p.subscriptions[trackID] + if ok { + sm.(*SubscriptionMonitor).SetSubscribedTrack(subTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) ClearSubscribedTrack(trackID livekit.TrackID, subTrack types.SubscribedTrack) { + p.lock.RLock() + sm, ok := p.subscriptions[trackID] + if ok { + sm.(*SubscriptionMonitor).ClearSubscribedTrack(subTrack) + } + p.lock.RUnlock() +} + +func (p *ParticipantSupervisor) checkState() { + ticker := time.NewTicker(monitorInterval) + defer ticker.Stop() + + for !p.isStopped.Load() { + <-ticker.C + + p.checkPublications() + p.checkSubscriptions() + } +} + +func (p *ParticipantSupervisor) checkPublications() { + var removablePublications []livekit.TrackID + p.lock.RLock() + for trackID, pm := range p.publications { + if err := pm.Check(); err != nil { + p.params.Logger.Errorw("supervisor error on publication", err, "trackID", trackID) + } else { + if pm.IsIdle() { + removablePublications = append(removablePublications, trackID) + } + } + } + p.lock.RUnlock() + + p.lock.Lock() + for _, trackID := range removablePublications { + delete(p.publications, trackID) + } + p.lock.Unlock() +} + +func (p *ParticipantSupervisor) checkSubscriptions() { + var removableSubscriptions []livekit.TrackID + p.lock.RLock() + for trackID, sm := range p.subscriptions { + if err := sm.Check(); err != nil { + p.params.Logger.Errorw("supervisor error on subscription", err, "trackID", trackID) + } else { + if sm.IsIdle() { + removableSubscriptions = append(removableSubscriptions, trackID) + } + } + } + p.lock.RUnlock() + + p.lock.Lock() + for _, trackID := range removableSubscriptions { + delete(p.subscriptions, trackID) + } + p.lock.Unlock() +} diff --git a/pkg/rtc/supervisor/publication_monitor.go b/pkg/rtc/supervisor/publication_monitor.go new file mode 100644 index 000000000..2f99b0470 --- /dev/null +++ b/pkg/rtc/supervisor/publication_monitor.go @@ -0,0 +1,148 @@ +package supervisor + +import ( + "errors" + "sync" + "time" + + "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + publishWaitDuration = 10 * time.Second +) + +var ( + errPublishTimeout = errors.New("publish time out") +) + +type publish struct { + isStart bool +} + +type PublicationMonitorParams struct { + TrackID livekit.TrackID + Logger logger.Logger +} + +type PublicationMonitor struct { + params PublicationMonitorParams + + lock sync.RWMutex + desiredPublishes deque.Deque + + publishedTrack types.LocalMediaTrack + isMuted bool + unmutedAt time.Time +} + +func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor { + p := &PublicationMonitor{ + params: params, + } + p.desiredPublishes.SetMinCapacity(2) + return p +} + +func (p *PublicationMonitor) AddPending() { + p.lock.Lock() + p.desiredPublishes.PushBack( + &publish{ + isStart: true, + }, + ) + + // synthesize an end + p.desiredPublishes.PushBack( + &publish{ + isStart: false, + }, + ) + p.update() + p.lock.Unlock() +} + +func (p *PublicationMonitor) SetMute(isMuted bool) { + p.lock.Lock() + p.isMuted = isMuted + if !p.isMuted { + p.unmutedAt = time.Now() + } + p.lock.Unlock() +} + +func (p *PublicationMonitor) SetPublishedTrack(pubTrack types.LocalMediaTrack) { + p.lock.Lock() + p.publishedTrack = pubTrack + p.update() + p.lock.Unlock() +} + +func (p *PublicationMonitor) ClearPublishedTrack(pubTrack types.LocalMediaTrack) { + p.lock.Lock() + if p.publishedTrack == pubTrack { + p.publishedTrack = nil + } else { + p.params.Logger.Errorw("mismatched published track on clear", nil, "trackID", p.params.TrackID) + } + + p.update() + p.lock.Unlock() +} + +func (p *PublicationMonitor) Check() error { + p.lock.RLock() + var pub *publish + if p.desiredPublishes.Len() > 0 { + pub = p.desiredPublishes.Front().(*publish) + } + + isMuted := p.isMuted + unmutedAt := p.unmutedAt + p.lock.RUnlock() + + if pub == nil { + return nil + } + + if pub.isStart && !isMuted && !unmutedAt.IsZero() && time.Since(unmutedAt) > publishWaitDuration { + // timed out waiting for publish + return errPublishTimeout + } + + // give more time for publish to happen + // NOTE: synthesized end events do not have a start time, so do not check them for time out + return nil +} + +func (p *PublicationMonitor) IsIdle() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.desiredPublishes.Len() == 0 && p.publishedTrack == nil +} + +func (p *PublicationMonitor) update() { + var pub *publish + if p.desiredPublishes.Len() > 0 { + pub = p.desiredPublishes.PopFront().(*publish) + } + + if pub == nil { + return + } + + switch { + case pub.isStart && p.publishedTrack != nil: + return + case !pub.isStart && p.publishedTrack == nil: + return + default: + // put it back as the condition is not satisfied + p.desiredPublishes.PushFront(pub) + return + } +} diff --git a/pkg/rtc/supervisor/subscription_monitor.go b/pkg/rtc/supervisor/subscription_monitor.go new file mode 100644 index 000000000..35fb6c872 --- /dev/null +++ b/pkg/rtc/supervisor/subscription_monitor.go @@ -0,0 +1,128 @@ +package supervisor + +import ( + "errors" + "sync" + "time" + + "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + transitionWaitDuration = 10 * time.Second +) + +var ( + errTransitionTimeout = errors.New("transition time out") +) + +type transition struct { + isSubscribed bool + at time.Time +} + +type SubscriptionMonitorParams struct { + TrackID livekit.TrackID + Logger logger.Logger +} + +type SubscriptionMonitor struct { + params SubscriptionMonitorParams + + lock sync.RWMutex + desiredTransitions deque.Deque + + subscribedTrack types.SubscribedTrack +} + +func NewSubscriptionMonitor(params SubscriptionMonitorParams) *SubscriptionMonitor { + s := &SubscriptionMonitor{ + params: params, + } + s.desiredTransitions.SetMinCapacity(2) + return s +} + +func (s *SubscriptionMonitor) UpdateSubscription(isSubscribed bool) { + s.lock.Lock() + s.desiredTransitions.PushBack( + &transition{ + isSubscribed: isSubscribed, + at: time.Now(), + }, + ) + s.update() + s.lock.Unlock() +} + +func (s *SubscriptionMonitor) SetSubscribedTrack(subTrack types.SubscribedTrack) { + s.lock.Lock() + s.subscribedTrack = subTrack + s.update() + s.lock.Unlock() +} + +func (s *SubscriptionMonitor) ClearSubscribedTrack(subTrack types.SubscribedTrack) { + s.lock.Lock() + if s.subscribedTrack == subTrack { + s.subscribedTrack = nil + } else { + s.params.Logger.Errorw("mismatched subscribed track on clear", nil, "trackID", s.params.TrackID) + } + + s.update() + s.lock.Unlock() +} + +func (s *SubscriptionMonitor) Check() error { + s.lock.RLock() + var tx *transition + if s.desiredTransitions.Len() > 0 { + tx = s.desiredTransitions.Front().(*transition) + } + s.lock.RUnlock() + + if tx == nil { + return nil + } + + if time.Since(tx.at) > transitionWaitDuration { + // timed out waiting for transition + return errTransitionTimeout + } + + // give more time for transition to happen + return nil +} + +func (s *SubscriptionMonitor) IsIdle() bool { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.desiredTransitions.Len() == 0 && s.subscribedTrack == nil +} + +func (s *SubscriptionMonitor) update() { + var tx *transition + if s.desiredTransitions.Len() > 0 { + tx = s.desiredTransitions.PopFront().(*transition) + } + + if tx == nil { + return + } + + switch { + case tx.isSubscribed && s.subscribedTrack != nil: + return + case !tx.isSubscribed && s.subscribedTrack == nil: + return + default: + // put it back as the condition is not satisfied + s.desiredTransitions.PushFront(tx) + return + } +} diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index d30d75dba..ef8d66d21 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -100,20 +100,35 @@ func TestNegotiationTiming(t *testing.T) { // initial offer transportA.Negotiate(true) require.Eventually(t, func() bool { - return negotiationState.Load().(NegotiationState) == NegotiationStateRemote + state, ok := negotiationState.Load().(NegotiationState) + if !ok { + return false + } + + return state == NegotiationStateRemote }, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRemote") // second try, should've flipped transport status to retry transportA.Negotiate(true) require.Eventually(t, func() bool { - return negotiationState.Load().(NegotiationState) == NegotiationStateRetry + state, ok := negotiationState.Load().(NegotiationState) + if !ok { + return false + } + + return state == NegotiationStateRetry }, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRetry") // third try, should've stayed at retry transportA.Negotiate(true) time.Sleep(100 * time.Millisecond) // some time to process the negotiate event require.Eventually(t, func() bool { - return negotiationState.Load().(NegotiationState) == NegotiationStateRetry + state, ok := negotiationState.Load().(NegotiationState) + if !ok { + return false + } + + return state == NegotiationStateRetry }, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRetry") time.Sleep(5 * time.Millisecond) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index ffa656b8b..32bc5d389 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -419,3 +419,8 @@ type SubscribedTrack interface { // selects appropriate video layer according to subscriber preferences UpdateVideoLayer() } + +type OperationMonitor interface { + Check() error + IsIdle() bool +} diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 312e77b7a..9528ec7a4 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -21,11 +21,6 @@ type FakeLocalParticipant struct { arg1 webrtc.ICECandidateInit arg2 livekit.SignalTarget } - AddNegotiationPendingStub func(livekit.ParticipantID) - addNegotiationPendingMutex sync.RWMutex - addNegotiationPendingArgsForCall []struct { - arg1 livekit.ParticipantID - } AddSubscribedTrackStub func(types.SubscribedTrack) addSubscribedTrackMutex sync.RWMutex addSubscribedTrackArgsForCall []struct { @@ -342,17 +337,6 @@ type FakeLocalParticipant struct { identityReturnsOnCall map[int]struct { result1 livekit.ParticipantIdentity } - IsNegotiationPendingStub func(livekit.ParticipantID) bool - isNegotiationPendingMutex sync.RWMutex - isNegotiationPendingArgsForCall []struct { - arg1 livekit.ParticipantID - } - isNegotiationPendingReturns struct { - result1 bool - } - isNegotiationPendingReturnsOnCall map[int]struct { - result1 bool - } IsPublisherStub func() bool isPublisherMutex sync.RWMutex isPublisherArgsForCall []struct { @@ -796,38 +780,6 @@ func (fake *FakeLocalParticipant) AddICECandidateArgsForCall(i int) (webrtc.ICEC return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeLocalParticipant) AddNegotiationPending(arg1 livekit.ParticipantID) { - fake.addNegotiationPendingMutex.Lock() - fake.addNegotiationPendingArgsForCall = append(fake.addNegotiationPendingArgsForCall, struct { - arg1 livekit.ParticipantID - }{arg1}) - stub := fake.AddNegotiationPendingStub - fake.recordInvocation("AddNegotiationPending", []interface{}{arg1}) - fake.addNegotiationPendingMutex.Unlock() - if stub != nil { - fake.AddNegotiationPendingStub(arg1) - } -} - -func (fake *FakeLocalParticipant) AddNegotiationPendingCallCount() int { - fake.addNegotiationPendingMutex.RLock() - defer fake.addNegotiationPendingMutex.RUnlock() - return len(fake.addNegotiationPendingArgsForCall) -} - -func (fake *FakeLocalParticipant) AddNegotiationPendingCalls(stub func(livekit.ParticipantID)) { - fake.addNegotiationPendingMutex.Lock() - defer fake.addNegotiationPendingMutex.Unlock() - fake.AddNegotiationPendingStub = stub -} - -func (fake *FakeLocalParticipant) AddNegotiationPendingArgsForCall(i int) livekit.ParticipantID { - fake.addNegotiationPendingMutex.RLock() - defer fake.addNegotiationPendingMutex.RUnlock() - argsForCall := fake.addNegotiationPendingArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeLocalParticipant) AddSubscribedTrack(arg1 types.SubscribedTrack) { fake.addSubscribedTrackMutex.Lock() fake.addSubscribedTrackArgsForCall = append(fake.addSubscribedTrackArgsForCall, struct { @@ -2488,67 +2440,6 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P }{result1} } -func (fake *FakeLocalParticipant) IsNegotiationPending(arg1 livekit.ParticipantID) bool { - fake.isNegotiationPendingMutex.Lock() - ret, specificReturn := fake.isNegotiationPendingReturnsOnCall[len(fake.isNegotiationPendingArgsForCall)] - fake.isNegotiationPendingArgsForCall = append(fake.isNegotiationPendingArgsForCall, struct { - arg1 livekit.ParticipantID - }{arg1}) - stub := fake.IsNegotiationPendingStub - fakeReturns := fake.isNegotiationPendingReturns - fake.recordInvocation("IsNegotiationPending", []interface{}{arg1}) - fake.isNegotiationPendingMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingCallCount() int { - fake.isNegotiationPendingMutex.RLock() - defer fake.isNegotiationPendingMutex.RUnlock() - return len(fake.isNegotiationPendingArgsForCall) -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingCalls(stub func(livekit.ParticipantID) bool) { - fake.isNegotiationPendingMutex.Lock() - defer fake.isNegotiationPendingMutex.Unlock() - fake.IsNegotiationPendingStub = stub -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingArgsForCall(i int) livekit.ParticipantID { - fake.isNegotiationPendingMutex.RLock() - defer fake.isNegotiationPendingMutex.RUnlock() - argsForCall := fake.isNegotiationPendingArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingReturns(result1 bool) { - fake.isNegotiationPendingMutex.Lock() - defer fake.isNegotiationPendingMutex.Unlock() - fake.IsNegotiationPendingStub = nil - fake.isNegotiationPendingReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeLocalParticipant) IsNegotiationPendingReturnsOnCall(i int, result1 bool) { - fake.isNegotiationPendingMutex.Lock() - defer fake.isNegotiationPendingMutex.Unlock() - fake.IsNegotiationPendingStub = nil - if fake.isNegotiationPendingReturnsOnCall == nil { - fake.isNegotiationPendingReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.isNegotiationPendingReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - func (fake *FakeLocalParticipant) IsPublisher() bool { fake.isPublisherMutex.Lock() ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)] @@ -4814,8 +4705,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.addICECandidateMutex.RLock() defer fake.addICECandidateMutex.RUnlock() - fake.addNegotiationPendingMutex.RLock() - defer fake.addNegotiationPendingMutex.RUnlock() fake.addSubscribedTrackMutex.RLock() defer fake.addSubscribedTrackMutex.RUnlock() fake.addSubscriberMutex.RLock() @@ -4884,8 +4773,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.identityMutex.RLock() defer fake.identityMutex.RUnlock() - fake.isNegotiationPendingMutex.RLock() - defer fake.isNegotiationPendingMutex.RUnlock() fake.isPublisherMutex.RLock() defer fake.isPublisherMutex.RUnlock() fake.isReadyMutex.RLock() diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index bdb1aedcd..7919c8ce7 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -288,7 +288,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok { _ = sigConn.WriteResponse(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Pong{ - Pong: 1, + Pong: time.Now().UnixNano(), }, }) continue