Cleanup pass through logging (#1073)

* added filtering for noisy pion logs
* demoted some logs to debug
* using consistent trackID / participant / publisher / subscriber terminology
* removed ice candidate log lines, deferring to combined log
This commit is contained in:
David Zhao
2022-10-06 23:48:37 -07:00
committed by GitHub
parent 792349cc56
commit 1019faa0e6
16 changed files with 114 additions and 55 deletions
+2 -2
View File
@@ -25,7 +25,7 @@ require (
github.com/maxbrunsfeld/counterfeiter/v6 v6.5.0
github.com/mitchellh/go-homedir v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/pion/ice/v2 v2.2.10
github.com/pion/ice/v2 v2.2.11-0.20221007053825-a705a5f29459
github.com/pion/interceptor v0.1.12
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.10
@@ -88,7 +88,7 @@ require (
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220927171203-f486391704dc // indirect
golang.org/x/net v0.0.0-20221002022538-bcab6841153b // indirect
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
+4 -2
View File
@@ -306,8 +306,9 @@ github.com/pion/dtls/v2 v2.1.3/go.mod h1:o6+WvyLDAlXF7YiPB/RlskRoeK+/JtuaZa5emwQ
github.com/pion/dtls/v2 v2.1.5 h1:jlh2vtIyUBShchoTDqpCCqiYCyRFJ/lvf/gQ8TALs+c=
github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY=
github.com/pion/ice/v2 v2.2.6/go.mod h1:SWuHiOGP17lGromHTFadUe1EuPgFh/oCU6FCMZHooVE=
github.com/pion/ice/v2 v2.2.10 h1:i8rn0iIN8wHlS9wosoHS3Du4hYwx+TjBEluisXYtQVE=
github.com/pion/ice/v2 v2.2.10/go.mod h1:J6HhupoMLTOv0yALipuOHEPoSMk7dm1ofwUI1KHVHk8=
github.com/pion/ice/v2 v2.2.11-0.20221007053825-a705a5f29459 h1:kcefLjFMe1jf1vmchdE4vZFskToUqRuZZQ11b9+dRDA=
github.com/pion/ice/v2 v2.2.11-0.20221007053825-a705a5f29459/go.mod h1:NqUDUao6SjSs1+4jrqpexDmFlptlVhGxQjcymXLaVvE=
github.com/pion/interceptor v0.1.11/go.mod h1:tbtKjZY14awXd7Bq0mmWvgtHB5MDaRN7HV3OZ/uy7s8=
github.com/pion/interceptor v0.1.12 h1:CslaNriCFUItiXS5o+hh5lpL0t0ytQkFnUcbbCs2Zq8=
github.com/pion/interceptor v0.1.12/go.mod h1:bDtgAD9dRkBZpWHGKaoKb42FhDHTG2rX8Ii9LRALLVA=
@@ -537,8 +538,9 @@ golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220630215102-69896b714898/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220927171203-f486391704dc h1:FxpXZdoBqT8RjqTy6i1E8nXHhW21wK7ptQ/EPIGxzPQ=
golang.org/x/net v0.0.0-20220927171203-f486391704dc/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20221002022538-bcab6841153b h1:6e93nYa3hNqAvLr0pD4PN1fFS+gKzp2zAXqrnTCstqU=
golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+33 -4
View File
@@ -2,6 +2,7 @@ package serverlogger
import (
"fmt"
"strings"
"github.com/go-logr/logr"
"go.uber.org/zap/zapcore"
@@ -9,8 +10,9 @@ import (
// implements webrtc.LeveledLogger
type logAdapter struct {
logger logr.Logger
level zapcore.Level
logger logr.Logger
level zapcore.Level
ignoredPrefixes []string
}
func (l *logAdapter) Trace(msg string) {
@@ -39,6 +41,9 @@ func (l *logAdapter) Info(msg string) {
if l.level > zapcore.InfoLevel {
return
}
if l.shouldIgnore(msg) {
return
}
l.logger.Info(msg)
}
@@ -46,6 +51,9 @@ func (l *logAdapter) Infof(format string, args ...interface{}) {
if l.level > zapcore.InfoLevel {
return
}
if l.shouldIgnore(format) {
return
}
l.logger.Info(fmt.Sprintf(format, args...))
}
@@ -53,20 +61,29 @@ func (l *logAdapter) Warn(msg string) {
if l.level > zapcore.WarnLevel {
return
}
l.logger.Info(msg)
if l.shouldIgnore(msg) {
return
}
l.logger.V(-1).Info(msg)
}
func (l *logAdapter) Warnf(format string, args ...interface{}) {
if l.level > zapcore.WarnLevel {
return
}
l.logger.Info(fmt.Sprintf(format, args...))
if l.shouldIgnore(format) {
return
}
l.logger.V(-1).Info(fmt.Sprintf(format, args...))
}
func (l *logAdapter) Error(msg string) {
if l.level > zapcore.ErrorLevel {
return
}
if l.shouldIgnore(msg) {
return
}
l.logger.Error(nil, msg)
}
@@ -74,5 +91,17 @@ func (l *logAdapter) Errorf(format string, args ...interface{}) {
if l.level > zapcore.ErrorLevel {
return
}
if l.shouldIgnore(format) {
return
}
l.logger.Error(nil, fmt.Sprintf(format, args...))
}
func (l *logAdapter) shouldIgnore(msg string) bool {
for _, prefix := range l.ignoredPrefixes {
if strings.HasPrefix(msg, prefix) {
return true
}
}
return false
}
+20 -3
View File
@@ -11,7 +11,23 @@ import (
)
var (
pionLevel zapcore.Level
pionLevel zapcore.Level
pionIgnoredPrefixes = map[string][]string{
"ice": {
"pingAllCandidates called with no candidate pairs",
"failed to send packet",
"Ignoring remote candidate with tcpType active",
},
"pc": {
"Failed to accept RTCP stream is already closed",
"Failed to accept RTP stream is already closed",
"Incoming unhandled RTCP ssrc",
},
"tcp_mux": {
"Error reading first packet from",
"error closing connection",
},
}
)
// implements webrtc.LoggerFactory
@@ -30,8 +46,9 @@ func NewLoggerFactory(logger logr.Logger) *LoggerFactory {
func (f *LoggerFactory) NewLogger(scope string) logging.LeveledLogger {
return &logAdapter{
logger: f.logger.WithName(scope),
level: pionLevel,
logger: f.logger.WithName(scope),
level: pionLevel,
ignoredPrefixes: pionIgnoredPrefixes[scope],
}
}
+22 -15
View File
@@ -530,7 +530,6 @@ func (p *ParticipantImpl) setCodecPreferencesVideoForPublisher(offer webrtc.Sess
bytes, err := parsed.Marshal()
if err != nil {
p.params.Logger.Infow("failed to marshal offer", "error", err)
p.params.Logger.Errorw("failed to marshal offer", err)
return offer
}
@@ -646,7 +645,7 @@ func (p *ParticipantImpl) Start() {
}
func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason) error {
p.params.Logger.Infow("try close participant", "sendLeave", sendLeave, "reason", reason.String())
p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String())
if p.isClosed.Swap(true) {
// already closed
return nil
@@ -755,6 +754,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
if p.isClosed.Load() || p.State() == livekit.ParticipantInfo_DISCONNECTED {
return
}
// TODO: change to debug once we are confident
p.params.Logger.Infow("closing subscriber peer connection to aid migration")
//
@@ -968,16 +968,20 @@ func (p *ParticipantImpl) UpdateSubscribedTrackSettings(trackID livekit.TrackID,
// AddSubscribedTrack adds a track to the participant's subscribed list
func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
p.lock.Lock()
if v, ok := p.trackPublisherVersion[subTrack.ID()]; ok && v > subTrack.PublisherVersion() {
p.lock.Unlock()
p.params.Logger.Infow("ignoring add subscribedTrack from older version",
"current", v,
"requesting", subTrack.PublisherVersion(),
"trackID", subTrack.ID(),
)
return
}
p.params.Logger.Infow("added subscribedTrack",
"publisherID", subTrack.PublisherID(),
"publisherIdentity", subTrack.PublisherIdentity(),
"trackID", subTrack.ID())
p.lock.Lock()
if v, ok := p.trackPublisherVersion[subTrack.ID()]; ok && v > subTrack.PublisherVersion() {
p.lock.Unlock()
p.params.Logger.Infow("ignoring add subscribedTrack from older version", "current", v, "requesting", subTrack.PublisherVersion())
return
}
p.trackPublisherVersion[subTrack.ID()] = subTrack.PublisherVersion()
onSubscribedTo := p.onSubscribedTo
@@ -1011,17 +1015,20 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
// RemoveSubscribedTrack removes a track to the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) {
p.lock.Lock()
if v, ok := p.trackPublisherVersion[subTrack.ID()]; ok && v > subTrack.PublisherVersion() {
p.lock.Unlock()
p.params.Logger.Infow("ignoring remove subscribedTrack from older version",
"current", v,
"requesting", subTrack.PublisherVersion(),
"trackID", subTrack.ID(),
)
return
}
p.params.Logger.Infow("removed subscribedTrack",
"publisherID", subTrack.PublisherID(),
"publisherIdentity", subTrack.PublisherIdentity(),
"trackID", subTrack.ID(), "kind", subTrack.DownTrack().Kind())
p.lock.Lock()
if v, ok := p.trackPublisherVersion[subTrack.ID()]; ok && v > subTrack.PublisherVersion() {
p.lock.Unlock()
p.params.Logger.Infow("ignoring remove subscribedTrack from older version", "current", v, "requesting", subTrack.PublisherVersion())
return
}
p.trackPublisherVersion[subTrack.ID()] = subTrack.PublisherVersion()
delete(p.subscribedTracks, subTrack.ID())
-1
View File
@@ -139,7 +139,6 @@ func (p *ParticipantImpl) SendRefreshToken(token string) error {
}
func (p *ParticipantImpl) sendICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error {
p.params.Logger.Infow("sending ice candidate", "candidate", c.String(), "target", target)
trickle := ToProtoTrickle(c.ToJSON())
trickle.Target = target
return p.writeMessage(&livekit.SignalResponse{
+15 -8
View File
@@ -718,13 +718,17 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
}
r.Logger.Debugw("subscribing to new track",
"participants", []livekit.ParticipantIdentity{participant.Identity(), existingParticipant.Identity()},
"pIDs", []livekit.ParticipantID{participant.ID(), existingParticipant.ID()},
"participant", existingParticipant.Identity(),
"pID", existingParticipant.ID(),
"publisher", participant.Identity(),
"publisherID", participant.ID(),
"trackID", track.ID())
if _, err := participant.AddSubscriber(existingParticipant, types.AddSubscriberParams{TrackIDs: []livekit.TrackID{track.ID()}}); err != nil {
r.Logger.Errorw("could not subscribe to remoteTrack", err,
"participants", []livekit.ParticipantIdentity{participant.Identity(), existingParticipant.Identity()},
"pIDs", []livekit.ParticipantID{participant.ID(), existingParticipant.ID()},
"participant", existingParticipant.Identity(),
"pID", existingParticipant.ID(),
"publisher", participant.Identity(),
"publisherID", participant.ID(),
"trackID", track.ID())
}
}
@@ -814,14 +818,17 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) int {
n, err := op.AddSubscriber(p, types.AddSubscriberParams{AllTracks: true})
if err != nil {
// TODO: log error? or disconnect?
r.Logger.Errorw("could not subscribe to participant", err,
"participants", []livekit.ParticipantIdentity{op.Identity(), p.Identity()},
"pIDs", []livekit.ParticipantID{op.ID(), p.ID()})
r.Logger.Errorw("could not subscribe to publisher", err,
"participant", p.Identity(),
"pID", p.ID(),
"publisher", op.Identity(),
"publisherID", op.ID(),
)
}
tracksAdded += n
}
if tracksAdded > 0 {
r.Logger.Debugw("subscribed participants to existing tracks", "tracks", tracksAdded)
r.Logger.Debugw("subscribed participants to existing tracks", "trackID", tracksAdded)
}
return tracksAdded
}
+2 -2
View File
@@ -43,11 +43,11 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
}
if err != nil {
pLogger.Warnw("could not update subscription", err,
"tracks", msg.Subscription.TrackSids,
"trackID", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
} else {
pLogger.Infow("updated subscription",
"tracks", msg.Subscription.TrackSids,
"trackID", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
}
case *livekit.SignalRequest_TrackSetting:
+10 -10
View File
@@ -496,6 +496,7 @@ func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionSta
t.maybeNotifyFullyEstablished()
}
case webrtc.PeerConnectionStateFailed:
t.logICECandidates()
t.handleConnectionFailed()
}
}
@@ -1297,7 +1298,7 @@ func (t *PCTransport) processEvents() {
}
t.clearSignalStateCheckTimer()
t.params.Logger.Infow("leaving events processor")
t.params.Logger.Debugw("leaving events processor")
}
func (t *PCTransport) handleEvent(e *event) error {
@@ -1396,7 +1397,7 @@ func (t *PCTransport) handleLocalICECandidate(e *event) error {
filtered := false
if t.preferTCP.Load() && c != nil && c.Protocol != webrtc.ICEProtocolTCP {
cstr := c.String()
t.params.Logger.Infow("filtering out local candidate", "candidate", cstr)
t.params.Logger.Debugw("filtering out local candidate", "candidate", cstr)
t.filteredLocalCandidates = append(t.filteredLocalCandidates, cstr)
filtered = true
}
@@ -1425,7 +1426,7 @@ func (t *PCTransport) handleRemoteICECandidate(e *event) error {
filtered := false
if t.preferTCP.Load() && !strings.Contains(c.Candidate, "tcp") {
t.params.Logger.Infow("filtering out remote candidate", "candidate", c.Candidate)
t.params.Logger.Debugw("filtering out remote candidate", "candidate", c.Candidate)
t.filteredRemoteCandidates = append(t.filteredRemoteCandidates, c.Candidate)
filtered = true
}
@@ -1443,7 +1444,6 @@ func (t *PCTransport) handleRemoteICECandidate(e *event) error {
return nil
}
t.params.Logger.Infow("add candidate ", "candidate", c.Candidate)
if err := t.pc.AddICECandidate(*c); err != nil {
return errors.Wrap(err, "add ice candidate failed")
}
@@ -1584,7 +1584,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
preferTCP := t.preferTCP.Load()
if preferTCP {
t.params.Logger.Infow("local offer (unfiltered)", "sdp", offer.SDP)
t.params.Logger.Debugw("local offer (unfiltered)", "sdp", offer.SDP)
}
err = t.pc.SetLocalDescription(offer)
@@ -1601,7 +1601,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
//
offer = t.filterCandidates(offer, preferTCP)
if preferTCP {
t.params.Logger.Infow("local offer (filtered)", "sdp", offer.SDP)
t.params.Logger.Debugw("local offer (filtered)", "sdp", offer.SDP)
}
// indicate waiting for remote
@@ -1655,11 +1655,11 @@ func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error {
// filter before setting remote description so that pion does not see filtered remote candidates
preferTCP := t.preferTCP.Load()
if preferTCP {
t.params.Logger.Infow("remote description (unfiltered)", "type", sd.Type, "sdp", sd.SDP)
t.params.Logger.Debugw("remote description (unfiltered)", "type", sd.Type, "sdp", sd.SDP)
}
sd = t.filterCandidates(sd, preferTCP)
if preferTCP {
t.params.Logger.Infow("remote description (filtered)", "type", sd.Type, "sdp", sd.SDP)
t.params.Logger.Debugw("remote description (filtered)", "type", sd.Type, "sdp", sd.SDP)
}
if err := t.pc.SetRemoteDescription(sd); err != nil {
@@ -1703,7 +1703,7 @@ func (t *PCTransport) createAndSendAnswer() error {
preferTCP := t.preferTCP.Load()
if preferTCP {
t.params.Logger.Infow("local answer (unfiltered)", "sdp", answer.SDP)
t.params.Logger.Debugw("local answer (unfiltered)", "sdp", answer.SDP)
}
if err = t.pc.SetLocalDescription(answer); err != nil {
@@ -1719,7 +1719,7 @@ func (t *PCTransport) createAndSendAnswer() error {
//
answer = t.filterCandidates(answer, preferTCP)
if preferTCP {
t.params.Logger.Infow("local answer (filtered)", "sdp", answer.SDP)
t.params.Logger.Debugw("local answer (filtered)", "sdp", answer.SDP)
}
if onAnswer := t.getOnAnswer(); onAnswer != nil {
+1 -1
View File
@@ -169,7 +169,7 @@ func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted b
track.SetMuted(muted)
if currentMuted != track.IsMuted() {
u.params.Logger.Infow("mute status changed", "trackID", trackID, "muted", track.IsMuted())
u.params.Logger.Infow("publisher mute status changed", "trackID", trackID, "muted", track.IsMuted())
if u.onTrackUpdated != nil {
u.onTrackUpdated(track, false)
}
+1 -1
View File
@@ -108,7 +108,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
nodeID = livekit.NodeID(node.Id)
}
logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeID)
logger.Infow("selected node for room", "room", rm.Name, "roomID", rm.Sid, "selectedNodeID", nodeID)
err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID)
if err != nil {
return nil, err
+1 -1
View File
@@ -577,7 +577,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo
"subscribe", rm.UpdateSubscriptions.Subscribe)
}
case *livekit.RTCNodeMessage_SendData:
pLogger.Debugw("SendData", "size", len(rm.SendData.Data))
pLogger.Debugw("api send data", "size", len(rm.SendData.Data))
up := &livekit.UserPacket{
Payload: rm.SendData.Data,
DestinationSids: rm.SendData.DestinationSids,
-2
View File
@@ -255,8 +255,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
case msg := <-resSource.ReadChan():
if msg == nil {
pLogger.Infow("source closed connection",
"connID", connId)
return
}
res, ok := msg.(*livekit.SignalResponse)
+1 -1
View File
@@ -289,7 +289,7 @@ func (f *Forwarder) Mute(muted bool) (bool, VideoLayers) {
return false, f.maxLayers
}
f.logger.Infow("setting mute", "muted", muted)
f.logger.Infow("setting forwarder mute", "muted", muted)
f.muted = muted
// resync when muted so that sequence numbers do not jump on unmute
+1 -1
View File
@@ -352,7 +352,7 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
}
if w.downTrackSpreader.HasDownTrack(track.SubscriberID()) {
w.logger.Infow("subscriberID already exists, replace the downtrack", "subscriberID", track.SubscriberID())
w.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID())
}
if w.Kind() == webrtc.RTPCodecTypeVideo {
+1 -1
View File
@@ -69,7 +69,7 @@ func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error {
}
if r.downTrackSpreader.HasDownTrack(track.SubscriberID()) {
r.logger.Infow("subscriberID already exists, replace the downtrack", "subscriberID", track.SubscriberID())
r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID())
}
r.downTrackSpreader.Store(track)