LK logger with context in SFU (#391)

* LK logger with context in SFU

* Move buff.SetLogger into sfu.WebRTCReceiver
This commit is contained in:
Raja Subramanian
2022-02-01 08:57:09 +05:30
committed by GitHub
parent 3d132730f9
commit 9db2bd22df
12 changed files with 46 additions and 50 deletions

View File

@@ -3,14 +3,12 @@ package serverlogger
import (
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/livekit/protocol/logger"
"github.com/pion/logging"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/logger"
)
var (
@@ -42,8 +40,6 @@ func (f *LoggerFactory) NewLogger(scope string) logging.LeveledLogger {
// Note: only pass in logr.Logger with default depth
func SetLogger(l logr.Logger) {
logger.SetLogger(l, "livekit")
sfu.Logger = l.WithName("sfu")
buffer.Logger = sfu.Logger
}
func InitFromConfig(config config.LoggingConfig) {

View File

@@ -214,6 +214,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
receiver,
track,
t.PublisherID(),
t.params.Logger,
sfu.WithPliThrottle(0),
sfu.WithLoadBalanceThreshold(20),
sfu.WithStreamTrackers(),

View File

@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/go-logr/logr"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"go.uber.org/atomic"
@@ -67,7 +66,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC
telemetry: telemetry,
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions),
bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize, logr.Logger{}),
bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize),
closed: make(chan struct{}),
}
if r.Room.EmptyTimeout == 0 {

View File

@@ -9,7 +9,7 @@ import (
"time"
"github.com/gammazero/deque"
"github.com/go-logr/logr"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
@@ -20,9 +20,6 @@ const (
ReportDelta = 1e9
)
// Logger is an implementation of logr.Logger. If is not provided - will be turned off.
var Logger = logr.Discard()
type pendingPackets struct {
arrivalTime int64
packet []byte
@@ -92,7 +89,7 @@ type Buffer struct {
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
// logger
logger logr.Logger
logger logger.Logger
}
type Stats struct {
@@ -110,18 +107,22 @@ type Options struct {
}
// NewBuffer constructs a new Buffer
func NewBuffer(ssrc uint32, vp, ap *sync.Pool, logger logr.Logger) *Buffer {
func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
b := &Buffer{
mediaSSRC: ssrc,
videoPool: vp,
audioPool: ap,
logger: logger,
logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger
}
b.bitrate.Store(make([]int64, len(b.bitrateHelper)))
b.extPackets.SetMinCapacity(7)
return b
}
func (b *Buffer) SetLogger(logger logger.Logger) {
b.logger = logger
}
func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, o Options) {
b.Lock()
defer b.Unlock()
@@ -152,13 +153,13 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
for _, fb := range codec.RTCPFeedback {
switch fb.Type {
case webrtc.TypeRTCPFBGoogREMB:
b.logger.V(1).Info("Setting feedback", "type", "webrtc.TypeRTCPFBGoogREMB")
b.logger.Infow("Setting feedback", "type", webrtc.TypeRTCPFBGoogREMB)
b.remb = true
case webrtc.TypeRTCPFBTransportCC:
b.logger.V(1).Info("Setting feedback", "type", webrtc.TypeRTCPFBTransportCC)
b.logger.Infow("Setting feedback", "type", webrtc.TypeRTCPFBTransportCC)
b.twcc = true
case webrtc.TypeRTCPFBNACK:
b.logger.V(1).Info("Setting feedback", "type", webrtc.TypeRTCPFBNACK)
b.logger.Infow("Setting feedback", "type", webrtc.TypeRTCPFBNACK)
b.nacker = NewNACKQueue()
b.nack = true
}
@@ -178,7 +179,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
b.pPackets = nil
b.bound = true
b.logger.V(1).Info("NewBuffer", "MaxBitRate", o.MaxBitRate)
b.logger.Infow("NewBuffer", "MaxBitRate", o.MaxBitRate)
}
// Write adds an RTP Packet, out of order, new packet may be arrived later

View File

@@ -37,10 +37,9 @@ func TestNack(t *testing.T) {
return &b
},
}
logger := Logger
t.Run("nack normal", func(t *testing.T) {
buff := NewBuffer(123, pool, pool, logger)
buff := NewBuffer(123, pool, pool)
buff.codecType = webrtc.RTPCodecTypeVideo
assert.NotNil(t, buff)
var wg sync.WaitGroup
@@ -82,7 +81,7 @@ func TestNack(t *testing.T) {
})
t.Run("nack with seq wrap", func(t *testing.T) {
buff := NewBuffer(123, pool, pool, logger)
buff := NewBuffer(123, pool, pool)
buff.codecType = webrtc.RTPCodecTypeVideo
assert.NotNil(t, buff)
var wg sync.WaitGroup
@@ -187,8 +186,7 @@ func TestNewBuffer(t *testing.T) {
return &b
},
}
logger := Logger
buff := NewBuffer(123, pool, pool, logger)
buff := NewBuffer(123, pool, pool)
buff.codecType = webrtc.RTPCodecTypeVideo
assert.NotNil(t, buff)
assert.NotNil(t, TestPackets)
@@ -217,7 +215,7 @@ func TestFractionLostReport(t *testing.T) {
return &b
},
}
buff := NewBuffer(123, pool, pool, Logger)
buff := NewBuffer(123, pool, pool)
buff.codecType = webrtc.RTPCodecTypeVideo
assert.NotNil(t, buff)
var wg sync.WaitGroup

View File

@@ -4,7 +4,6 @@ import (
"io"
"sync"
"github.com/go-logr/logr"
"github.com/pion/transport/packetio"
)
@@ -14,19 +13,9 @@ type Factory struct {
audioPool *sync.Pool
rtpBuffers map[uint32]*Buffer
rtcpReaders map[uint32]*RTCPReader
logger logr.Logger
}
func NewBufferFactory(trackingPackets int, logger logr.Logger) *Factory {
// Enable package wide logging for non-method functions.
// If logger is empty - use default Logger.
// Logger is a public variable in buffer package.
if logger == (logr.Logger{}) {
logger = Logger
} else {
Logger = logger
}
func NewBufferFactory(trackingPackets int) *Factory {
return &Factory{
videoPool: &sync.Pool{
New: func() interface{} {
@@ -42,7 +31,6 @@ func NewBufferFactory(trackingPackets int, logger logr.Logger) *Factory {
},
rtpBuffers: make(map[uint32]*Buffer),
rtcpReaders: make(map[uint32]*RTCPReader),
logger: logger,
}
}
@@ -66,7 +54,7 @@ func (f *Factory) GetOrNew(packetType packetio.BufferPacketType, ssrc uint32) io
if reader, ok := f.rtpBuffers[ssrc]; ok {
return reader
}
buffer := NewBuffer(ssrc, f.videoPool, f.audioPool, f.logger)
buffer := NewBuffer(ssrc, f.videoPool, f.audioPool)
f.rtpBuffers[ssrc] = buffer
buffer.OnClose(func() {
f.Lock()

View File

@@ -4,6 +4,8 @@ import (
"encoding/binary"
"errors"
"sync/atomic"
"github.com/livekit/protocol/logger"
)
var (
@@ -267,7 +269,7 @@ func IsH264Keyframe(payload []byte) bool {
return true
} else if n >= 24 {
// is this legal?
Logger.V(0).Info("Non-simple NALU within a STAP")
logger.Infow("Non-simple NALU within a STAP")
}
i += int(length)
}

View File

@@ -213,7 +213,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
})
}
if strings.HasPrefix(d.codec.MimeType, "video/") {
d.sequencer = newSequencer(d.maxTrack)
d.sequencer = newSequencer(d.maxTrack, d.logger)
}
if d.onBind != nil {
d.onBind()

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/rs/zerolog/log"
@@ -43,6 +44,8 @@ type TrackReceiver interface {
// WebRTCReceiver receives a media track
type WebRTCReceiver struct {
logger logger.Logger
peerID livekit.ParticipantID
trackID livekit.TrackID
streamID string
@@ -120,8 +123,15 @@ func WithLoadBalanceThreshold(downTracks int) ReceiverOpts {
}
// NewWebRTCReceiver creates a new webrtc track receiver
func NewWebRTCReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, pid livekit.ParticipantID, opts ...ReceiverOpts) *WebRTCReceiver {
func NewWebRTCReceiver(
receiver *webrtc.RTPReceiver,
track *webrtc.TrackRemote,
pid livekit.ParticipantID,
logger logger.Logger,
opts ...ReceiverOpts,
) *WebRTCReceiver {
w := &WebRTCReceiver{
logger: logger,
peerID: pid,
receiver: receiver,
trackID: livekit.TrackID(track.ID()),
@@ -186,6 +196,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
return
}
buff.SetLogger(w.logger)
layer := RidToLayer(track.RID())
w.upTrackMu.Lock()

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/logger"
)
const (
@@ -91,13 +92,15 @@ type sequencer struct {
step int
headSN uint16
startTime int64
logger logger.Logger
}
func newSequencer(maxTrack int) *sequencer {
func newSequencer(maxTrack int, logger logger.Logger) *sequencer {
return &sequencer{
startTime: time.Now().UnixNano() / 1e6,
max: maxTrack,
seq: make([]packetMeta, maxTrack),
logger: logger,
}
}
@@ -124,7 +127,7 @@ func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head b
step = n.step - int(n.headSN-offSn)
if step < 0 {
if step*-1 >= n.max {
Logger.V(0).Info("Old packet received, can not be sequenced", "head", sn, "received", offSn)
n.logger.Infow("old packet received, can not be sequenced", "head", sn, "received", offSn)
return nil
}
step = n.max + step

View File

@@ -8,10 +8,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/logger"
)
func Test_sequencer(t *testing.T) {
seq := newSequencer(500)
seq := newSequencer(500, logger.Logger(logger.GetLogger()))
off := uint16(15)
for i := uint16(1); i < 520; i++ {
@@ -74,7 +75,7 @@ func Test_sequencer_getNACKSeqNo(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
n := newSequencer(500)
n := newSequencer(500, logger.Logger(logger.GetLogger()))
for _, i := range tt.fields.input {
n.push(i, i+tt.fields.offset, 123, 3, true)

View File

@@ -2,13 +2,8 @@ package sfu
import (
"sync"
"github.com/go-logr/logr"
)
// Logger is an implementation of logr.Logger. If is not provided - will be turned off.
var Logger = logr.Discard()
var (
PacketFactory *sync.Pool
)