mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
Supervisor beginnings (#1005)
* Remove VP9 from media engine set up. * Remove vp9 from config sample * Supervisor beginnings Eventual goal is to have a reconciler which moves state from actual -> desired. First step along the way is to observe/monitor. The first step even in that is an initial implementation to get feedback on the direction. This PR is a start in that direction - Concept of a supervisor at local participant level - This supervisor will be responsible for periodically monitor actual vs desired (this is the one which will eventually trigger other things to reconcile, but for now it just logs on error) - A new interface `OperationMonitor` which requires two methods o Check() returns an error based on actual vs desired state. o IsIdle() returns bool. Returns true if the monitor is idle. - The supervisor maintains a list of monitors and does periodic check. In the above framework, starting with list of subscriptions/unsubscriptions. There is a new module `SubscriptionMonitor` which checks subscription transitions. A subscription transition is queued on subscribe/unsubscribe. The transition can be satisfied when a subscribedTrack is added OR removed. Error condition is when a transition is not satisfied for 10 seconds. Idle is when the transition queue is empty and subscribedTrack is nil, i. e. the last transition would have been unsubscribe and subscribed track removed (unsubscribe satisfied). The idea is individual monitors can check on different things. Some more things that I am thinking about are - PublishedTrackMonitor - started when an add track happens, satisfied when OnTrack happens, error if `OnTrack` does not fire for a while and track is not muted, idle when there is nothing pending. - PublishedTrackStreamingMonitor - to ensure that a published track is receiving media at the server (accounting for dynacast, mute, etc) - SubscribedTrackStreamingMonitor - to ensure down track is sending data unless muted. 
* Remove debug * Protect against early casting errors * Adding PublicationMonitor
This commit is contained in:
@@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/supervisor"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
@@ -153,6 +154,8 @@ type ParticipantImpl struct {
|
||||
subscriptionInProgress map[livekit.TrackID]bool
|
||||
subscriptionRequestsQueue map[livekit.TrackID][]SubscribeRequest
|
||||
trackPublisherVersion map[livekit.TrackID]uint32
|
||||
|
||||
supervisor *supervisor.ParticipantSupervisor
|
||||
}
|
||||
|
||||
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
@@ -179,6 +182,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
subscriptionInProgress: make(map[livekit.TrackID]bool),
|
||||
subscriptionRequestsQueue: make(map[livekit.TrackID][]SubscribeRequest),
|
||||
trackPublisherVersion: make(map[livekit.TrackID]uint32),
|
||||
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
|
||||
}
|
||||
p.version.Store(params.InitialVersion)
|
||||
p.migrateState.Store(types.MigrateStateInit)
|
||||
@@ -623,7 +627,12 @@ func (p *ParticipantImpl) SetMigrateInfo(
|
||||
) {
|
||||
p.pendingTracksLock.Lock()
|
||||
for _, t := range mediaTracks {
|
||||
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{t.GetTrack()}, migrated: true}
|
||||
ti := t.GetTrack()
|
||||
|
||||
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
|
||||
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
|
||||
|
||||
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true}
|
||||
}
|
||||
p.pendingTracksLock.Unlock()
|
||||
|
||||
@@ -654,6 +663,8 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
|
||||
})
|
||||
}
|
||||
|
||||
p.supervisor.Stop()
|
||||
|
||||
p.UpTrackManager.Close(!sendLeave)
|
||||
|
||||
p.pendingTracksLock.Lock()
|
||||
@@ -963,6 +974,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
|
||||
onSubscribedTo := p.onSubscribedTo
|
||||
|
||||
p.subscribedTracks[subTrack.ID()] = subTrack
|
||||
p.supervisor.SetSubscribedTrack(subTrack.ID(), subTrack)
|
||||
|
||||
settings := p.subscribedTracksSettings[subTrack.ID()]
|
||||
p.lock.Unlock()
|
||||
@@ -1004,6 +1016,7 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack)
|
||||
p.trackPublisherVersion[subTrack.ID()] = subTrack.PublisherVersion()
|
||||
|
||||
delete(p.subscribedTracks, subTrack.ID())
|
||||
p.supervisor.ClearSubscribedTrack(subTrack.ID(), subTrack)
|
||||
|
||||
// remove from subscribed map
|
||||
numRemaining := 0
|
||||
@@ -1015,7 +1028,7 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack)
|
||||
|
||||
//
|
||||
// NOTE
|
||||
// subscribedTrackSettings should not be deleted on removal as it is needed if corresponding publisher migrated
|
||||
// subscribedTracksSettings should not be deleted on removal as it is needed if corresponding publisher migrated
|
||||
// LK-TODO: find a way to clean these up
|
||||
//
|
||||
|
||||
@@ -1534,6 +1547,9 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
|
||||
}
|
||||
|
||||
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil {
|
||||
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
|
||||
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
|
||||
|
||||
if p.pendingTracks[req.Cid] == nil {
|
||||
p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
|
||||
} else {
|
||||
@@ -1543,6 +1559,9 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
|
||||
return nil
|
||||
}
|
||||
|
||||
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
|
||||
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
|
||||
|
||||
p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
|
||||
p.params.Logger.Debugw("pending track added", "track", ti.String(), "request", req.String())
|
||||
return ti
|
||||
@@ -1570,6 +1589,8 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) {
|
||||
p.supervisor.SetPublicationMute(trackID, muted)
|
||||
|
||||
track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted)
|
||||
|
||||
isPending := false
|
||||
@@ -1722,7 +1743,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
|
||||
})
|
||||
|
||||
mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
|
||||
|
||||
// add to published and clean up pending
|
||||
p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt)
|
||||
p.UpTrackManager.AddPublishedTrack(mt)
|
||||
|
||||
p.pendingTracks[signalCid].trackInfos = p.pendingTracks[signalCid].trackInfos[1:]
|
||||
@@ -1731,6 +1754,8 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
|
||||
}
|
||||
|
||||
mt.AddOnClose(func() {
|
||||
p.supervisor.ClearPublishedTrack(livekit.TrackID(ti.Sid), mt)
|
||||
|
||||
// re-use track sid
|
||||
p.pendingTracksLock.Lock()
|
||||
if pti := p.pendingTracks[signalCid]; pti != nil {
|
||||
@@ -2032,6 +2057,8 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() {
|
||||
func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(sub types.LocalParticipant) error) {
|
||||
p.params.Logger.Infow("queuing subscribe", "trackID", trackID)
|
||||
|
||||
p.supervisor.UpdateSubscription(trackID, true)
|
||||
|
||||
p.lock.Lock()
|
||||
p.subscriptionRequestsQueue[trackID] = append(p.subscriptionRequestsQueue[trackID], SubscribeRequest{
|
||||
requestType: SubscribeRequestTypeAdd,
|
||||
@@ -2045,6 +2072,8 @@ func (p *ParticipantImpl) EnqueueSubscribeTrack(trackID livekit.TrackID, f func(
|
||||
func (p *ParticipantImpl) EnqueueUnsubscribeTrack(trackID livekit.TrackID, willBeResumed bool, f func(subscriberID livekit.ParticipantID, willBeResumed bool) error) {
|
||||
p.params.Logger.Infow("queuing unsubscribe", "trackID", trackID)
|
||||
|
||||
p.supervisor.UpdateSubscription(trackID, false)
|
||||
|
||||
p.lock.Lock()
|
||||
p.subscriptionRequestsQueue[trackID] = append(p.subscriptionRequestsQueue[trackID], SubscribeRequest{
|
||||
requestType: SubscribeRequestTypeRemove,
|
||||
|
||||
@@ -11,9 +11,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
|
||||
switch msg := req.Message.(type) {
|
||||
case *livekit.SignalRequest_Offer:
|
||||
participant.HandleOffer(FromProtoSessionDescription(msg.Offer))
|
||||
case *livekit.SignalRequest_AddTrack:
|
||||
pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid)
|
||||
participant.AddTrack(msg.AddTrack)
|
||||
case *livekit.SignalRequest_Answer:
|
||||
participant.HandleAnswer(FromProtoSessionDescription(msg.Answer))
|
||||
case *livekit.SignalRequest_Trickle:
|
||||
@@ -23,6 +20,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
|
||||
return nil
|
||||
}
|
||||
participant.AddICECandidate(candidateInit, msg.Trickle.Target)
|
||||
case *livekit.SignalRequest_AddTrack:
|
||||
pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid)
|
||||
participant.AddTrack(msg.AddTrack)
|
||||
case *livekit.SignalRequest_Mute:
|
||||
participant.SetTrackMuted(livekit.TrackID(msg.Mute.Sid), msg.Mute.Muted, false)
|
||||
case *livekit.SignalRequest_Subscription:
|
||||
@@ -60,6 +60,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
|
||||
|
||||
pLogger.Infow("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting)
|
||||
}
|
||||
case *livekit.SignalRequest_Leave:
|
||||
pLogger.Infow("client leaving room")
|
||||
room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave)
|
||||
case *livekit.SignalRequest_UpdateLayers:
|
||||
err := room.UpdateVideoLayers(participant, msg.UpdateLayers)
|
||||
if err != nil {
|
||||
@@ -67,9 +70,6 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
|
||||
"update", msg.UpdateLayers)
|
||||
return nil
|
||||
}
|
||||
case *livekit.SignalRequest_Leave:
|
||||
pLogger.Infow("client leaving room")
|
||||
room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave)
|
||||
case *livekit.SignalRequest_SubscriptionPermission:
|
||||
err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission)
|
||||
if err != nil {
|
||||
|
||||
166
pkg/rtc/supervisor/participant_supervisor.go
Normal file
166
pkg/rtc/supervisor/participant_supervisor.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package supervisor

import (
	"sync"
	"time"

	"github.com/livekit/livekit-server/pkg/rtc/types"
	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"
)

const (
	// monitorInterval is how often every monitor is checked for
	// errors (desired vs. actual state mismatch) and idleness.
	monitorInterval = 3 * time.Second
)

// ParticipantSupervisorParams configures a ParticipantSupervisor.
type ParticipantSupervisorParams struct {
	Logger logger.Logger
}

// ParticipantSupervisor maintains per-track OperationMonitors for a
// participant's publications and subscriptions. A background goroutine
// periodically runs each monitor's Check(), logging any error, and
// removes monitors that report idle.
type ParticipantSupervisor struct {
	params ParticipantSupervisorParams

	lock          sync.RWMutex
	publications  map[livekit.TrackID]types.OperationMonitor
	subscriptions map[livekit.TrackID]types.OperationMonitor

	// done is closed exactly once by Stop (stopOnce makes Stop
	// idempotent) and terminates the checkState goroutine promptly
	// instead of waiting for the next tick.
	stopOnce sync.Once
	done     chan struct{}
}

// NewParticipantSupervisor creates a supervisor and starts its periodic
// checking goroutine. Callers must invoke Stop to shut it down.
func NewParticipantSupervisor(params ParticipantSupervisorParams) *ParticipantSupervisor {
	p := &ParticipantSupervisor{
		params:        params,
		publications:  make(map[livekit.TrackID]types.OperationMonitor),
		subscriptions: make(map[livekit.TrackID]types.OperationMonitor),
		done:          make(chan struct{}),
	}

	go p.checkState()

	return p
}

// Stop terminates the checking goroutine. It is safe to call multiple times.
func (p *ParticipantSupervisor) Stop() {
	p.stopOnce.Do(func() { close(p.done) })
}

// AddPublication registers intent to publish trackID, creating a
// PublicationMonitor for the track on first use.
func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) {
	p.lock.Lock()
	pm, ok := p.publications[trackID]
	if !ok {
		pm = NewPublicationMonitor(PublicationMonitorParams{TrackID: trackID, Logger: p.params.Logger})
		p.publications[trackID] = pm
	}
	// the map only ever holds *PublicationMonitor (created above)
	pm.(*PublicationMonitor).AddPending()
	p.lock.Unlock()
}

// SetPublicationMute records the mute state of a published track, which
// suspends the monitor's publish time-out while muted.
func (p *ParticipantSupervisor) SetPublicationMute(trackID livekit.TrackID, isMuted bool) {
	// read lock suffices: the map is only read; the monitor has its own lock
	p.lock.RLock()
	pm, ok := p.publications[trackID]
	if ok {
		pm.(*PublicationMonitor).SetMute(isMuted)
	}
	p.lock.RUnlock()
}

// SetPublishedTrack informs the monitor that the track has actually been published.
func (p *ParticipantSupervisor) SetPublishedTrack(trackID livekit.TrackID, pubTrack types.LocalMediaTrack) {
	p.lock.RLock()
	pm, ok := p.publications[trackID]
	if ok {
		pm.(*PublicationMonitor).SetPublishedTrack(pubTrack)
	}
	p.lock.RUnlock()
}

// ClearPublishedTrack informs the monitor that the published track has been removed.
func (p *ParticipantSupervisor) ClearPublishedTrack(trackID livekit.TrackID, pubTrack types.LocalMediaTrack) {
	p.lock.RLock()
	pm, ok := p.publications[trackID]
	if ok {
		pm.(*PublicationMonitor).ClearPublishedTrack(pubTrack)
	}
	p.lock.RUnlock()
}

// UpdateSubscription queues a desired subscribe/unsubscribe transition for
// trackID, creating a SubscriptionMonitor for the track on first use.
func (p *ParticipantSupervisor) UpdateSubscription(trackID livekit.TrackID, isSubscribed bool) {
	p.lock.Lock()
	sm, ok := p.subscriptions[trackID]
	if !ok {
		sm = NewSubscriptionMonitor(SubscriptionMonitorParams{TrackID: trackID, Logger: p.params.Logger})
		p.subscriptions[trackID] = sm
	}
	// the map only ever holds *SubscriptionMonitor (created above)
	sm.(*SubscriptionMonitor).UpdateSubscription(isSubscribed)
	p.lock.Unlock()
}

// SetSubscribedTrack informs the monitor that the track has actually been subscribed.
func (p *ParticipantSupervisor) SetSubscribedTrack(trackID livekit.TrackID, subTrack types.SubscribedTrack) {
	p.lock.RLock()
	sm, ok := p.subscriptions[trackID]
	if ok {
		sm.(*SubscriptionMonitor).SetSubscribedTrack(subTrack)
	}
	p.lock.RUnlock()
}

// ClearSubscribedTrack informs the monitor that the subscribed track has been removed.
func (p *ParticipantSupervisor) ClearSubscribedTrack(trackID livekit.TrackID, subTrack types.SubscribedTrack) {
	p.lock.RLock()
	sm, ok := p.subscriptions[trackID]
	if ok {
		sm.(*SubscriptionMonitor).ClearSubscribedTrack(subTrack)
	}
	p.lock.RUnlock()
}

// checkState runs all monitors every monitorInterval until Stop is called.
func (p *ParticipantSupervisor) checkState() {
	ticker := time.NewTicker(monitorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-p.done:
			return
		case <-ticker.C:
			p.checkMonitors("publication", p.publications)
			p.checkMonitors("subscription", p.subscriptions)
		}
	}
}

// checkMonitors runs Check() on every monitor in the given map, logging
// errors, and deletes monitors that are idle. operation names the monitor
// kind for log messages ("publication" or "subscription").
func (p *ParticipantSupervisor) checkMonitors(operation string, monitors map[livekit.TrackID]types.OperationMonitor) {
	var removable []livekit.TrackID
	p.lock.RLock()
	for trackID, om := range monitors {
		if err := om.Check(); err != nil {
			p.params.Logger.Errorw("supervisor error on "+operation, err, "trackID", trackID)
		} else if om.IsIdle() {
			removable = append(removable, trackID)
		}
	}
	p.lock.RUnlock()

	p.lock.Lock()
	for _, trackID := range removable {
		// re-check idleness under the write lock: new work may have been
		// queued between releasing the read lock and acquiring this one,
		// and deleting then would silently drop a pending transition
		if om, ok := monitors[trackID]; ok && om.IsIdle() {
			delete(monitors, trackID)
		}
	}
	p.lock.Unlock()
}
|
||||
148
pkg/rtc/supervisor/publication_monitor.go
Normal file
148
pkg/rtc/supervisor/publication_monitor.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package supervisor

import (
	"errors"
	"sync"
	"time"

	"github.com/gammazero/deque"
	"github.com/livekit/livekit-server/pkg/rtc/types"
	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"
)

const (
	// publishWaitDuration is how long an unmuted track may remain
	// pending before Check reports a time out.
	publishWaitDuration = 10 * time.Second
)

var (
	errPublishTimeout = errors.New("publish time out")
)

// publish is one desired publish-state transition; isStart marks the
// beginning of a publish, !isStart the (possibly synthesized) end.
type publish struct {
	isStart bool
}

// PublicationMonitorParams configures a PublicationMonitor.
type PublicationMonitorParams struct {
	TrackID livekit.TrackID
	Logger  logger.Logger
}

// PublicationMonitor tracks pending publish transitions for a single track
// and verifies that the published track materializes in time (unless the
// track is muted). It implements types.OperationMonitor.
type PublicationMonitor struct {
	params PublicationMonitorParams

	lock             sync.RWMutex
	desiredPublishes deque.Deque

	publishedTrack types.LocalMediaTrack
	isMuted        bool
	unmutedAt      time.Time
}

// NewPublicationMonitor returns a monitor with an empty transition queue.
func NewPublicationMonitor(params PublicationMonitorParams) *PublicationMonitor {
	m := &PublicationMonitor{
		params: params,
	}
	m.desiredPublishes.SetMinCapacity(2)
	return m
}

// AddPending queues a desired publish start, plus a synthesized end so the
// eventual track removal also satisfies the queue, then tries to resolve
// the head of the queue against the current state.
func (m *PublicationMonitor) AddPending() {
	m.lock.Lock()
	m.desiredPublishes.PushBack(
		&publish{
			isStart: true,
		},
	)

	// synthesize an end
	m.desiredPublishes.PushBack(
		&publish{
			isStart: false,
		},
	)
	m.update()
	m.lock.Unlock()
}

// SetMute records the mute state; the publish time-out clock restarts
// whenever the track becomes unmuted.
func (m *PublicationMonitor) SetMute(isMuted bool) {
	m.lock.Lock()
	m.isMuted = isMuted
	if !isMuted {
		m.unmutedAt = time.Now()
	}
	m.lock.Unlock()
}

// SetPublishedTrack records the actual published track and tries to
// resolve the head of the transition queue.
func (m *PublicationMonitor) SetPublishedTrack(pubTrack types.LocalMediaTrack) {
	m.lock.Lock()
	m.publishedTrack = pubTrack
	m.update()
	m.lock.Unlock()
}

// ClearPublishedTrack removes the actual published track if it matches the
// one recorded, logging an error on mismatch, then tries to resolve the
// head of the transition queue.
func (m *PublicationMonitor) ClearPublishedTrack(pubTrack types.LocalMediaTrack) {
	m.lock.Lock()
	if m.publishedTrack == pubTrack {
		m.publishedTrack = nil
	} else {
		m.params.Logger.Errorw("mismatched published track on clear", nil, "trackID", m.params.TrackID)
	}

	m.update()
	m.lock.Unlock()
}

// Check returns errPublishTimeout when a publish start has been pending
// for longer than publishWaitDuration while the track is unmuted;
// otherwise it returns nil.
func (m *PublicationMonitor) Check() error {
	m.lock.RLock()
	var pending *publish
	if m.desiredPublishes.Len() != 0 {
		pending = m.desiredPublishes.Front().(*publish)
	}

	muted := m.isMuted
	unmutedAt := m.unmutedAt
	m.lock.RUnlock()

	if pending == nil {
		return nil
	}

	if pending.isStart && !muted && !unmutedAt.IsZero() && time.Since(unmutedAt) > publishWaitDuration {
		// timed out waiting for publish
		return errPublishTimeout
	}

	// give more time for publish to happen
	// NOTE: synthesized end events do not have a start time, so do not check them for time out
	return nil
}

// IsIdle reports whether nothing is pending and no track is published,
// i.e. the monitor can be discarded.
func (m *PublicationMonitor) IsIdle() bool {
	m.lock.RLock()
	defer m.lock.RUnlock()

	return m.desiredPublishes.Len() == 0 && m.publishedTrack == nil
}

// update pops the head transition if the current state satisfies it;
// otherwise the transition is returned to the head of the queue.
// Callers must hold the write lock.
func (m *PublicationMonitor) update() {
	if m.desiredPublishes.Len() == 0 {
		return
	}

	pending := m.desiredPublishes.PopFront().(*publish)
	if pending.isStart == (m.publishedTrack != nil) {
		// desired and actual agree; transition satisfied
		return
	}

	// put it back as the condition is not satisfied
	m.desiredPublishes.PushFront(pending)
}
|
||||
128
pkg/rtc/supervisor/subscription_monitor.go
Normal file
128
pkg/rtc/supervisor/subscription_monitor.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package supervisor

import (
	"errors"
	"sync"
	"time"

	"github.com/gammazero/deque"
	"github.com/livekit/livekit-server/pkg/rtc/types"
	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"
)

const (
	// transitionWaitDuration is how long a queued subscription transition
	// may remain unsatisfied before Check reports a time out.
	transitionWaitDuration = 10 * time.Second
)

var (
	errTransitionTimeout = errors.New("transition time out")
)

// transition is one desired subscription-state change and the time it was requested.
type transition struct {
	isSubscribed bool
	at           time.Time
}

// SubscriptionMonitorParams configures a SubscriptionMonitor.
type SubscriptionMonitorParams struct {
	TrackID livekit.TrackID
	Logger  logger.Logger
}

// SubscriptionMonitor tracks subscribe/unsubscribe requests for a single
// track and verifies that the matching subscribed track is added or removed
// within transitionWaitDuration. It implements types.OperationMonitor.
type SubscriptionMonitor struct {
	params SubscriptionMonitorParams

	lock               sync.RWMutex
	desiredTransitions deque.Deque

	subscribedTrack types.SubscribedTrack
}

// NewSubscriptionMonitor returns a monitor with an empty transition queue.
func NewSubscriptionMonitor(params SubscriptionMonitorParams) *SubscriptionMonitor {
	m := &SubscriptionMonitor{
		params: params,
	}
	m.desiredTransitions.SetMinCapacity(2)
	return m
}

// UpdateSubscription queues a desired subscribe (true) or unsubscribe
// (false) transition, time-stamped now, and tries to resolve the head of
// the queue against the current state.
func (m *SubscriptionMonitor) UpdateSubscription(isSubscribed bool) {
	m.lock.Lock()
	m.desiredTransitions.PushBack(
		&transition{
			isSubscribed: isSubscribed,
			at:           time.Now(),
		},
	)
	m.update()
	m.lock.Unlock()
}

// SetSubscribedTrack records the actual subscribed track and tries to
// resolve the head of the transition queue.
func (m *SubscriptionMonitor) SetSubscribedTrack(subTrack types.SubscribedTrack) {
	m.lock.Lock()
	m.subscribedTrack = subTrack
	m.update()
	m.lock.Unlock()
}

// ClearSubscribedTrack removes the actual subscribed track if it matches
// the one recorded, logging an error on mismatch, then tries to resolve
// the head of the transition queue.
func (m *SubscriptionMonitor) ClearSubscribedTrack(subTrack types.SubscribedTrack) {
	m.lock.Lock()
	if m.subscribedTrack == subTrack {
		m.subscribedTrack = nil
	} else {
		m.params.Logger.Errorw("mismatched subscribed track on clear", nil, "trackID", m.params.TrackID)
	}

	m.update()
	m.lock.Unlock()
}

// Check returns errTransitionTimeout when the oldest queued transition has
// been pending longer than transitionWaitDuration; otherwise nil.
func (m *SubscriptionMonitor) Check() error {
	m.lock.RLock()
	var pending *transition
	if m.desiredTransitions.Len() != 0 {
		pending = m.desiredTransitions.Front().(*transition)
	}
	m.lock.RUnlock()

	if pending == nil {
		return nil
	}

	if time.Since(pending.at) > transitionWaitDuration {
		// timed out waiting for transition
		return errTransitionTimeout
	}

	// give more time for transition to happen
	return nil
}

// IsIdle reports whether nothing is pending and no track is subscribed,
// i.e. the monitor can be discarded.
func (m *SubscriptionMonitor) IsIdle() bool {
	m.lock.RLock()
	defer m.lock.RUnlock()

	return m.desiredTransitions.Len() == 0 && m.subscribedTrack == nil
}

// update pops the head transition if the current state satisfies it;
// otherwise the transition is returned to the head of the queue.
// Callers must hold the write lock.
func (m *SubscriptionMonitor) update() {
	if m.desiredTransitions.Len() == 0 {
		return
	}

	tx := m.desiredTransitions.PopFront().(*transition)
	if tx.isSubscribed == (m.subscribedTrack != nil) {
		// desired and actual agree; transition satisfied
		return
	}

	// put it back as the condition is not satisfied
	m.desiredTransitions.PushFront(tx)
}
|
||||
@@ -100,20 +100,35 @@ func TestNegotiationTiming(t *testing.T) {
|
||||
// initial offer
|
||||
transportA.Negotiate(true)
|
||||
require.Eventually(t, func() bool {
|
||||
return negotiationState.Load().(NegotiationState) == NegotiationStateRemote
|
||||
state, ok := negotiationState.Load().(NegotiationState)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return state == NegotiationStateRemote
|
||||
}, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRemote")
|
||||
|
||||
// second try, should've flipped transport status to retry
|
||||
transportA.Negotiate(true)
|
||||
require.Eventually(t, func() bool {
|
||||
return negotiationState.Load().(NegotiationState) == NegotiationStateRetry
|
||||
state, ok := negotiationState.Load().(NegotiationState)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return state == NegotiationStateRetry
|
||||
}, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRetry")
|
||||
|
||||
// third try, should've stayed at retry
|
||||
transportA.Negotiate(true)
|
||||
time.Sleep(100 * time.Millisecond) // some time to process the negotiate event
|
||||
require.Eventually(t, func() bool {
|
||||
return negotiationState.Load().(NegotiationState) == NegotiationStateRetry
|
||||
state, ok := negotiationState.Load().(NegotiationState)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return state == NegotiationStateRetry
|
||||
}, 10*time.Second, 10*time.Millisecond, "negotiation state does not match NegotiateStateRetry")
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
@@ -419,3 +419,8 @@ type SubscribedTrack interface {
|
||||
// selects appropriate video layer according to subscriber preferences
|
||||
UpdateVideoLayer()
|
||||
}
|
||||
|
||||
type OperationMonitor interface {
|
||||
Check() error
|
||||
IsIdle() bool
|
||||
}
|
||||
|
||||
@@ -21,11 +21,6 @@ type FakeLocalParticipant struct {
|
||||
arg1 webrtc.ICECandidateInit
|
||||
arg2 livekit.SignalTarget
|
||||
}
|
||||
AddNegotiationPendingStub func(livekit.ParticipantID)
|
||||
addNegotiationPendingMutex sync.RWMutex
|
||||
addNegotiationPendingArgsForCall []struct {
|
||||
arg1 livekit.ParticipantID
|
||||
}
|
||||
AddSubscribedTrackStub func(types.SubscribedTrack)
|
||||
addSubscribedTrackMutex sync.RWMutex
|
||||
addSubscribedTrackArgsForCall []struct {
|
||||
@@ -342,17 +337,6 @@ type FakeLocalParticipant struct {
|
||||
identityReturnsOnCall map[int]struct {
|
||||
result1 livekit.ParticipantIdentity
|
||||
}
|
||||
IsNegotiationPendingStub func(livekit.ParticipantID) bool
|
||||
isNegotiationPendingMutex sync.RWMutex
|
||||
isNegotiationPendingArgsForCall []struct {
|
||||
arg1 livekit.ParticipantID
|
||||
}
|
||||
isNegotiationPendingReturns struct {
|
||||
result1 bool
|
||||
}
|
||||
isNegotiationPendingReturnsOnCall map[int]struct {
|
||||
result1 bool
|
||||
}
|
||||
IsPublisherStub func() bool
|
||||
isPublisherMutex sync.RWMutex
|
||||
isPublisherArgsForCall []struct {
|
||||
@@ -796,38 +780,6 @@ func (fake *FakeLocalParticipant) AddICECandidateArgsForCall(i int) (webrtc.ICEC
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) AddNegotiationPending(arg1 livekit.ParticipantID) {
|
||||
fake.addNegotiationPendingMutex.Lock()
|
||||
fake.addNegotiationPendingArgsForCall = append(fake.addNegotiationPendingArgsForCall, struct {
|
||||
arg1 livekit.ParticipantID
|
||||
}{arg1})
|
||||
stub := fake.AddNegotiationPendingStub
|
||||
fake.recordInvocation("AddNegotiationPending", []interface{}{arg1})
|
||||
fake.addNegotiationPendingMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.AddNegotiationPendingStub(arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) AddNegotiationPendingCallCount() int {
|
||||
fake.addNegotiationPendingMutex.RLock()
|
||||
defer fake.addNegotiationPendingMutex.RUnlock()
|
||||
return len(fake.addNegotiationPendingArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) AddNegotiationPendingCalls(stub func(livekit.ParticipantID)) {
|
||||
fake.addNegotiationPendingMutex.Lock()
|
||||
defer fake.addNegotiationPendingMutex.Unlock()
|
||||
fake.AddNegotiationPendingStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) AddNegotiationPendingArgsForCall(i int) livekit.ParticipantID {
|
||||
fake.addNegotiationPendingMutex.RLock()
|
||||
defer fake.addNegotiationPendingMutex.RUnlock()
|
||||
argsForCall := fake.addNegotiationPendingArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) AddSubscribedTrack(arg1 types.SubscribedTrack) {
|
||||
fake.addSubscribedTrackMutex.Lock()
|
||||
fake.addSubscribedTrackArgsForCall = append(fake.addSubscribedTrackArgsForCall, struct {
|
||||
@@ -2488,67 +2440,6 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsNegotiationPending(arg1 livekit.ParticipantID) bool {
|
||||
fake.isNegotiationPendingMutex.Lock()
|
||||
ret, specificReturn := fake.isNegotiationPendingReturnsOnCall[len(fake.isNegotiationPendingArgsForCall)]
|
||||
fake.isNegotiationPendingArgsForCall = append(fake.isNegotiationPendingArgsForCall, struct {
|
||||
arg1 livekit.ParticipantID
|
||||
}{arg1})
|
||||
stub := fake.IsNegotiationPendingStub
|
||||
fakeReturns := fake.isNegotiationPendingReturns
|
||||
fake.recordInvocation("IsNegotiationPending", []interface{}{arg1})
|
||||
fake.isNegotiationPendingMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsNegotiationPendingCallCount() int {
|
||||
fake.isNegotiationPendingMutex.RLock()
|
||||
defer fake.isNegotiationPendingMutex.RUnlock()
|
||||
return len(fake.isNegotiationPendingArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsNegotiationPendingCalls(stub func(livekit.ParticipantID) bool) {
|
||||
fake.isNegotiationPendingMutex.Lock()
|
||||
defer fake.isNegotiationPendingMutex.Unlock()
|
||||
fake.IsNegotiationPendingStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsNegotiationPendingArgsForCall(i int) livekit.ParticipantID {
|
||||
fake.isNegotiationPendingMutex.RLock()
|
||||
defer fake.isNegotiationPendingMutex.RUnlock()
|
||||
argsForCall := fake.isNegotiationPendingArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsNegotiationPendingReturns(result1 bool) {
|
||||
fake.isNegotiationPendingMutex.Lock()
|
||||
defer fake.isNegotiationPendingMutex.Unlock()
|
||||
fake.IsNegotiationPendingStub = nil
|
||||
fake.isNegotiationPendingReturns = struct {
|
||||
result1 bool
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsNegotiationPendingReturnsOnCall(i int, result1 bool) {
|
||||
fake.isNegotiationPendingMutex.Lock()
|
||||
defer fake.isNegotiationPendingMutex.Unlock()
|
||||
fake.IsNegotiationPendingStub = nil
|
||||
if fake.isNegotiationPendingReturnsOnCall == nil {
|
||||
fake.isNegotiationPendingReturnsOnCall = make(map[int]struct {
|
||||
result1 bool
|
||||
})
|
||||
}
|
||||
fake.isNegotiationPendingReturnsOnCall[i] = struct {
|
||||
result1 bool
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsPublisher() bool {
|
||||
fake.isPublisherMutex.Lock()
|
||||
ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)]
|
||||
@@ -4814,8 +4705,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
|
||||
defer fake.invocationsMutex.RUnlock()
|
||||
fake.addICECandidateMutex.RLock()
|
||||
defer fake.addICECandidateMutex.RUnlock()
|
||||
fake.addNegotiationPendingMutex.RLock()
|
||||
defer fake.addNegotiationPendingMutex.RUnlock()
|
||||
fake.addSubscribedTrackMutex.RLock()
|
||||
defer fake.addSubscribedTrackMutex.RUnlock()
|
||||
fake.addSubscriberMutex.RLock()
|
||||
@@ -4884,8 +4773,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
|
||||
defer fake.iDMutex.RUnlock()
|
||||
fake.identityMutex.RLock()
|
||||
defer fake.identityMutex.RUnlock()
|
||||
fake.isNegotiationPendingMutex.RLock()
|
||||
defer fake.isNegotiationPendingMutex.RUnlock()
|
||||
fake.isPublisherMutex.RLock()
|
||||
defer fake.isPublisherMutex.RUnlock()
|
||||
fake.isReadyMutex.RLock()
|
||||
|
||||
@@ -288,7 +288,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok {
|
||||
_ = sigConn.WriteResponse(&livekit.SignalResponse{
|
||||
Message: &livekit.SignalResponse_Pong{
|
||||
Pong: 1,
|
||||
Pong: time.Now().UnixNano(),
|
||||
},
|
||||
})
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user