error logging

This commit is contained in:
David Colburn
2021-06-03 01:57:43 -05:00
parent 5d85257577
commit 69b94e1142
13 changed files with 87 additions and 78 deletions

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="GoCommentStart" enabled="false" level="WEAK WARNING" enabled_by_default="false" />
</profile>
</component>

View File

@@ -49,17 +49,23 @@ func Infow(msg string, keysAndValues ...interface{}) {
logger.Infow(msg, keysAndValues...)
}
func Warnw(msg string, keysAndValues ...interface{}) {
func Warnw(msg string, err error, keysAndValues ...interface{}) {
if logger == nil {
return
}
if err != nil {
keysAndValues = append([]interface{}{"error", err}, keysAndValues...)
}
logger.Warnw(msg, keysAndValues...)
}
func Errorw(msg string, keysAndValues ...interface{}) {
func Errorw(msg string, err error, keysAndValues ...interface{}) {
if logger == nil {
return
}
if err != nil {
keysAndValues = append([]interface{}{"error", err}, keysAndValues...)
}
logger.Errorw(msg, keysAndValues...)
}

View File

@@ -157,7 +157,7 @@ func (r *LocalRouter) rtcMessageWorker() {
if rtcMsg, ok := msg.(*livekit.RTCNodeMessage); ok {
room, identity, err := parseParticipantKey(rtcMsg.ParticipantKey)
if err != nil {
logger.Errorw("could not process RTC message", "error", err)
logger.Errorw("could not process RTC message", err)
continue
}
if r.onRTCMessage != nil {

View File

@@ -8,9 +8,10 @@ import (
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/logger"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/protocol/utils"
)
const (
@@ -187,9 +188,10 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
}
if rtcNode.Id != r.currentNode.Id {
logger.Errorw("called participant on incorrect node",
err = ErrIncorrectRTCNode
logger.Errorw("called participant on incorrect node", err,
"rtcNode", rtcNode, "currentNode", r.currentNode.Id)
return ErrIncorrectRTCNode
return err
}
if err := r.setParticipantRTCNode(participantKey, rtcNode.Id); err != nil {
@@ -298,7 +300,7 @@ func (r *RedisRouter) statsWorker() {
case <-time.After(statsUpdateInterval):
r.currentNode.Stats.UpdatedAt = time.Now().Unix()
if err := r.RegisterNode(); err != nil {
logger.Errorw("could not update node", "error", err)
logger.Errorw("could not update node", err)
}
case <-r.ctx.Done():
return
@@ -324,21 +326,21 @@ func (r *RedisRouter) redisWorker() {
if msg.Channel == sigChannel {
sm := livekit.SignalNodeMessage{}
if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil {
logger.Errorw("could not unmarshal signal message on sigchan", "error", err)
logger.Errorw("could not unmarshal signal message on sigchan", err)
continue
}
if err := r.handleSignalMessage(&sm); err != nil {
logger.Errorw("error processing signal message", "error", err)
logger.Errorw("error processing signal message", err)
continue
}
} else if msg.Channel == rtcChannel {
rm := livekit.RTCNodeMessage{}
if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil {
logger.Errorw("could not unmarshal RTC message on rtcchan", "error", err)
logger.Errorw("could not unmarshal RTC message on rtcchan", err)
continue
}
if err := r.handleRTCMessage(&rm); err != nil {
logger.Errorw("error processing RTC message", "error", err)
logger.Errorw("error processing RTC message", err)
continue
}
}

View File

@@ -157,7 +157,7 @@ func (t *DataTrack) forwardWorker() {
err := sub.SendMessage(msg)
if err != nil {
logger.Errorw("could not send data message",
"err", err,
err,
"source", t.participantId,
"dest", sub.participantId)
}

View File

@@ -12,11 +12,12 @@ import (
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/rtcerr"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/protocol/utils"
)
var (
@@ -199,9 +200,8 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
return
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
logger.Warnw("could not remove remoteTrack from forwarder",
"sub", sub.Identity(),
"err", err)
logger.Warnw("could not remove remoteTrack from forwarder", err,
"sub", sub.Identity())
}
}
@@ -252,7 +252,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
rtcpReader.OnPacket(func(bytes []byte) {
pkts, err := rtcp.Unmarshal(bytes)
if err != nil {
logger.Errorw("could not unmarshal RTCP", "error", err)
logger.Errorw("could not unmarshal RTCP", err)
return
}

View File

@@ -14,13 +14,14 @@ import (
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/livekit-server/version"
"github.com/livekit/protocol/utils"
)
const (
@@ -524,7 +525,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) {
defer p.lock.RUnlock()
track := p.publishedTracks[trackId]
if track == nil {
logger.Warnw("could not locate track", "track", trackId)
logger.Warnw("could not locate track", nil, "track", trackId)
return
}
currentMuted := track.IsMuted()
@@ -646,8 +647,7 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
sink := p.params.Sink
err := sink.WriteMessage(msg)
if err != nil {
logger.Warnw("could not send message to participant",
"error", err,
logger.Warnw("could not send message to participant", err,
"id", p.ID(),
"participant", p.Identity(),
"message", fmt.Sprintf("%T", msg.Message))
@@ -684,7 +684,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
"rid", track.RID())
if !p.CanPublish() {
logger.Warnw("no permission to publish mediaTrack",
logger.Warnw("no permission to publish mediaTrack", nil,
"participant", p.Identity())
return
}
@@ -749,7 +749,7 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) {
logger.Debugw("dataChannel added", "participant", p.Identity(), "label", dc.Label())
if !p.CanPublish() {
logger.Warnw("no permission to publish dataTrack",
logger.Warnw("no permission to publish dataTrack", nil,
"participant", p.Identity())
return
}
@@ -786,7 +786,7 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp
// if still not found, we are done
if ti == nil {
logger.Errorw("track info not published prior to track", "clientId", clientId)
logger.Errorw("track info not published prior to track", nil, "clientId", clientId)
} else if deleteAfter {
delete(p.pendingTracks, clientId)
}
@@ -796,7 +796,7 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp
func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data []byte) {
dp := livekit.DataPacket{}
if err := proto.Unmarshal(data, &dp); err != nil {
logger.Warnw("could not parse data packet", "error", err)
logger.Warnw("could not parse data packet", err)
return
}
@@ -811,7 +811,7 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [
p.onDataPacket(p, &dp)
}
default:
logger.Warnw("received unsupported data packet", "payload", payload)
logger.Warnw("received unsupported data packet", nil, "payload", payload)
}
}
@@ -894,9 +894,8 @@ func (p *ParticipantImpl) downTracksRTCPWorker() {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
logger.Errorw("could not send downtrack reports",
"participant", p.Identity(),
"err", err)
logger.Errorw("could not send downtrack reports", err,
"participant", p.Identity())
}
pkts = pkts[:0]
}
@@ -914,9 +913,8 @@ func (p *ParticipantImpl) rtcpSendWorker() {
// logger.Debugw("writing RTCP", "packet", pkt)
//}
if err := p.publisher.pc.WriteRTCP(pkts); err != nil {
logger.Errorw("could not write RTCP to participant",
"participant", p.Identity(),
"err", err)
logger.Errorw("could not write RTCP to participant", err,
"participant", p.Identity())
}
}
}

View File

@@ -6,10 +6,11 @@ import (
"sync/atomic"
"time"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/protocol/utils"
)
const (
@@ -360,7 +361,7 @@ func (r *Room) onTrackAdded(participant types.Participant, track types.Published
"remoteTrack", track.ID(),
"dest", existingParticipant.Identity())
if err := track.AddSubscriber(existingParticipant); err != nil {
logger.Errorw("could not subscribe to remoteTrack",
logger.Errorw("could not subscribe to remoteTrack", err,
"source", participant.Identity(),
"remoteTrack", track.ID(),
"dest", existingParticipant.Identity())
@@ -415,7 +416,7 @@ func (r *Room) subscribeToExistingTracks(p types.Participant) {
}
if n, err := op.AddSubscriber(p); err != nil {
// TODO: log error? or disconnect?
logger.Errorw("could not subscribe to participant",
logger.Errorw("could not subscribe to participant", err,
"dest", p.Identity(),
"source", op.Identity())
} else {
@@ -439,9 +440,8 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
err := op.SendParticipantUpdate(updates)
if err != nil {
logger.Errorw("could not send update to participant",
"participant", p.Identity(),
"err", err)
logger.Errorw("could not send update to participant", err,
"participant", p.Identity())
}
}
}

View File

@@ -6,10 +6,11 @@ import (
"time"
"github.com/bep/debounce"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/logger"
livekit "github.com/livekit/livekit-server/proto"
)
@@ -95,7 +96,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
if state == webrtc.ICEGathererStateComplete {
if restart, ok := t.restartAfterGathering.Load().(bool); ok && restart {
if err := t.CreateAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil {
logger.Warnw("could not restart ICE", "error", err)
logger.Warnw("could not restart ICE", err)
}
}
}
@@ -156,7 +157,7 @@ func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) {
func (t *PCTransport) Negotiate() {
t.debouncedNegotiate(func() {
if err := t.CreateAndSendOffer(nil); err != nil {
logger.Errorw("could not negotiate", "error", err)
logger.Errorw("could not negotiate", err)
}
})
}
@@ -200,13 +201,13 @@ func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error {
offer, err := t.pc.CreateOffer(options)
if err != nil {
logger.Errorw("could not create offer", "err", err)
logger.Errorw("could not create offer", err)
return err
}
err = t.pc.SetLocalDescription(offer)
if err != nil {
logger.Errorw("could not set local description", "err", err)
logger.Errorw("could not set local description", err)
return err
}

View File

@@ -5,14 +5,15 @@ import (
"sync"
"time"
"github.com/livekit/protocol/utils"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/protocol/utils"
"github.com/pion/webrtc/v3"
)
const (
@@ -193,7 +194,7 @@ func (r *RoomManager) Stop() {
func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
room, err := r.getOrCreateRoom(roomName)
if err != nil {
logger.Errorw("could not create room", "error", err)
logger.Errorw("could not create room", err)
return
}
@@ -214,10 +215,8 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
}
participant.SetResponseSink(responseSink)
if err := participant.ICERestart(); err != nil {
logger.Warnw("could not restart ICE",
"participant", pi.Identity,
"error", err,
)
logger.Warnw("could not restart ICE", err,
"participant", pi.Identity)
}
return
} else {
@@ -248,7 +247,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
Stats: room.GetStatsReporter(),
})
if err != nil {
logger.Errorw("could not create participant", "error", err)
logger.Errorw("could not create participant", err)
return
}
if pi.Metadata != "" {
@@ -264,7 +263,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
AutoSubscribe: pi.AutoSubscribe,
}
if err := room.Join(participant, &opts); err != nil {
logger.Errorw("could not join room", "error", err)
logger.Errorw("could not join room", err)
return
}
@@ -291,7 +290,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
room = rtc.NewRoom(ri, *r.rtcConfig, r.iceServersForRoom(ri), r.config.Audio.UpdateInterval)
room.OnClose(func() {
if err := r.DeleteRoom(roomName); err != nil {
logger.Errorw("could not delete room", "error", err)
logger.Errorw("could not delete room", err)
}
// print stats
logger.Infow("room closed",
@@ -307,7 +306,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
err = r.roomStore.PersistParticipant(roomName, p.ToProto())
}
if err != nil {
logger.Errorw("could not handle participant change", "error", err)
logger.Errorw("could not handle participant change", err)
}
})
r.lock.Lock()
@@ -346,7 +345,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
case *livekit.SignalRequest_Offer:
_, err := participant.HandleOffer(rtc.FromProtoSessionDescription(msg.Offer))
if err != nil {
logger.Errorw("could not handle offer", "err", err, "participant", participant.Identity())
logger.Errorw("could not handle offer", err, "participant", participant.Identity())
return
}
case *livekit.SignalRequest_AddTrack:
@@ -355,30 +354,30 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
participant.AddTrack(msg.AddTrack.Cid, msg.AddTrack.Name, msg.AddTrack.Type)
case *livekit.SignalRequest_Answer:
if participant.State() == livekit.ParticipantInfo_JOINING {
logger.Errorw("cannot negotiate before peer offer", "participant", participant.Identity())
logger.Errorw("cannot negotiate before peer offer", nil, "participant", participant.Identity())
//conn.WriteJSON(jsonError(http.StatusNotAcceptable, "cannot negotiate before peer offer"))
return
}
sd := rtc.FromProtoSessionDescription(msg.Answer)
if err := participant.HandleAnswer(sd); err != nil {
logger.Errorw("could not handle answer", "participant", participant.Identity(), "err", err)
logger.Errorw("could not handle answer", err, "participant", participant.Identity())
}
case *livekit.SignalRequest_Trickle:
candidateInit, err := rtc.FromProtoTrickle(msg.Trickle)
if err != nil {
logger.Errorw("could not decode trickle", "participant", participant.Identity(), "err", err)
logger.Errorw("could not decode trickle", err, "participant", participant.Identity())
break
}
//logger.Debugw("adding peer candidate", "participant", participant.ID())
if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil {
logger.Errorw("could not handle trickle", "participant", participant.Identity(), "err", err)
logger.Errorw("could not handle trickle", err, "participant", participant.Identity())
}
case *livekit.SignalRequest_Mute:
participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted)
case *livekit.SignalRequest_Subscription:
if participant.CanSubscribe() {
if err := room.UpdateSubscriptions(participant, msg.Subscription); err != nil {
logger.Warnw("could not update subscription",
logger.Warnw("could not update subscription", err,
"participant", participant.Identity(),
"tracks", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
@@ -411,7 +410,7 @@ func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.R
r.lock.RUnlock()
if room == nil {
logger.Warnw("Could not find room", "room", roomName)
logger.Warnw("Could not find room", nil, "room", roomName)
return
}

View File

@@ -31,8 +31,8 @@ func NewRTCService(conf *config.Config, roomStore RoomStore, roomManager *RoomMa
upgrader: websocket.Upgrader{
// increase buffer size to avoid errors such as
// read: connection reset by peer
//ReadBufferSize: 10240,
//WriteBufferSize: 10240,
// ReadBufferSize: 10240,
// WriteBufferSize: 10240,
},
currentNode: currentNode,
isDev: conf.Development,
@@ -125,9 +125,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// upgrade only once the basics are good to go
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Warnw("could not upgrade to WS",
"err", err,
)
logger.Warnw("could not upgrade to WS", err)
handleError(w, http.StatusInternalServerError, err.Error())
return
}
@@ -161,7 +159,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
res, ok := msg.(*livekit.SignalResponse)
if !ok {
logger.Errorw("unexpected message type",
logger.Errorw("unexpected message type", nil,
"type", fmt.Sprintf("%T", msg),
"participant", pi.Identity,
"connectionId", connId)
@@ -169,7 +167,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if err = sigConn.WriteResponse(res); err != nil {
logger.Warnw("error writing to websocket", "error", err)
logger.Warnw("error writing to websocket", err)
return
}
}
@@ -185,13 +183,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
return
} else {
logger.Errorw("error reading from websocket", "error", err)
logger.Errorw("error reading from websocket", err)
return
}
}
if err := reqSink.WriteMessage(req); err != nil {
logger.Warnw("error writing to request sink",
"error", err,
logger.Warnw("error writing to request sink", err,
"participant", pi.Identity,
"connectionId", connId)
}

View File

@@ -113,7 +113,7 @@ func (s *LivekitServer) Start() error {
}
defer func() {
if err := s.router.UnregisterNode(); err != nil {
logger.Errorw("could not unregister node", "error", err)
logger.Errorw("could not unregister node", err)
}
}()
@@ -161,7 +161,7 @@ func (s *LivekitServer) Start() error {
}
logger.Infow("starting LiveKit server", values...)
if err := s.httpServer.Serve(ln); err != http.ErrServerClosed {
logger.Errorw("could not start server", "error", err)
logger.Errorw("could not start server", err)
s.Stop()
}
}()
@@ -173,7 +173,7 @@ func (s *LivekitServer) Start() error {
<-s.doneChan
if err := s.router.UnregisterNode(); err != nil {
logger.Errorw("could not unregister node", "error", err)
logger.Errorw("could not unregister node", err)
}
// wait for shutdown

View File

@@ -112,7 +112,7 @@ func (w *TrackWriter) writeOgg() {
}
if err != nil {
logger.Errorw("could not parse ogg page", "err", err)
logger.Errorw("could not parse ogg page", err)
return
}
@@ -122,7 +122,7 @@ func (w *TrackWriter) writeOgg() {
sampleDuration := time.Duration((sampleCount/48000)*1000) * time.Millisecond
if err = w.track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil {
logger.Errorw("could not write sample", "err", err)
logger.Errorw("could not write sample", err)
return
}
@@ -146,13 +146,13 @@ func (w *TrackWriter) writeVP8() {
}
if err != nil {
logger.Errorw("could not parse VP8 frame", "err", err)
logger.Errorw("could not parse VP8 frame", err)
return
}
time.Sleep(sleepTime)
if err = w.track.WriteSample(media.Sample{Data: frame, Duration: time.Second}); err != nil {
logger.Errorw("could not write sample", "err", err)
logger.Errorw("could not write sample", err)
return
}
}