diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 8362932b7..6e45fcef9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -117,6 +117,7 @@ type ParticipantParams struct { AllowUDPUnstableFallback bool TURNSEnabled bool GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo + DisableSupervisor bool ReconnectOnPublicationError bool ReconnectOnSubscriptionError bool ReconnectOnDataChannelError bool @@ -241,11 +242,13 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID), params.SID, params.Telemetry), - supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), pubLogger: params.Logger.WithComponent(sutils.ComponentPub), subLogger: params.Logger.WithComponent(sutils.ComponentSub), } + if !params.DisableSupervisor { + p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}) + } p.version.Store(params.InitialVersion) p.timedVersion.Update(params.VersionGenerator.New()) p.migrateState.Store(types.MigrateStateInit) @@ -255,7 +258,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.SetResponseSink(params.Sink) p.setupEnabledCodecs(params.PublishEnabledCodecs, params.SubscribeEnabledCodecs, params.ClientConf.GetDisabledCodecs()) - p.supervisor.OnPublicationError(p.onPublicationError) + if p.supervisor != nil { + p.supervisor.OnPublicationError(p.onPublicationError) + } var err error // keep last participants and when updates were sent @@ -712,8 +717,10 @@ func (p *ParticipantImpl) SetMigrateInfo( for _, t := range mediaTracks { ti := t.GetTrack() - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) - p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.supervisor != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + } p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} p.pubLogger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", logger.Proto(ti)) @@ -756,7 +763,9 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea }) } - p.supervisor.Stop() + if p.supervisor != nil { + p.supervisor.Stop() + } p.pendingTracksLock.Lock() p.pendingTracks = make(map[string]*pendingTrackInfo) @@ -1393,7 +1402,9 @@ func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit. } func (p *ParticipantImpl) onPublisherInitialConnected() { - p.supervisor.SetPublisherPeerConnectionConnected(true) + if p.supervisor != nil { + p.supervisor.SetPublisherPeerConnectionConnected(true) + } go p.publisherRTCPWorker() } @@ -1663,8 +1674,10 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti) - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) - p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + if p.supervisor != nil { + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) + } if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { if p.pendingTracks[req.Cid] == nil { p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} @@ -1716,7 +1729,9 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo { p.dirty.Store(true) - p.supervisor.SetPublicationMute(trackID, muted) + if p.supervisor != nil { + p.supervisor.SetPublicationMute(trackID, muted) + } track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted) var trackInfo *livekit.TrackInfo @@ -1901,7 +1916,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) // add to published and clean up pending - p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) + if p.supervisor != nil { + p.supervisor.SetPublishedTrack(livekit.TrackID(ti.Sid), mt) + } p.UpTrackManager.AddPublishedTrack(mt) pti := p.pendingTracks[signalCid] @@ -1922,7 +1939,9 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv trackID := livekit.TrackID(ti.Sid) mt.AddOnClose(func() { - p.supervisor.ClearPublishedTrack(trackID, mt) + if p.supervisor != nil { + p.supervisor.ClearPublishedTrack(trackID, mt) + } // not logged when closing p.params.Telemetry.TrackUnpublished( diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index fd2e9dd58..1cb0ff1bd 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -544,6 +544,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt "extLastRRSN", s.extLastRRSN, "firstTime", r.firstTime.String(), "startTime", r.startTime.String(), + "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, "packetsInInterval", extReceivedRRSN-s.extLastRRSN, "intervalStats", is.ToString(),