Add optional supervisor disable. (#2277)

* Add optional supervisor disable.

Used `DisableSupervisor` so that default can be enabled and
it can be disabled explicity. But, open to defaulting to disable
(i. e. change param to `EnableSupervisor`).

* Move nil check to call site
This commit is contained in:
Raja Subramanian
2023-11-30 13:04:31 +05:30
committed by GitHub
parent 0f1c1ec224
commit 2ee5aa7c98
2 changed files with 31 additions and 11 deletions
+30 -11
View File
@@ -117,6 +117,7 @@ type ParticipantParams struct {
AllowUDPUnstableFallback bool
TURNSEnabled bool
GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo
DisableSupervisor bool
ReconnectOnPublicationError bool
ReconnectOnSubscriptionError bool
ReconnectOnDataChannelError bool
@@ -241,11 +242,13 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID),
params.SID,
params.Telemetry),
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
pubLogger: params.Logger.WithComponent(sutils.ComponentPub),
subLogger: params.Logger.WithComponent(sutils.ComponentSub),
}
if !params.DisableSupervisor {
p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger})
}
p.version.Store(params.InitialVersion)
p.timedVersion.Update(params.VersionGenerator.New())
p.migrateState.Store(types.MigrateStateInit)
@@ -255,7 +258,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.SetResponseSink(params.Sink)
p.setupEnabledCodecs(params.PublishEnabledCodecs, params.SubscribeEnabledCodecs, params.ClientConf.GetDisabledCodecs())
p.supervisor.OnPublicationError(p.onPublicationError)
if p.supervisor != nil {
p.supervisor.OnPublicationError(p.onPublicationError)
}
var err error
// keep last participants and when updates were sent
@@ -712,8 +717,10 @@ func (p *ParticipantImpl) SetMigrateInfo(
for _, t := range mediaTracks {
ti := t.GetTrack()
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
if p.supervisor != nil {
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
}
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true}
p.pubLogger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", logger.Proto(ti))
@@ -756,7 +763,9 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
})
}
p.supervisor.Stop()
if p.supervisor != nil {
p.supervisor.Stop()
}
p.pendingTracksLock.Lock()
p.pendingTracks = make(map[string]*pendingTrackInfo)
@@ -1393,7 +1402,9 @@ func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.
}
func (p *ParticipantImpl) onPublisherInitialConnected() {
p.supervisor.SetPublisherPeerConnectionConnected(true)
if p.supervisor != nil {
p.supervisor.SetPublisherPeerConnectionConnected(true)
}
go p.publisherRTCPWorker()
}
@@ -1663,8 +1674,10 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}
p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti)
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
if p.supervisor != nil {
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
}
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil {
if p.pendingTracks[req.Cid] == nil {
p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
@@ -1716,7 +1729,9 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro
func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo {
p.dirty.Store(true)
p.supervisor.SetPublicationMute(trackID, muted)
if p.supervisor != nil {
p.supervisor.SetPublicationMute(trackID, muted)
}
track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted)
var trackInfo *livekit.TrackInfo
@@ -1901,7 +1916,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
// add to published and clean up pending
p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt)
if p.supervisor != nil {
p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt)
}
p.UpTrackManager.AddPublishedTrack(mt)
pti := p.pendingTracks[signalCid]
@@ -1922,7 +1939,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
trackID := livekit.TrackID(ti.Sid)
mt.AddOnClose(func() {
p.supervisor.ClearPublishedTrack(trackID, mt)
if p.supervisor != nil {
p.supervisor.ClearPublishedTrack(trackID, mt)
}
// not logged when closing
p.params.Telemetry.TrackUnpublished(
+1
View File
@@ -544,6 +544,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
"extLastRRSN", s.extLastRRSN,
"firstTime", r.firstTime.String(),
"startTime", r.startTime.String(),
"highestTime", r.highestTime.String(),
"extReceivedRRSN", extReceivedRRSN,
"packetsInInterval", extReceivedRRSN-s.extLastRRSN,
"intervalStats", is.ToString(),