Use participant and room specific loggers (#252)

This commit is contained in:
David Zhao
2021-12-10 15:51:05 -08:00
committed by GitHub
parent bd42a39117
commit 8abd734d16
10 changed files with 132 additions and 154 deletions
+5 -5
View File
@@ -265,8 +265,8 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
}
t.params.Logger.Debugw("removing peerconnection track",
"track", t.ID(),
"pIDs", []string{t.params.ParticipantID, sub.ID()},
"participant", sub.Identity(),
"subscriber", sub.Identity(),
"subscriberID", sub.ID(),
"kind", t.Kind(),
)
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
@@ -279,7 +279,9 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
// been set to Inactive
t.params.Logger.Debugw("could not remove remoteTrack from forwarder",
"error", err,
"participant", sub.Identity(), "pID", sub.ID())
"subscriber", sub.Identity(),
"subscriberID", sub.ID(),
)
}
}
@@ -327,8 +329,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC()))
if buff == nil || rtcpReader == nil {
logger.Errorw("could not retrieve buffer pair", nil,
"participant", t.params.ParticipantIdentity,
"participantID", t.params.ParticipantID,
"track", t.ID())
return
}
+26 -49
View File
@@ -36,6 +36,7 @@ const (
type ParticipantParams struct {
Identity string
SID string
Config *WebRTCConfig
Sink routing.MessageSink
AudioConfig config.AudioConfig
@@ -50,7 +51,6 @@ type ParticipantParams struct {
type ParticipantImpl struct {
params ParticipantParams
id string
publisher *PCTransport
subscriber *PCTransport
isClosed utils.AtomicFlag
@@ -102,7 +102,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p := &ParticipantImpl{
params: params,
id: utils.NewGuid(utils.ParticipantPrefix),
rtcpCh: make(chan []rtcp.Packet, 50),
pliThrottle: newPLIThrottle(params.ThrottleConfig),
subscribedTracks: make(map[string]types.SubscribedTrack),
@@ -118,7 +117,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return nil, err
}
p.publisher, err = NewPCTransport(TransportParams{
ParticipantID: p.id,
ParticipantID: p.params.SID,
ParticipantIdentity: p.params.Identity,
Target: livekit.SignalTarget_PUBLISHER,
Config: params.Config,
@@ -130,7 +129,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return nil, err
}
p.subscriber, err = NewPCTransport(TransportParams{
ParticipantID: p.id,
ParticipantID: p.params.SID,
ParticipantIdentity: p.params.Identity,
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
@@ -188,7 +187,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
}
func (p *ParticipantImpl) ID() string {
return p.id
return p.params.SID
}
func (p *ParticipantImpl) Identity() string {
@@ -231,7 +230,7 @@ func (p *ParticipantImpl) RTCPChan() chan []rtcp.Packet {
func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
info := &livekit.ParticipantInfo{
Sid: p.id,
Sid: p.params.SID,
Identity: p.params.Identity,
Metadata: p.metadata,
State: p.State(),
@@ -288,8 +287,8 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) {
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) {
p.params.Logger.Debugw("answering pub offer", "state", p.State().String(),
"participant", p.Identity(), "pID", p.ID(),
p.params.Logger.Debugw("answering pub offer",
"state", p.State().String(),
//"sdp", sdp.SDP,
)
@@ -313,10 +312,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
return
}
p.params.Logger.Debugw("sending answer to client",
"participant", p.Identity(), "pID", p.ID(),
//"answer sdp", answer.SDP,
)
p.params.Logger.Debugw("sending answer to client")
err = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
@@ -351,8 +348,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
}
if !p.CanPublish() {
p.params.Logger.Warnw("no permission to publish track", nil,
"participant", p.Identity(), "pID", p.ID())
p.params.Logger.Warnw("no permission to publish track", nil)
return
}
@@ -385,10 +381,7 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
if sdp.Type != webrtc.SDPTypeAnswer {
return ErrUnexpectedOffer
}
p.params.Logger.Debugw("setting subPC answer",
"participant", p.Identity(), "pID", p.ID(),
//"sdp", sdp.SDP,
)
p.params.Logger.Debugw("setting subPC answer")
if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
return errors.Wrap(err, "could not set remote description")
@@ -485,8 +478,8 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) {
}
p.params.Logger.Debugw("subscribing new participant to tracks",
"participants", []string{p.Identity(), op.Identity()},
"pIDs", []string{p.ID(), op.ID()},
"subscriber", op.Identity(),
"subscriberID", op.ID(),
"numTracks", len(tracks))
n := 0
@@ -646,8 +639,6 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool, fromAdmin bo
if currentMuted != track.IsMuted() && p.onTrackUpdated != nil {
p.params.Logger.Debugw("mute status changed",
"participant", p.Identity(),
"pID", p.ID(),
"track", trackId,
"muted", track.IsMuted())
p.onTrackUpdated(p, track)
@@ -799,8 +790,9 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack {
// AddSubscribedTrack adds a track to the participant's subscribed list
func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("added subscribedTrack", "publisher", subTrack.PublisherIdentity(),
"participant", p.Identity(), "track", subTrack.ID())
p.params.Logger.Debugw("added subscribedTrack",
"publisher", subTrack.PublisherIdentity(),
"track", subTrack.ID())
p.lock.Lock()
p.subscribedTracks[subTrack.ID()] = subTrack
p.lock.Unlock()
@@ -812,7 +804,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
// RemoveSubscribedTrack removes a track to the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(),
"participant", p.Identity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind())
"track", subTrack.ID(), "kind", subTrack.DownTrack().Kind())
p.subscriber.RemoveTrack(subTrack)
@@ -836,8 +828,6 @@ func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target liveki
// write candidate
p.params.Logger.Debugw("sending ice candidates",
"participant", p.Identity(),
"pID", p.ID(),
"candidate", c.String())
trickle := ToProtoTrickle(ci)
trickle.Target = target
@@ -854,7 +844,7 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
return
}
p.state.Store(state)
p.params.Logger.Debugw("updating participant state", "state", state.String(), "participant", p.Identity(), "pID", p.ID())
p.params.Logger.Debugw("updating participant state", "state", state.String())
p.lock.RLock()
onStateChange := p.onStateChange
p.lock.RUnlock()
@@ -877,8 +867,6 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
err := sink.WriteMessage(msg)
if err != nil {
p.params.Logger.Warnw("could not send message to participant", err,
"pID", p.ID(),
"participant", p.Identity(),
"message", fmt.Sprintf("%T", msg.Message))
return err
}
@@ -888,15 +876,11 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
// when the server has an offer for participant
func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
p.params.Logger.Debugw("skipping server offer", "participant", p.Identity(), "pID", p.ID())
// skip when disconnected
return
}
p.params.Logger.Debugw("sending server offer to participant",
"participant", p.Identity(), "pID", p.ID(),
//"sdp", offer.SDP,
)
p.params.Logger.Debugw("sending server offer to participant")
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
@@ -918,15 +902,12 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
p.params.Logger.Debugw("mediaTrack added",
"kind", track.Kind().String(),
"participant", p.Identity(),
"pID", p.ID(),
"track", track.ID(),
"rid", track.RID(),
"SSRC", track.SSRC())
if !p.CanPublish() {
p.params.Logger.Warnw("no permission to publish mediaTrack", nil,
"participant", p.Identity(), "pID", p.ID())
p.params.Logger.Warnw("no permission to publish mediaTrack", nil)
return
}
@@ -946,7 +927,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
TrackInfo: ti,
SignalCid: signalCid,
SdpCid: track.ID(),
ParticipantID: p.id,
ParticipantID: p.params.SID,
ParticipantIdentity: p.Identity(),
RTCPChan: p.rtcpCh,
BufferFactory: p.params.Config.BufferFactory,
@@ -996,7 +977,7 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) {
p.handleDataMessage(livekit.DataPacket_LOSSY, msg.Data)
})
default:
p.params.Logger.Warnw("unsupported datachannel added", nil, "participant", p.Identity(), "pID", p.ID(), "label", dc.Label())
p.params.Logger.Warnw("unsupported datachannel added", nil, "label", dc.Label())
}
}
@@ -1060,7 +1041,7 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [
switch payload := dp.Value.(type) {
case *livekit.DataPacket_User:
if p.onDataPacket != nil {
payload.User.ParticipantSid = p.id
payload.User.ParticipantSid = p.params.SID
p.onDataPacket(p, &dp)
}
default:
@@ -1092,8 +1073,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) {
}
func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnectionState) {
// p.params.Logger.Debugw("ICE connection state changed", "state", state.String(),
// "participant", p.identity, "pID", p.ID())
if state == webrtc.ICEConnectionStateConnected {
prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
p.updateState(livekit.ParticipantInfo_ACTIVE)
@@ -1160,8 +1139,7 @@ func (p *ParticipantImpl) downTracksRTCPWorker() {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
logger.Errorw("could not send downtrack reports", err,
"participant", p.Identity(), "pID", p.ID())
logger.Errorw("could not send downtrack reports", err)
}
}
@@ -1200,8 +1178,7 @@ func (p *ParticipantImpl) rtcpSendWorker() {
if len(fwdPkts) > 0 {
if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil {
p.params.Logger.Errorw("could not write RTCP to participant", err,
"participant", p.Identity(), "pID", p.ID())
p.params.Logger.Errorw("could not write RTCP to participant", err)
}
}
}
@@ -1336,7 +1313,7 @@ func (p *ParticipantImpl) onStreamStateChange(update *sfu.StreamStateUpdate) err
func (p *ParticipantImpl) DebugInfo() map[string]interface{} {
info := map[string]interface{}{
"ID": p.id,
"ID": p.params.SID,
"State": p.State().String(),
}
+5 -2
View File
@@ -60,7 +60,7 @@ type ParticipantOptions struct {
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room {
r := &Room{
Room: proto.Clone(room).(*livekit.Room),
Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)),
Logger: LoggerWithRoom(logger.Logger(logger.GetLogger()), room.Name),
config: config,
audioConfig: audioConfig,
telemetry: telemetry,
@@ -172,7 +172,10 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
// it's important to set this before connection, we don't want to miss out on any publishedTracks
participant.OnTrackPublished(r.onTrackPublished)
participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) {
r.Logger.Debugw("participant state changed", "state", p.State(), "participant", p.Identity(), "pID", p.ID(),
r.Logger.Debugw("participant state changed",
"state", p.State(),
"participant", p.Identity(),
"pID", p.ID(),
"oldState", oldState)
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
+10 -40
View File
@@ -6,53 +6,32 @@ import (
"github.com/livekit/protocol/logger"
)
func HandleParticipantSignal(room types.Room, participant types.Participant, req *livekit.SignalRequest) error {
func HandleParticipantSignal(room types.Room, participant types.Participant, req *livekit.SignalRequest, pLogger logger.Logger) error {
switch msg := req.Message.(type) {
case *livekit.SignalRequest_Offer:
_, err := participant.HandleOffer(FromProtoSessionDescription(msg.Offer))
if err != nil {
logger.Errorw("could not handle offer", err,
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
)
pLogger.Errorw("could not handle offer", err)
return err
}
case *livekit.SignalRequest_AddTrack:
logger.Debugw("add track request",
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
"track", msg.AddTrack.Cid)
pLogger.Debugw("add track request", "track", msg.AddTrack.Cid)
participant.AddTrack(msg.AddTrack)
case *livekit.SignalRequest_Answer:
sd := FromProtoSessionDescription(msg.Answer)
if err := participant.HandleAnswer(sd); err != nil {
logger.Errorw("could not handle answer", err,
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
)
pLogger.Errorw("could not handle answer", err)
// connection cannot be successful if we can't answer
return err
}
case *livekit.SignalRequest_Trickle:
candidateInit, err := FromProtoTrickle(msg.Trickle)
if err != nil {
logger.Warnw("could not decode trickle", err,
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
)
pLogger.Warnw("could not decode trickle", err)
return nil
}
// logger.Debugw("adding peer candidate", "participant", participant.Identity())
if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil {
logger.Warnw("could not handle trickle", err,
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
)
pLogger.Warnw("could not handle trickle", err)
}
case *livekit.SignalRequest_Mute:
participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted, false)
@@ -67,10 +46,7 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req
err = ErrCannotSubscribe
}
if err != nil {
logger.Warnw("could not update subscription", err,
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
pLogger.Warnw("could not update subscription", err,
"tracks", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
}
@@ -78,26 +54,20 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req
for _, sid := range msg.TrackSetting.TrackSids {
subTrack := participant.GetSubscribedTrack(sid)
if subTrack == nil {
logger.Warnw("unable to find SubscribedTrack", nil,
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
pLogger.Warnw("unable to find SubscribedTrack", nil,
"track", sid)
continue
}
// find quality for published track
logger.Debugw("updating track settings",
"room", room.Name(),
"participant", participant.Identity(),
"pID", participant.ID(),
pLogger.Debugw("updating track settings",
"settings", msg.TrackSetting)
subTrack.UpdateSubscriberSettings(msg.TrackSetting)
}
case *livekit.SignalRequest_UpdateLayers:
track := participant.GetPublishedTrack(msg.UpdateLayers.TrackSid)
if track == nil {
logger.Warnw("could not find published track", nil,
pLogger.Warnw("could not find published track", nil,
"track", msg.UpdateLayers.TrackSid)
return nil
}
+2 -3
View File
@@ -5,8 +5,8 @@ import (
"time"
"github.com/bep/debounce"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/livekit"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
@@ -100,8 +100,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
}
if params.Target == livekit.SignalTarget_SUBSCRIBER {
t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{
ParticipantID: params.ParticipantID,
Logger: params.Logger,
Logger: params.Logger,
})
t.streamAllocator.Start()
}
+23 -1
View File
@@ -6,8 +6,9 @@ import (
"io"
"strings"
"github.com/go-logr/logr"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/livekit"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -130,3 +131,24 @@ func Recover() {
logger.GetLogger().Error(err, "recovered panic", "panic", r)
}
}
// logger helpers
func LoggerWithParticipant(l logger.Logger, identity, sid string) logger.Logger {
lr := logr.Logger(l)
if identity != "" {
lr = lr.WithValues("participant", identity)
}
if sid != "" {
lr = lr.WithValues("pID", sid)
}
return logger.Logger(lr)
}
func LoggerWithRoom(l logger.Logger, name string) logger.Logger {
lr := logr.Logger(l)
return logger.Logger(
lr.WithValues(
"room", name,
),
)
}
+27 -15
View File
@@ -8,6 +8,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -225,8 +226,11 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout
pv := types.ProtocolVersion(pi.Client.Protocol)
rtcConf := *r.rtcConfig
rtcConf.SetBufferFactory(room.GetBufferFactor())
sid := utils.NewGuid(utils.ParticipantPrefix)
pLogger := rtc.LoggerWithParticipant(room.Logger, pi.Identity, sid)
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
Identity: pi.Identity,
SID: sid,
Config: &rtcConf,
Sink: responseSink,
AudioConfig: r.config.Audio,
@@ -235,7 +239,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout
ThrottleConfig: r.config.RTC.PLIThrottle,
EnabledCodecs: room.Room.EnabledCodecs,
Hidden: pi.Hidden,
Logger: room.Logger,
Logger: pLogger,
})
if err != nil {
logger.Errorw("could not create participant", err)
@@ -254,11 +258,11 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout
AutoSubscribe: pi.AutoSubscribe,
}
if err = room.Join(participant, &opts, r.iceServersForRoom(room.Room)); err != nil {
logger.Errorw("could not join room", err)
pLogger.Errorw("could not join room", err)
return
}
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil {
logger.Errorw("could not store participant", err)
pLogger.Errorw("could not store participant", err)
}
// update roomstore with new numParticipants
if !participant.Hidden() {
@@ -271,7 +275,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout
r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto())
participant.OnClose(func(p types.Participant) {
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
logger.Errorw("could not delete participant", err)
pLogger.Errorw("could not delete participant", err)
}
// update roomstore with new numParticipants
if !participant.Hidden() {
@@ -346,6 +350,11 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
}()
defer rtc.Recover()
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.Logger(logger.GetLogger()), room.Name()),
participant.Identity(), participant.ID(),
)
for {
select {
case <-time.After(time.Millisecond * 50):
@@ -361,7 +370,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
}
req := obj.(*livekit.SignalRequest)
if err := rtc.HandleParticipantSignal(room, participant, req); err != nil {
if err := rtc.HandleParticipantSignal(room, participant, req, pLogger); err != nil {
// more specific errors are already logged
// treat errors returned as fatal
return
@@ -382,22 +391,27 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity s
}
participant := room.GetParticipant(identity)
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.Logger(logger.GetLogger()), roomName),
identity,
"",
)
switch rm := msg.Message.(type) {
case *livekit.RTCNodeMessage_RemoveParticipant:
if participant == nil {
return
}
logger.Infow("removing participant", "room", roomName, "participant", identity)
pLogger.Infow("removing participant")
room.RemoveParticipant(identity)
case *livekit.RTCNodeMessage_MuteTrack:
if participant == nil {
return
}
logger.Debugw("setting track muted", "room", roomName, "participant", identity,
pLogger.Debugw("setting track muted",
"track", rm.MuteTrack.TrackSid, "muted", rm.MuteTrack.Muted)
if !rm.MuteTrack.Muted && !r.config.Room.EnableRemoteUnmute {
logger.Errorw("cannot unmute track, remote unmute is disabled", nil)
pLogger.Errorw("cannot unmute track, remote unmute is disabled", nil)
return
}
participant.SetTrackMuted(rm.MuteTrack.TrackSid, rm.MuteTrack.Muted, true)
@@ -405,7 +419,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity s
if participant == nil {
return
}
logger.Debugw("updating participant", "room", roomName, "participant", identity)
pLogger.Debugw("updating participant")
if rm.UpdateParticipant.Metadata != "" {
participant.SetMetadata(rm.UpdateParticipant.Metadata)
}
@@ -421,23 +435,21 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity s
if participant == nil {
return
}
logger.Debugw("updating participant subscriptions", "room", roomName, "participant", identity)
pLogger.Debugw("updating participant subscriptions")
if err := room.UpdateSubscriptions(participant, rm.UpdateSubscriptions.TrackSids, rm.UpdateSubscriptions.Subscribe); err != nil {
logger.Warnw("could not update subscription", err,
"participant", participant.Identity(),
"pID", participant.ID(),
pLogger.Warnw("could not update subscription", err,
"tracks", rm.UpdateSubscriptions.TrackSids,
"subscribe", rm.UpdateSubscriptions.Subscribe)
}
case *livekit.RTCNodeMessage_SendData:
logger.Debugw("SendData", "message", rm)
pLogger.Debugw("SendData", "size", len(rm.SendData.Data))
up := &livekit.UserPacket{
Payload: rm.SendData.Data,
DestinationSids: rm.SendData.DestinationSids,
}
room.SendDataPacket(up, rm.SendData.Kind)
case *livekit.RTCNodeMessage_UpdateRoomMetadata:
logger.Debugw("updating room", "room", roomName)
pLogger.Debugw("updating room")
room.SetMetadata(rm.UpdateRoomMetadata.Metadata)
}
}
+13 -14
View File
@@ -9,8 +9,8 @@ import (
"strings"
"github.com/gorilla/websocket"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -144,10 +144,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.Logger(logger.GetLogger()), roomName),
pi.Identity, "",
)
done := make(chan struct{})
// function exits when websocket terminates, it'll close the event reading off of response sink as well
defer func() {
logger.Infow("server closing WS connection", "participant", pi.Identity, "connID", connId)
pLogger.Infow("server closing WS connection", "connID", connId)
reqSink.Close()
close(done)
}()
@@ -156,7 +160,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1)
logger.Warnw("could not upgrade to WS", err)
pLogger.Warnw("could not upgrade to WS", err)
handleError(w, http.StatusInternalServerError, err.Error())
return
}
@@ -166,11 +170,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
logger.Infow("new client WS connected",
pLogger.Infow("new client WS connected",
"connID", connId,
"roomID", rm.Sid,
"room", rm.Name,
"participant", pi.Identity,
)
// handle responses
@@ -187,22 +189,20 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
case msg := <-resSource.ReadChan():
if msg == nil {
logger.Infow("source closed connection",
"participant", pi.Identity,
pLogger.Infow("source closed connection",
"connID", connId)
return
}
res, ok := msg.(*livekit.SignalResponse)
if !ok {
logger.Errorw("unexpected message type", nil,
pLogger.Errorw("unexpected message type", nil,
"type", fmt.Sprintf("%T", msg),
"participant", pi.Identity,
"connID", connId)
continue
}
if err = sigConn.WriteResponse(res); err != nil {
logger.Warnw("error writing to websocket", err)
pLogger.Warnw("error writing to websocket", err)
return
}
}
@@ -218,13 +218,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
return
} else {
logger.Errorw("error reading from websocket", err)
pLogger.Errorw("error reading from websocket", err)
return
}
}
if err := reqSink.WriteMessage(req); err != nil {
logger.Warnw("error writing to request sink", err,
"participant", pi.Identity,
pLogger.Warnw("error writing to request sink", err,
"connID", connId)
}
}
+6 -9
View File
@@ -117,13 +117,11 @@ import (
)
type ProberParams struct {
ParticipantID string
Logger logger.Logger
Logger logger.Logger
}
type Prober struct {
participantID string
logger logger.Logger
logger logger.Logger
clustersMu sync.RWMutex
clusters deque.Deque
@@ -134,8 +132,7 @@ type Prober struct {
func NewProber(params ProberParams) *Prober {
p := &Prober{
participantID: params.ParticipantID,
logger: params.Logger,
logger: params.Logger,
}
p.clusters.SetMinCapacity(2)
return p
@@ -153,7 +150,7 @@ func (p *Prober) Reset() {
defer p.clustersMu.Unlock()
if p.activeCluster != nil {
p.logger.Debugw("resetting active cluster", "participant", p.participantID, "cluster", p.activeCluster.String())
p.logger.Debugw("resetting active cluster", "cluster", p.activeCluster.String())
}
p.clusters.Clear()
@@ -170,7 +167,7 @@ func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration
}
cluster := NewCluster(desiredRateBps, expectedRateBps, minDuration, maxDuration)
p.logger.Debugw("cluster added", "participant", p.participantID, "cluster", cluster.String())
p.logger.Debugw("cluster added", "cluster", cluster.String())
p.pushBackClusterAndMaybeStart(cluster)
}
@@ -258,7 +255,7 @@ func (p *Prober) run() {
cluster.Process(p)
if cluster.IsFinished() {
p.logger.Debugw("cluster finished", "participant", p.participantID, "cluster", cluster.String())
p.logger.Debugw("cluster finished", "cluster", cluster.String())
p.popFrontCluster(cluster)
continue
}
+15 -16
View File
@@ -192,13 +192,11 @@ func (s Signal) String() string {
}
type StreamAllocatorParams struct {
ParticipantID string
Logger logger.Logger
Logger logger.Logger
}
type StreamAllocator struct {
participantID string
logger logger.Logger
logger logger.Logger
onStreamStateChange func(update *StreamStateUpdate) error
@@ -234,13 +232,11 @@ type Event struct {
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
s := &StreamAllocator{
participantID: params.ParticipantID,
logger: params.Logger,
audioTracks: make(map[string]*Track),
videoTracks: make(map[string]*Track),
logger: params.Logger,
audioTracks: make(map[string]*Track),
videoTracks: make(map[string]*Track),
prober: NewProber(ProberParams{
ParticipantID: params.ParticipantID,
Logger: params.Logger,
Logger: params.Logger,
}),
eventCh: make(chan Event, 20),
runningCh: make(chan struct{}),
@@ -517,7 +513,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) {
}
if !found {
if len(remb.SSRCs) == 0 {
s.logger.Warnw("no SSRC to track REMB", nil, "participant", s.participantID)
s.logger.Warnw("no SSRC to track REMB", nil)
return
}
@@ -542,7 +538,10 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) {
s.prevReceivedEstimate = s.receivedEstimate
s.receivedEstimate = int64(remb.Bitrate)
if s.prevReceivedEstimate != s.receivedEstimate {
s.logger.Debugw("received new estimate", "participant", s.participantID, "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate)
s.logger.Debugw("received new estimate",
"old(bps)", s.prevReceivedEstimate,
"new(bps)", s.receivedEstimate,
)
}
if s.maybeCommitEstimate() {
@@ -647,7 +646,7 @@ func (s *StreamAllocator) handleSignalSendProbe(event *Event) {
func (s *StreamAllocator) setState(state State) {
if s.state != state {
s.logger.Infow("state change", "participant", s.participantID, "from", s.state, "to", state)
s.logger.Infow("state change", "from", s.state, "to", state)
}
s.state = state
@@ -700,7 +699,7 @@ func (s *StreamAllocator) maybeCommitEstimate() (isDecreasing bool) {
s.committedChannelCapacity = s.receivedEstimate
s.lastCommitTime = time.Now()
s.logger.Debugw("committing channel capacity", "participant", s.participantID, "capacity(bps)", s.committedChannelCapacity)
s.logger.Debugw("committing channel capacity", "capacity(bps)", s.committedChannelCapacity)
return
}
@@ -874,11 +873,11 @@ func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) {
return
}
s.logger.Debugw("streamed tracks changed", "participant", s.participantID, "update", update)
s.logger.Debugw("streamed tracks changed", "update", update)
if s.onStreamStateChange != nil {
err := s.onStreamStateChange(update)
if err != nil {
s.logger.Errorw("could not send streamed tracks update", err, "participant", s.participantID)
s.logger.Errorw("could not send streamed tracks update", err)
}
}
}