diff --git a/cmd/server/main.go b/cmd/server/main.go index 58664107c..4d37142fa 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -202,7 +202,7 @@ func getConfig(c *cli.Context) (*config.Config, error) { if err != nil { return nil, err } - config.InitLoggerFromConfig(conf.Logging) + config.InitLoggerFromConfig(&conf.Logging) if c.String("config") == "" && c.String("config-body") == "" && conf.Development { // use single port UDP when no config is provided diff --git a/config-sample.yaml b/config-sample.yaml index 1e44c6c2b..aa2b01889 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -156,7 +156,6 @@ keys: # playout_delay: # enabled: true # min: 100 -# max: 300 # Webhooks # when configured, LiveKit notifies your URL handler with room events diff --git a/go.mod b/go.mod index 6bde0c1ee..b2d5ba870 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a - github.com/livekit/protocol v1.5.11-0.20230729124740-d45d830f69e2 + github.com/livekit/protocol v1.6.0 github.com/livekit/psrpc v0.3.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index d038f6bd6..5dfd6e6be 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a h1:JWpPHcMFuw0fP4swE89CfMgeUXiSN5IKvCJL/5HLI3A= github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= -github.com/livekit/protocol v1.5.11-0.20230729124740-d45d830f69e2 h1:KxQIooCpXmn+qzxQxNbxBtRXstEFd2/7ihH4Pp1dOc4= -github.com/livekit/protocol v1.5.11-0.20230729124740-d45d830f69e2/go.mod h1:3Dt53NrYnuA7pAJjAjXLJ2q5rU3JKoebvMttZPZWDH8= +github.com/livekit/protocol v1.6.0 h1:19S+vFZqnivKIOpyR3DEK/mSaykQ3UEf7H2G/mBOE54= +github.com/livekit/protocol v1.6.0/go.mod h1:SUS9foM1xBzw/AFrgTJuFX/oSuwlnIbHmpdiPdCvwEM= github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew= github.com/livekit/psrpc v0.3.2/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/config/config.go b/pkg/config/config.go index 923bab6e7..6a7a57943 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,7 +29,6 @@ import ( "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/logger/pionlogger" redisLiveKit "github.com/livekit/protocol/redis" ) @@ -200,7 +199,6 @@ type StreamTrackersConfig struct { type PlayoutDelayConfig struct { Enabled bool `yaml:"enabled,omitempty"` Min int `yaml:"min,omitempty"` - Max int `yaml:"max,omitempty"` } type VideoConfig struct { @@ -526,6 +524,13 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c if conf.Logging.Level == "" && conf.Development { conf.Logging.Level = "debug" } + if conf.Logging.PionLevel != "" { + if conf.Logging.ComponentLevels == nil { + conf.Logging.ComponentLevels = map[string]string{} + } + conf.Logging.ComponentLevels["transport.pion"] = conf.Logging.PionLevel + conf.Logging.ComponentLevels["pion"] = conf.Logging.PionLevel + } if conf.Development { conf.Environment = "dev" @@ -819,7 +824,6 @@ func SetLogger(l logger.Logger) { logger.SetLogger(l, "livekit") } -func InitLoggerFromConfig(config LoggingConfig) { - pionlogger.SetLogLevel(config.PionLevel) - logger.InitFromConfig(config.Config, "livekit") +func InitLoggerFromConfig(config *LoggingConfig) { + logger.InitFromConfig(&config.Config, "livekit") } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 378c235b7..552fc06ce 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -22,6 +22,7 @@ import ( "github.com/pion/webrtc/v3" "go.uber.org/atomic" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -133,7 +134,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * PlayoutDelayLimit: sub.GetPlayoutDelayConfig(), Pacer: sub.GetPacer(), Trailer: trailer, - Logger: LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed), + Logger: LoggerWithTrack(sub.GetLogger().WithComponent(sutils.ComponentSub), trackID, t.params.IsRelayed), }) if err != nil { return nil, err diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 490a6e4a9..75af8b01c 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -41,6 +41,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/streamallocator" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -187,6 +188,10 @@ type ParticipantImpl struct { supervisor *supervisor.ParticipantSupervisor tracksQuality map[livekit.TrackID]livekit.ConnectionQuality + + // loggers for publisher and subscriber + pubLogger logger.Logger + subLogger logger.Logger } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -213,6 +218,8 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { params.Telemetry), supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), + pubLogger: params.Logger.WithComponent(sutils.ComponentPub), + subLogger: params.Logger.WithComponent(sutils.ComponentSub), } p.version.Store(params.InitialVersion) p.timedVersion.Update(params.VersionGenerator.New()) @@ -549,7 +556,7 @@ func (p *ParticipantImpl) HandleSignalSourceClose() { // HandleOffer an offer from remote participant, used when clients make the initial connection func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) { - p.params.Logger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER) + p.pubLogger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER) shouldPend := false if p.MigrateState() == types.MigrateStateInit { shouldPend = true @@ -563,7 +570,7 @@ func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) { // HandleAnswer handles a client answer response, with subscriber PC, server initiates the // offer and client answers func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) { - p.params.Logger.Debugw("received answer", "transport", livekit.SignalTarget_SUBSCRIBER) + p.subLogger.Debugw("received answer", "transport", livekit.SignalTarget_SUBSCRIBER) /* from server received join request to client answer * 1. server send join response & offer @@ -581,7 +588,7 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) er return nil } - p.params.Logger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER) + p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER) answer = p.configurePublisherAnswer(answer) if err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ @@ -607,7 +614,7 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { } if len(pti.trackInfos) > 1 { - p.params.Logger.Warnw("too many pending migrated tracks", nil, "trackID", pti.trackInfos[0].Sid, "count", len(pti.trackInfos), "cid", cid) + p.pubLogger.Warnw("too many pending migrated tracks", nil, "trackID", pti.trackInfos[0].Sid, "count", len(pti.trackInfos), "cid", cid) } ti := pti.trackInfos[0] @@ -616,7 +623,7 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { if mt != nil { addedTracks = append(addedTracks, mt) } else { - p.params.Logger.Warnw("could not find migrated muted track", nil, "cid", cid) + p.pubLogger.Warnw("could not find migrated muted track", nil, "cid", cid) } } } @@ -652,7 +659,7 @@ func (p *ParticipantImpl) removeMutedTrackNotFired(mt *MediaTrack) { // records track details and lets client know it's ok to proceed func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) { if !p.CanPublishSource(req.Source) { - p.params.Logger.Warnw("no permission to publish track", nil) + p.pubLogger.Warnw("no permission to publish track", nil) return } @@ -679,7 +686,7 @@ func (p *ParticipantImpl) SetMigrateInfo( p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} - p.params.Logger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", ti.String()) + p.pubLogger.Infow("pending track added (migration)", "trackID", ti.Sid, "track", ti.String()) } p.pendingTracksLock.Unlock() @@ -806,7 +813,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool { return } // TODO: change to debug once we are confident - p.params.Logger.Infow("closing subscriber peer connection to aid migration") + p.subLogger.Infow("closing subscriber peer connection to aid migration") // // Close all down tracks before closing subscriber peer connection. @@ -1053,7 +1060,7 @@ func (p *ParticipantImpl) onTrackUnsubscribed(subTrack types.SubscribedTrack) { } func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) { - p.params.Logger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed) + p.subLogger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed) err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{ SubscriptionPermissionUpdate: &livekit.SubscriptionPermissionUpdate{ @@ -1064,7 +1071,7 @@ func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.Parti }, }) if err != nil { - p.params.Logger.Errorw("could not send subscription permission update", err) + p.subLogger.Errorw("could not send subscription permission update", err) } } @@ -1106,7 +1113,7 @@ func (p *ParticipantImpl) setupTransportManager() error { AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback, TURNSEnabled: p.params.TURNSEnabled, AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled() && p.SupportSyncStreamID(), - Logger: p.params.Logger, + Logger: p.params.Logger.WithComponent(sutils.ComponentTransport), }) if err != nil { return err @@ -1161,7 +1168,7 @@ func (p *ParticipantImpl) setupTransportManager() error { func (p *ParticipantImpl) setupUpTrackManager() { p.UpTrackManager = NewUpTrackManager(UpTrackManagerParams{ SID: p.params.SID, - Logger: p.params.Logger, + Logger: p.pubLogger, VersionGenerator: p.params.VersionGenerator, }) @@ -1182,7 +1189,7 @@ func (p *ParticipantImpl) setupUpTrackManager() { func (p *ParticipantImpl) setupSubscriptionManager() { p.SubscriptionManager = NewSubscriptionManager(SubscriptionManagerParams{ Participant: p, - Logger: p.params.Logger.WithoutSampler(), + Logger: p.subLogger.WithoutSampler(), TrackResolver: p.params.TrackResolver, Telemetry: p.params.Telemetry, OnTrackSubscribed: p.onTrackSubscribed, @@ -1243,7 +1250,7 @@ func (p *ParticipantImpl) setIsPublisher(isPublisher bool) { // when the server has an offer for participant func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) error { - p.params.Logger.Debugw("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER) + p.subLogger.Debugw("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER) return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ Offer: ToProtoSessionDescription(offer), @@ -1269,7 +1276,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w publishedTrack, isNewTrack := p.mediaTrackReceived(track, rtpReceiver) if publishedTrack == nil { - p.params.Logger.Warnw("webrtc Track published but can't find MediaTrack", nil, + p.pubLogger.Warnw("webrtc Track published but can't find MediaTrack", nil, "kind", track.Kind().String(), "webrtcTrackID", track.ID(), "rid", track.RID(), @@ -1280,7 +1287,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w } if !p.CanPublishSource(publishedTrack.Source()) { - p.params.Logger.Warnw("no permission to publish mediaTrack", nil, + p.pubLogger.Warnw("no permission to publish mediaTrack", nil, "source", publishedTrack.Source(), ) p.removePublishedTrack(publishedTrack) @@ -1290,7 +1297,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w p.setIsPublisher(true) p.dirty.Store(true) - p.params.Logger.Infow("mediaTrack published", + p.pubLogger.Infow("mediaTrack published", "kind", track.Kind().String(), "trackID", publishedTrack.ID(), "webrtcTrackID", track.ID(), @@ -1318,7 +1325,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt dp := livekit.DataPacket{} if err := proto.Unmarshal(data, &dp); err != nil { - p.params.Logger.Warnw("could not parse data packet", err) + p.pubLogger.Warnw("could not parse data packet", err) return } @@ -1336,7 +1343,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt onDataPacket(p, &dp) } default: - p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload) + p.pubLogger.Warnw("received unsupported data packet", nil, "payload", payload) } p.setIsPublisher(true) @@ -1445,7 +1452,7 @@ func (p *ParticipantImpl) subscriberRTCPWorker() { if IsEOF(err) { return } - p.params.Logger.Errorw("could not send down track reports", err) + p.subLogger.Errorw("could not send down track reports", err) } pkts = pkts[:0] @@ -1462,7 +1469,7 @@ func (p *ParticipantImpl) subscriberRTCPWorker() { if IsEOF(err) { return } - p.params.Logger.Errorw("could not send down track reports", err) + p.subLogger.Errorw("could not send down track reports", err) } } @@ -1545,7 +1552,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, ) } - p.params.Logger.Infow( + p.pubLogger.Infow( "sending max subscribed quality", "trackID", trackID, "qualities", subscribedQualities, @@ -1565,7 +1572,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l if req.Sid != "" { track := p.GetPublishedTrack(livekit.TrackID(req.Sid)) if track == nil { - p.params.Logger.Infow("could not find existing track for multi-codec simulcast", "trackID", req.Sid) + p.pubLogger.Infow("could not find existing track for multi-codec simulcast", "trackID", req.Sid) return nil } @@ -1616,17 +1623,17 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } else { p.pendingTracks[req.Cid].trackInfos = append(p.pendingTracks[req.Cid].trackInfos, ti) } - p.params.Logger.Infow("pending track queued", "trackID", ti.Sid, "track", ti.String(), "request", req.String()) + p.pubLogger.Infow("pending track queued", "trackID", ti.Sid, "track", ti.String(), "request", req.String()) return nil } p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} - p.params.Logger.Infow("pending track added", "trackID", ti.Sid, "track", ti.String(), "request", req.String()) + p.pubLogger.Infow("pending track added", "trackID", ti.Sid, "track", ti.String(), "request", req.String()) return ti } func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) { - p.params.Logger.Debugw("sending track published", "cid", cid, "trackInfo", ti.String()) + p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", ti.String()) _ = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_TrackPublished{ TrackPublished: &livekit.TrackPublishedResponse{ @@ -1678,7 +1685,7 @@ func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) { } if !isPending && track == nil { - p.params.Logger.Warnw("could not locate track", nil, "trackID", trackID) + p.pubLogger.Warnw("could not locate track", nil, "trackID", trackID) } } @@ -1686,7 +1693,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei p.pendingTracksLock.Lock() newTrack := false - p.params.Logger.Debugw( + p.pubLogger.Debugw( "media track received", "kind", track.Kind().String(), "trackID", track.ID(), @@ -1696,7 +1703,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei ) mid := p.TransportManager.GetPublisherMid(rtpReceiver) if mid == "" { - p.params.Logger.Warnw("could not get mid for track", nil, "trackID", track.ID()) + p.pubLogger.Warnw("could not get mid for track", nil, "trackID", track.ID()) return nil, false } @@ -1735,10 +1742,10 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei } func (p *ParticipantImpl) addMigrateMutedTrack(cid string, ti *livekit.TrackInfo) *MediaTrack { - p.params.Logger.Infow("add migrate muted track", "cid", cid, "trackID", ti.Sid, "track", ti.String()) + p.pubLogger.Infow("add migrate muted track", "cid", cid, "trackID", ti.Sid, "track", ti.String()) rtpReceiver := p.TransportManager.GetPublisherRTPReceiver(ti.Mid) if rtpReceiver == nil { - p.params.Logger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid) + p.pubLogger.Errorw("could not find receiver for migrated track", nil, "trackID", ti.Sid) return nil } @@ -1801,7 +1808,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv AudioConfig: p.params.AudioConfig, VideoConfig: p.params.VideoConfig, Telemetry: p.params.Telemetry, - Logger: LoggerWithTrack(p.params.Logger, livekit.TrackID(ti.Sid), false), + Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false), SubscriberConfig: p.params.Config.Subscriber, PLIThrottleConfig: p.params.PLIThrottleConfig, SimTracks: p.params.SimTracks, @@ -1816,7 +1823,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv pti := p.pendingTracks[signalCid] if pti != nil { if p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] != nil { - p.params.Logger.Infow("unexpected pending publish track", "trackID", ti.Sid) + p.pubLogger.Infow("unexpected pending publish track", "trackID", ti.Sid) } p.pendingPublishingTracks[livekit.TrackID(ti.Sid)] = &pendingTrackInfo{ trackInfos: []*livekit.TrackInfo{pti.trackInfos[0]}, @@ -1855,7 +1862,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv if !p.IsClosed() { // unpublished events aren't necessary when participant is closed - p.params.Logger.Infow("unpublished track", "trackID", ti.Sid, "trackInfo", ti) + p.pubLogger.Infow("unpublished track", "trackID", ti.Sid, "trackInfo", ti) p.lock.RLock() onTrackUnpublished := p.onTrackUnpublished p.lock.RUnlock() @@ -1952,7 +1959,7 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp // if still not found, we are done if pendingInfo == nil { - p.params.Logger.Errorw("track info not published prior to track", nil, "clientId", clientId) + p.pubLogger.Errorw("track info not published prior to track", nil, "clientId", clientId) return signalCid, nil } @@ -2030,7 +2037,7 @@ func (p *ParticipantImpl) getPublishedTrackBySignalCid(clientId string) types.Me func (p *ParticipantImpl) getPublishedTrackBySdpCid(clientId string) types.MediaTrack { for _, publishedTrack := range p.GetPublishedTracks() { if publishedTrack.(types.LocalMediaTrack).HasSdpCid(clientId) { - p.params.Logger.Debugw("found track by sdp cid", "sdpCid", clientId, "trackID", publishedTrack.ID()) + p.pubLogger.Debugw("found track by SDP cid", "sdpCid", clientId, "trackID", publishedTrack.ID()) return publishedTrack } } @@ -2048,13 +2055,13 @@ func (p *ParticipantImpl) publisherRTCPWorker() { // read from rtcpChan for pkts := range p.rtcpCh { if pkts == nil { - p.params.Logger.Debugw("exiting publisher RTCP worker") + p.pubLogger.Debugw("exiting publisher RTCP worker") return } if err := p.TransportManager.WritePublisherRTCP(pkts); err != nil { if !IsEOF(err) { - p.params.Logger.Errorw("could not write RTCP to participant", err) + p.pubLogger.Errorw("could not write RTCP to participant", err) } } } @@ -2106,7 +2113,7 @@ func (p *ParticipantImpl) setDowntracksConnected() { func (p *ParticipantImpl) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrack sfu.DownTrackState) { p.lock.Lock() if existing := p.cachedDownTracks[trackID]; existing != nil && existing.transceiver != rtpTransceiver { - p.params.Logger.Infow("cached transceiver changed", "trackID", trackID) + p.subLogger.Infow("cached transceiver changed", "trackID", trackID) } p.cachedDownTracks[trackID] = &downTrackState{transceiver: rtpTransceiver, downTrack: downTrack} p.lock.Unlock() @@ -2162,7 +2169,7 @@ func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason func (p *ParticipantImpl) onPublicationError(trackID livekit.TrackID) { if p.params.ReconnectOnPublicationError { - p.params.Logger.Infow("issuing full reconnect on publication error", "trackID", trackID) + p.pubLogger.Infow("issuing full reconnect on publication error", "trackID", trackID) p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError) } } @@ -2186,7 +2193,7 @@ func (p *ParticipantImpl) onSubscriptionError(trackID livekit.TrackID, fatal boo }) if p.params.ReconnectOnSubscriptionError && fatal { - p.params.Logger.Infow("issuing full reconnect on subscription error", "trackID", trackID) + p.subLogger.Infow("issuing full reconnect on subscription error", "trackID", trackID) p.IssueFullReconnect(types.ParticipantCloseReasonSubscriptionError) } } @@ -2201,7 +2208,7 @@ func (p *ParticipantImpl) onAnyTransportNegotiationFailed() { func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error { track := p.GetPublishedTrack(trackID) if track == nil { - p.params.Logger.Warnw("could not find track", nil, "trackID", trackID) + p.pubLogger.Warnw("could not find track", nil, "trackID", trackID) return errors.New("could not find published track") } @@ -2212,7 +2219,7 @@ func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID func (p *ParticipantImpl) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error { track := p.GetPublishedTrack(trackID) if track == nil { - p.params.Logger.Warnw("could not find track", nil, "trackID", trackID) + p.pubLogger.Warnw("could not find track", nil, "trackID", trackID) return errors.New("could not find published track") } diff --git a/pkg/rtc/participant_sdp.go b/pkg/rtc/participant_sdp.go index fc11324d6..2d24f15e3 100644 --- a/pkg/rtc/participant_sdp.go +++ b/pkg/rtc/participant_sdp.go @@ -53,7 +53,7 @@ func (p *ParticipantImpl) setCodecPreferencesOpusRedForPublisher(offer webrtc.Se codecs, err := codecsFromMediaDescription(unmatchAudio) if err != nil { - p.params.Logger.Errorw("extract codecs from media section failed", err, "media", unmatchAudio) + p.pubLogger.Errorw("extract codecs from media section failed", err, "media", unmatchAudio) continue } @@ -104,7 +104,7 @@ func (p *ParticipantImpl) setCodecPreferencesOpusRedForPublisher(offer webrtc.Se bytes, err := parsed.Marshal() if err != nil { - p.params.Logger.Errorw("failed to marshal offer", err) + p.pubLogger.Errorw("failed to marshal offer", err) return offer } @@ -166,7 +166,7 @@ func (p *ParticipantImpl) setCodecPreferencesVideoForPublisher(offer webrtc.Sess if mime != "" { codecs, err := codecsFromMediaDescription(unmatchVideo) if err != nil { - p.params.Logger.Errorw("extract codecs from media section failed", err, "media", unmatchVideo) + p.pubLogger.Errorw("extract codecs from media section failed", err, "media", unmatchVideo) continue } @@ -186,7 +186,7 @@ func (p *ParticipantImpl) setCodecPreferencesVideoForPublisher(offer webrtc.Sess bytes, err := parsed.Marshal() if err != nil { - p.params.Logger.Errorw("failed to marshal offer", err) + p.pubLogger.Errorw("failed to marshal offer", err) return offer } @@ -252,7 +252,7 @@ func (p *ParticipantImpl) configurePublisherAnswer(answer webrtc.SessionDescript opusPT, err := parsed.GetPayloadTypeForCodec(sdp.Codec{Name: "opus"}) if err != nil { - p.params.Logger.Infow("failed to get opus payload type", "error", err, "trakcID", ti.Sid) + p.pubLogger.Infow("failed to get opus payload type", "error", err, "trakcID", ti.Sid) continue } @@ -275,7 +275,7 @@ func (p *ParticipantImpl) configurePublisherAnswer(answer webrtc.SessionDescript bytes, err := parsed.Marshal() if err != nil { - p.params.Logger.Infow("failed to marshal answer", "error", err) + p.pubLogger.Infow("failed to marshal answer", "error", err) return answer } answer.SDP = string(bytes) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index fa6a285ae..f153d27ee 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -113,9 +114,13 @@ func NewRoom( egressLauncher EgressLauncher, ) *Room { r := &Room{ - protoRoom: proto.Clone(room).(*livekit.Room), - internal: internal, - Logger: LoggerWithRoom(logger.GetLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)), + protoRoom: proto.Clone(room).(*livekit.Room), + internal: internal, + Logger: LoggerWithRoom( + logger.GetLogger().WithComponent(sutils.ComponentRoom), + livekit.RoomName(room.Name), + livekit.RoomID(room.Sid), + ), config: config, audioConfig: audioConfig, telemetry: telemetry, diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 0fb46b0af..0e31e4772 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -24,7 +24,6 @@ import ( "github.com/livekit/livekit-server/version" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/pkg/config" @@ -43,9 +42,7 @@ const ( ) func init() { - config.InitLoggerFromConfig(config.LoggingConfig{ - Config: logger.Config{Level: "debug"}, - }) + config.InitLoggerFromConfig(&config.DefaultConfig.Logging) // allow immediate closure in testing RoomDepartureGrace = 1 roomUpdateInterval = defaultDelay diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 10bf25fcb..fd1eb7f94 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -22,6 +22,7 @@ import ( "github.com/pion/webrtc/v3" "go.uber.org/atomic" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -64,7 +65,7 @@ type SubscribedTrack struct { func NewSubscribedTrack(params SubscribedTrackParams) *SubscribedTrack { s := &SubscribedTrack{ params: params, - logger: params.Subscriber.GetLogger().WithValues( + logger: params.Subscriber.GetLogger().WithComponent(sutils.ComponentSub).WithValues( "trackID", params.DownTrack.ID(), "publisherID", params.PublisherID, "publisher", params.PublisherIdentity, @@ -211,18 +212,15 @@ func (t *SubscribedTrack) UpdateVideoLayer() { } settings := t.settings.Load() - if settings == nil { + if settings == nil || settings.Disabled { return } t.logger.Debugw("updating video layer", "settings", settings) - - if settings.Width > 0 || settings.Fps > 0 { - spatial := t.spatialLayerFromSettings(settings) - t.DownTrack().SetMaxSpatialLayer(spatial) - if settings.Fps > 0 { - t.DownTrack().SetMaxTemporalLayer(t.MediaTrack().GetTemporalLayerForSpatialFps(spatial, settings.Fps, t.DownTrack().Codec().MimeType)) - } + spatial := t.spatialLayerFromSettings(settings) + t.DownTrack().SetMaxSpatialLayer(spatial) + if settings.Fps > 0 { + t.DownTrack().SetMaxTemporalLayer(t.MediaTrack().GetTemporalLayerForSpatialFps(spatial, settings.Fps, t.DownTrack().Codec().MimeType)) } } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 65698500f..29e87c631 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -34,6 +34,7 @@ import ( "github.com/pkg/errors" "go.uber.org/atomic" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/logger/pionlogger" @@ -391,7 +392,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { if params.IsSendSide { t.streamAllocator = streamallocator.NewStreamAllocator(streamallocator.StreamAllocatorParams{ Config: params.CongestionControlConfig, - Logger: params.Logger, + Logger: params.Logger.WithComponent(sutils.ComponentCongestionControl), }) t.streamAllocator.Start() t.pacer = pacer.NewPassThrough(params.Logger) diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index f07545c52..e7fbf878f 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -85,6 +85,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if req.Egress != nil && req.Egress.Tracks != nil { internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks} } + if req.MinPlayoutDelay > 0 { + rm.PlayoutDelay = &livekit.PlayoutDelay{ + Enabled: true, + Min: req.MinPlayoutDelay, + } + } if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil { return nil, err @@ -154,6 +160,5 @@ func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) { room.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: conf.PlayoutDelay.Enabled, Min: uint32(conf.PlayoutDelay.Min), - Max: uint32(conf.PlayoutDelay.Max), } } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5834a3805..2dec20064 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -335,7 +335,11 @@ func (r *RoomManager) StartSession( rtcConf := *r.rtcConfig rtcConf.SetBufferFactory(room.GetBufferFactory()) sid := livekit.ParticipantID(utils.NewGuid(utils.ParticipantPrefix)) - pLogger := rtc.LoggerWithParticipant(room.Logger, pi.Identity, sid, false) + pLogger := rtc.LoggerWithParticipant( + rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), + pi.Identity, + sid, + false) // default allow forceTCP allowFallback := true if r.config.RTC.AllowTCPFallback != nil { diff --git a/pkg/service/server.go b/pkg/service/server.go index 69191ff81..693315994 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -37,6 +37,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/livekit-server/version" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -102,7 +103,7 @@ func NewLivekitServer(conf *config.Config, middlewares = append(middlewares, NewAPIKeyAuthMiddleware(keyProvider)) } - twirpLoggingHook := TwirpLogger(logger.GetLogger()) + twirpLoggingHook := TwirpLogger(logger.GetLogger().WithComponent(sutils.ComponentAPI)) twirpRequestStatusHook := TwirpRequestStatusReporter() roomServer := livekit.NewRoomServiceServer(roomService, twirpLoggingHook) egressServer := livekit.NewEgressServer(egressService, twirp.WithServerHooks( diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index fa7bb8352..2cfe68567 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -30,6 +30,7 @@ import ( "go.uber.org/atomic" "github.com/livekit/livekit-server/pkg/sfu/audio" + sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil" "github.com/livekit/mediatransportutil/pkg/bucket" "github.com/livekit/mediatransportutil/pkg/nack" @@ -124,7 +125,7 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { videoPool: vp, audioPool: ap, pliThrottle: int64(500 * time.Millisecond), - logger: l, + logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU), } b.extPackets.SetMinCapacity(7) return b @@ -134,9 +135,9 @@ func (b *Buffer) SetLogger(logger logger.Logger) { b.Lock() defer b.Unlock() - b.logger = logger + b.logger = logger.WithComponent(sutils.ComponentSFU) if b.rtpStats != nil { - b.rtpStats.SetLogger(logger) + b.rtpStats.SetLogger(b.logger) } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e402b51f8..5ab7b0cfb 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -294,7 +294,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { kind: kind, codec: codecs[0].RTPCodecCapability, pacer: params.Pacer, - maxLayerNotifierCh: make(chan struct{}, 20), + maxLayerNotifierCh: make(chan struct{}, 1), } d.forwarder = NewForwarder( d.kind, @@ -330,7 +330,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { if d.params.PlayoutDelayLimit.GetEnabled() && d.params.PlayoutDelayLimit.GetMin() > 0 { delay := rtpextension.PlayoutDelayFromValue( uint16(d.params.PlayoutDelayLimit.GetMin()), - uint16(d.params.PlayoutDelayLimit.GetMax()), + rtpextension.PlayoutDelayDefaultMax, ) b, err := delay.Marshal() if err == nil { @@ -612,14 +612,13 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } func (d *DownTrack) postMaxLayerNotifierEvent() { - if d.IsClosed() { + if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo { return } select { case d.maxLayerNotifierCh <- struct{}{}: default: - d.params.Logger.Warnw("max layer notifier event queue full", nil) } } @@ -633,7 +632,7 @@ func (d *DownTrack) maxLayerNotifierWorker() { maxLayerSpatial = d.forwarder.GetMaxSubscribedSpatial() } if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil { - d.params.Logger.Infow("max subscribed layer changed", "maxLayerSpatial", maxLayerSpatial) + d.params.Logger.Debugw("max subscribed layer changed", "maxLayerSpatial", maxLayerSpatial) onMaxSubscribedLayerChanged(d, maxLayerSpatial) } } diff --git a/pkg/sfu/rtpextension/playoutdelay.go b/pkg/sfu/rtpextension/playoutdelay.go index 2cb719269..d42322be7 100644 --- a/pkg/sfu/rtpextension/playoutdelay.go +++ b/pkg/sfu/rtpextension/playoutdelay.go @@ -6,7 +6,8 @@ import ( ) const ( - PlayoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay" + PlayoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay" + PlayoutDelayDefaultMax = 4000 // 4s playoutDelayExtensionSize = 3 ) diff --git a/pkg/utils/logging.go b/pkg/utils/logging.go new file mode 100644 index 000000000..5e9a3db74 --- /dev/null +++ b/pkg/utils/logging.go @@ -0,0 +1,28 @@ +/* + * Copyright 2023 LiveKit, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +const ( + ComponentPub = "pub" + ComponentSub = "sub" + ComponentRoom = "room" + ComponentAPI = "api" + ComponentTransport = "transport" + ComponentSFU = "sfu" + // transport subcomponents + ComponentCongestionControl = "cc" +) diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 82f50f825..633c6a934 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -55,9 +55,7 @@ const ( var roomClient livekit.RoomService func init() { - config.InitLoggerFromConfig(config.LoggingConfig{ - Config: logger.Config{Level: "debug"}, - }) + config.InitLoggerFromConfig(&config.DefaultConfig.Logging) prometheus.Init("test", livekit.NodeType_SERVER, "test") }