diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 4514320b5..6a2ed28e8 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -3,14 +3,12 @@ package serverlogger import ( "github.com/go-logr/logr" "github.com/go-logr/zapr" - "github.com/livekit/protocol/logger" "github.com/pion/logging" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/protocol/logger" ) var ( @@ -42,8 +40,6 @@ func (f *LoggerFactory) NewLogger(scope string) logging.LeveledLogger { // Note: only pass in logr.Logger with default depth func SetLogger(l logr.Logger) { logger.SetLogger(l, "livekit") - sfu.Logger = l.WithName("sfu") - buffer.Logger = sfu.Logger } func InitFromConfig(config config.LoggingConfig) { diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f85445d52..160318b01 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -214,6 +214,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra receiver, track, t.PublisherID(), + t.params.Logger, sfu.WithPliThrottle(0), sfu.WithLoadBalanceThreshold(20), sfu.WithStreamTrackers(), diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 5b5e0c8ad..243560fa8 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/go-logr/logr" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "go.uber.org/atomic" @@ -67,7 +66,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC telemetry: telemetry, participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions), - bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize, logr.Logger{}), + bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize), closed: make(chan struct{}), } if r.Room.EmptyTimeout == 0 { diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index cebaf47ee..c4f70154a 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -9,7 +9,7 @@ import ( "time" "github.com/gammazero/deque" - "github.com/go-logr/logr" + "github.com/livekit/protocol/logger" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/sdp/v3" @@ -20,9 +20,6 @@ const ( ReportDelta = 1e9 ) -// Logger is an implementation of logr.Logger. If is not provided - will be turned off. -var Logger = logr.Discard() - type pendingPackets struct { arrivalTime int64 packet []byte @@ -92,7 +89,7 @@ type Buffer struct { feedbackTWCC func(sn uint16, timeNS int64, marker bool) // logger - logger logr.Logger + logger logger.Logger } type Stats struct { @@ -110,18 +107,22 @@ type Options struct { } // NewBuffer constructs a new Buffer -func NewBuffer(ssrc uint32, vp, ap *sync.Pool, logger logr.Logger) *Buffer { +func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { b := &Buffer{ mediaSSRC: ssrc, videoPool: vp, audioPool: ap, - logger: logger, + logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger } b.bitrate.Store(make([]int64, len(b.bitrateHelper))) b.extPackets.SetMinCapacity(7) return b } +func (b *Buffer) SetLogger(logger logger.Logger) { + b.logger = logger +} + func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, o Options) { b.Lock() defer b.Unlock() @@ -152,13 +153,13 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili for _, fb := range codec.RTCPFeedback { switch fb.Type { case webrtc.TypeRTCPFBGoogREMB: - b.logger.V(1).Info("Setting feedback", "type", "webrtc.TypeRTCPFBGoogREMB") + b.logger.Infow("Setting feedback", "type", webrtc.TypeRTCPFBGoogREMB) b.remb = true case webrtc.TypeRTCPFBTransportCC: - b.logger.V(1).Info("Setting feedback", "type", webrtc.TypeRTCPFBTransportCC) + b.logger.Infow("Setting feedback", "type", webrtc.TypeRTCPFBTransportCC) b.twcc = true case webrtc.TypeRTCPFBNACK: - b.logger.V(1).Info("Setting feedback", "type", webrtc.TypeRTCPFBNACK) + b.logger.Infow("Setting feedback", "type", webrtc.TypeRTCPFBNACK) b.nacker = NewNACKQueue() b.nack = true } @@ -178,7 +179,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.pPackets = nil b.bound = true - b.logger.V(1).Info("NewBuffer", "MaxBitRate", o.MaxBitRate) + b.logger.Infow("NewBuffer", "MaxBitRate", o.MaxBitRate) } // Write adds an RTP Packet, out of order, new packet may be arrived later diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index 4f150ba22..72a563a62 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -37,10 +37,9 @@ func TestNack(t *testing.T) { return &b }, } - logger := Logger t.Run("nack normal", func(t *testing.T) { - buff := NewBuffer(123, pool, pool, logger) + buff := NewBuffer(123, pool, pool) buff.codecType = webrtc.RTPCodecTypeVideo assert.NotNil(t, buff) var wg sync.WaitGroup @@ -82,7 +81,7 @@ func TestNack(t *testing.T) { }) t.Run("nack with seq wrap", func(t *testing.T) { - buff := NewBuffer(123, pool, pool, logger) + buff := NewBuffer(123, pool, pool) buff.codecType = webrtc.RTPCodecTypeVideo assert.NotNil(t, buff) var wg sync.WaitGroup @@ -187,8 +186,7 @@ func TestNewBuffer(t *testing.T) { return &b }, } - logger := Logger - buff := NewBuffer(123, pool, pool, logger) + buff := NewBuffer(123, pool, pool) buff.codecType = webrtc.RTPCodecTypeVideo assert.NotNil(t, buff) assert.NotNil(t, TestPackets) @@ -217,7 +215,7 @@ func TestFractionLostReport(t *testing.T) { return &b }, } - buff := NewBuffer(123, pool, pool, Logger) + buff := NewBuffer(123, pool, pool) buff.codecType = webrtc.RTPCodecTypeVideo assert.NotNil(t, buff) var wg sync.WaitGroup diff --git a/pkg/sfu/buffer/factory.go b/pkg/sfu/buffer/factory.go index 627c76072..4a7a1957c 100644 --- a/pkg/sfu/buffer/factory.go +++ b/pkg/sfu/buffer/factory.go @@ -4,7 +4,6 @@ import ( "io" "sync" - "github.com/go-logr/logr" "github.com/pion/transport/packetio" ) @@ -14,19 +13,9 @@ type Factory struct { audioPool *sync.Pool rtpBuffers map[uint32]*Buffer rtcpReaders map[uint32]*RTCPReader - logger logr.Logger } -func NewBufferFactory(trackingPackets int, logger logr.Logger) *Factory { - // Enable package wide logging for non-method functions. - // If logger is empty - use default Logger. - // Logger is a public variable in buffer package. - if logger == (logr.Logger{}) { - logger = Logger - } else { - Logger = logger - } - +func NewBufferFactory(trackingPackets int) *Factory { return &Factory{ videoPool: &sync.Pool{ New: func() interface{} { @@ -42,7 +31,6 @@ func NewBufferFactory(trackingPackets int, logger logr.Logger) *Factory { }, rtpBuffers: make(map[uint32]*Buffer), rtcpReaders: make(map[uint32]*RTCPReader), - logger: logger, } } @@ -66,7 +54,7 @@ func (f *Factory) GetOrNew(packetType packetio.BufferPacketType, ssrc uint32) io if reader, ok := f.rtpBuffers[ssrc]; ok { return reader } - buffer := NewBuffer(ssrc, f.videoPool, f.audioPool, f.logger) + buffer := NewBuffer(ssrc, f.videoPool, f.audioPool) f.rtpBuffers[ssrc] = buffer buffer.OnClose(func() { f.Lock() diff --git a/pkg/sfu/buffer/helpers.go b/pkg/sfu/buffer/helpers.go index 5cdd1cc13..1a5761c93 100644 --- a/pkg/sfu/buffer/helpers.go +++ b/pkg/sfu/buffer/helpers.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "errors" "sync/atomic" + + "github.com/livekit/protocol/logger" ) var ( @@ -267,7 +269,7 @@ func IsH264Keyframe(payload []byte) bool { return true } else if n >= 24 { // is this legal? - Logger.V(0).Info("Non-simple NALU within a STAP") + logger.Infow("Non-simple NALU within a STAP") } i += int(length) } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 2e7365d81..8b823751b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -213,7 +213,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, }) } if strings.HasPrefix(d.codec.MimeType, "video/") { - d.sequencer = newSequencer(d.maxTrack) + d.sequencer = newSequencer(d.maxTrack, d.logger) } if d.onBind != nil { d.onBind() diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 0a0fbcbe5..140be8e5c 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -10,6 +10,7 @@ import ( "time" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/rs/zerolog/log" @@ -43,6 +44,8 @@ type TrackReceiver interface { // WebRTCReceiver receives a media track type WebRTCReceiver struct { + logger logger.Logger + peerID livekit.ParticipantID trackID livekit.TrackID streamID string @@ -120,8 +123,15 @@ func WithLoadBalanceThreshold(downTracks int) ReceiverOpts { } // NewWebRTCReceiver creates a new webrtc track receiver -func NewWebRTCReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, pid livekit.ParticipantID, opts ...ReceiverOpts) *WebRTCReceiver { +func NewWebRTCReceiver( + receiver *webrtc.RTPReceiver, + track *webrtc.TrackRemote, + pid livekit.ParticipantID, + logger logger.Logger, + opts ...ReceiverOpts, +) *WebRTCReceiver { w := &WebRTCReceiver{ + logger: logger, peerID: pid, receiver: receiver, trackID: livekit.TrackID(track.ID()), @@ -186,6 +196,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff return } + buff.SetLogger(w.logger) + layer := RidToLayer(track.RID()) w.upTrackMu.Lock() diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 4f26923ad..6399102ab 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -5,6 +5,7 @@ import ( "time" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/protocol/logger" ) const ( @@ -91,13 +92,15 @@ type sequencer struct { step int headSN uint16 startTime int64 + logger logger.Logger } -func newSequencer(maxTrack int) *sequencer { +func newSequencer(maxTrack int, logger logger.Logger) *sequencer { return &sequencer{ startTime: time.Now().UnixNano() / 1e6, max: maxTrack, seq: make([]packetMeta, maxTrack), + logger: logger, } } @@ -124,7 +127,7 @@ func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head b step = n.step - int(n.headSN-offSn) if step < 0 { if step*-1 >= n.max { - Logger.V(0).Info("Old packet received, can not be sequenced", "head", sn, "received", offSn) + n.logger.Infow("old packet received, can not be sequenced", "head", sn, "received", offSn) return nil } step = n.max + step diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index 01a5b6213..537c247ac 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -8,10 +8,11 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/protocol/logger" ) func Test_sequencer(t *testing.T) { - seq := newSequencer(500) + seq := newSequencer(500, logger.Logger(logger.GetLogger())) off := uint16(15) for i := uint16(1); i < 520; i++ { @@ -74,7 +75,7 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - n := newSequencer(500) + n := newSequencer(500, logger.Logger(logger.GetLogger())) for _, i := range tt.fields.input { n.push(i, i+tt.fields.offset, 123, 3, true) diff --git a/pkg/sfu/sfu.go b/pkg/sfu/sfu.go index ca5b3585f..cd15dc01f 100644 --- a/pkg/sfu/sfu.go +++ b/pkg/sfu/sfu.go @@ -2,13 +2,8 @@ package sfu import ( "sync" - - "github.com/go-logr/logr" ) -// Logger is an implementation of logr.Logger. If is not provided - will be turned off. -var Logger = logr.Discard() - var ( PacketFactory *sync.Pool )