Room logger with room name (#165)

* room with logger

* participant with room logger

* transport with room logger

* simplify room logger usage

* simplify logger

* update protocol

* more room logging, test fix
This commit is contained in:
Mathew Kamkar
2021-11-02 14:02:45 -07:00
committed by GitHub
parent 5e0833a250
commit 05c4df4e23
6 changed files with 84 additions and 57 deletions

4
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.9.11
github.com/livekit/protocol v0.9.12-0.20211102204637-f3bd2c316e7b
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
@@ -48,3 +48,5 @@ require (
)
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.14
// replace github.com/livekit/protocol => ../protocol

4
go.sum
View File

@@ -250,8 +250,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.14 h1:/eCaRUIa8KxyaTdjOQFJJJGDrjIVClL14BkYO2NqVSw=
github.com/livekit/ion-sfu v1.20.14/go.mod h1:Nmf1dro+y5vr0laNSJSrYDMDYrjERcEWVbWyenEWZ0A=
github.com/livekit/protocol v0.9.11 h1:lSsxWrnqxGX6IkPOQfGZsboMSzwQRKbneNgDoRvK7zA=
github.com/livekit/protocol v0.9.11/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM=
github.com/livekit/protocol v0.9.12-0.20211102204637-f3bd2c316e7b h1:jdos5NJ+yOn6AdOLrGp6IeGit+hJyN6W9XtAP5wC9YY=
github.com/livekit/protocol v0.9.12-0.20211102204637-f3bd2c316e7b/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@@ -42,6 +42,7 @@ type ParticipantParams struct {
ThrottleConfig config.PLIThrottleConfig
EnabledCodecs []*livekit.Codec
Hidden bool
Logger logger.Logger
}
type ParticipantImpl struct {
@@ -116,6 +117,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
Config: params.Config,
Stats: p.params.Stats,
EnabledCodecs: p.params.EnabledCodecs,
Logger: params.Logger,
})
if err != nil {
return nil, err
@@ -124,6 +126,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
Stats: p.params.Stats,
Logger: params.Logger,
})
if err != nil {
return nil, err
@@ -272,7 +275,7 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) {
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) {
logger.Debugw("answering pub offer", "state", p.State().String(),
p.params.Logger.Debugw("answering pub offer", "state", p.State().String(),
"participant", p.Identity(), "pID", p.ID(),
//"sdp", sdp.SDP,
)
@@ -297,7 +300,7 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
return
}
logger.Debugw("sending answer to client",
p.params.Logger.Debugw("sending answer to client",
"participant", p.Identity(), "pID", p.ID(),
//"answer sdp", answer.SDP,
)
@@ -335,7 +338,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
}
if !p.CanPublish() {
logger.Warnw("no permission to publish track", nil,
p.params.Logger.Warnw("no permission to publish track", nil,
"participant", p.Identity(), "pID", p.ID())
return
}
@@ -368,7 +371,7 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
if sdp.Type != webrtc.SDPTypeAnswer {
return ErrUnexpectedOffer
}
logger.Debugw("setting subPC answer",
p.params.Logger.Debugw("setting subPC answer",
"participant", p.Identity(), "pID", p.ID(),
//"sdp", sdp.SDP,
)
@@ -473,7 +476,7 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) {
return 0, nil
}
logger.Debugw("subscribing new participant to tracks",
p.params.Logger.Debugw("subscribing new participant to tracks",
"participants", []string{p.Identity(), op.Identity()},
"pIDs", []string{p.ID(), op.ID()},
"numTracks", len(tracks))
@@ -611,7 +614,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool, fromAdmin bo
if track == nil {
if !isPending {
logger.Warnw("could not locate track", nil, "track", trackId)
p.params.Logger.Warnw("could not locate track", nil, "track", trackId)
}
return
}
@@ -631,7 +634,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool, fromAdmin bo
}
if currentMuted != track.IsMuted() && p.onTrackUpdated != nil {
logger.Debugw("mute status changed",
p.params.Logger.Debugw("mute status changed",
"participant", p.Identity(),
"pID", p.ID(),
"track", trackId,
@@ -715,7 +718,7 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack {
// AddSubscribedTrack adds a track to the participant's subscribed list
func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.SubscribedTrack) {
logger.Debugw("added subscribedTrack", "pIDs", []string{pubId, p.ID()},
p.params.Logger.Debugw("added subscribedTrack", "pIDs", []string{pubId, p.ID()},
"participant", p.Identity(), "track", subTrack.ID())
p.lock.Lock()
p.subscribedTracks[subTrack.ID()] = subTrack
@@ -724,7 +727,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.Subscr
// RemoveSubscribedTrack removes a track to the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(pubId string, subTrack types.SubscribedTrack) {
logger.Debugw("removed subscribedTrack", "pIDs", []string{pubId, p.ID()},
p.params.Logger.Debugw("removed subscribedTrack", "pIDs", []string{pubId, p.ID()},
"participant", p.Identity(), "track", subTrack.ID())
p.lock.Lock()
delete(p.subscribedTracks, subTrack.ID())
@@ -735,7 +738,7 @@ func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target liveki
ci := c.ToJSON()
// write candidate
logger.Debugw("sending ice candidates",
p.params.Logger.Debugw("sending ice candidates",
"participant", p.Identity(),
"pID", p.ID(),
"candidate", c.String())
@@ -754,7 +757,7 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
return
}
p.state.Store(state)
logger.Debugw("updating participant state", "state", state.String(), "participant", p.Identity(), "pID", p.ID())
p.params.Logger.Debugw("updating participant state", "state", state.String(), "participant", p.Identity(), "pID", p.ID())
p.lock.RLock()
onStateChange := p.onStateChange
p.lock.RUnlock()
@@ -776,7 +779,7 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
}
err := sink.WriteMessage(msg)
if err != nil {
logger.Warnw("could not send message to participant", err,
p.params.Logger.Warnw("could not send message to participant", err,
"pID", p.ID(),
"participant", p.Identity(),
"message", fmt.Sprintf("%T", msg.Message))
@@ -788,12 +791,12 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
// when the server has an offer for participant
func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
logger.Debugw("skipping server offer", "participant", p.Identity(), "pID", p.ID())
p.params.Logger.Debugw("skipping server offer", "participant", p.Identity(), "pID", p.ID())
// skip when disconnected
return
}
logger.Debugw("sending server offer to participant",
p.params.Logger.Debugw("sending server offer to participant",
"participant", p.Identity(), "pID", p.ID(),
//"sdp", offer.SDP,
)
@@ -816,7 +819,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
return
}
logger.Debugw("mediaTrack added",
p.params.Logger.Debugw("mediaTrack added",
"kind", track.Kind().String(),
"participant", p.Identity(),
"pID", p.ID(),
@@ -825,7 +828,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
"SSRC", track.SSRC())
if !p.CanPublish() {
logger.Warnw("no permission to publish mediaTrack", nil,
p.params.Logger.Warnw("no permission to publish mediaTrack", nil,
"participant", p.Identity(), "pID", p.ID())
return
}
@@ -895,7 +898,7 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) {
p.handleDataMessage(livekit.DataPacket_LOSSY, msg.Data)
})
default:
logger.Warnw("unsupported datachannel added", nil, "participant", p.Identity(), "pID", p.ID(), "label", dc.Label())
p.params.Logger.Warnw("unsupported datachannel added", nil, "participant", p.Identity(), "pID", p.ID(), "label", dc.Label())
}
}
@@ -940,7 +943,7 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp
// if still not found, we are done
if ti == nil {
logger.Errorw("track info not published prior to track", nil, "clientId", clientId)
p.params.Logger.Errorw("track info not published prior to track", nil, "clientId", clientId)
}
return signalCid, ti
}
@@ -948,7 +951,7 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp
func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data []byte) {
dp := livekit.DataPacket{}
if err := proto.Unmarshal(data, &dp); err != nil {
logger.Warnw("could not parse data packet", err)
p.params.Logger.Warnw("could not parse data packet", err)
return
}
@@ -963,7 +966,7 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [
p.onDataPacket(p, &dp)
}
default:
logger.Warnw("received unsupported data packet", nil, "payload", payload)
p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload)
}
}
@@ -994,7 +997,7 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) {
}
func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnectionState) {
// logger.Debugw("ICE connection state changed", "state", state.String(),
// p.params.Logger.Debugw("ICE connection state changed", "state", state.String(),
// "participant", p.identity, "pID", p.ID())
if state == webrtc.ICEConnectionStateConnected {
stats.PromServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
@@ -1102,7 +1105,7 @@ func (p *ParticipantImpl) rtcpSendWorker() {
if len(fwdPkts) > 0 {
if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil {
logger.Errorw("could not write RTCP to participant", err,
p.params.Logger.Errorw("could not write RTCP to participant", err,
"participant", p.Identity(), "pID", p.ID())
}
}
@@ -1206,7 +1209,7 @@ func (p *ParticipantImpl) configureReceiverDTX() {
err := transceiver.SetCodecPreferences(append(modifiedReceiverCodecs, senderCodecs...))
if err != nil {
logger.Warnw("failed to SetCodecPreferences", err)
p.params.Logger.Warnw("failed to SetCodecPreferences", err)
}
}
}

View File

@@ -27,6 +27,7 @@ const (
type Room struct {
Room *livekit.Room
Logger logger.Logger
config WebRTCConfig
lock sync.RWMutex
// map of identity -> Participant
@@ -58,6 +59,7 @@ type ParticipantOptions struct {
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig) *Room {
r := &Room{
Room: proto.Clone(room).(*livekit.Room),
Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)),
config: config,
audioConfig: audioConfig,
statsReporter: stats.NewRoomStatsReporter(room.Name),
@@ -167,7 +169,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
// it's important to set this before connection, we don't want to miss out on any publishedTracks
participant.OnTrackPublished(r.onTrackPublished)
participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) {
logger.Debugw("participant state changed", "state", p.State(), "participant", p.Identity(), "pID", p.ID(),
r.Logger.Debugw("participant state changed", "state", p.State(), "participant", p.Identity(), "pID", p.ID(),
"oldState", oldState)
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
@@ -190,7 +192,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
participant.OnTrackUpdated(r.onTrackUpdated)
participant.OnMetadataUpdate(r.onParticipantMetadataUpdate)
participant.OnDataPacket(r.onDataPacket)
logger.Infow("new participant joined",
r.Logger.Infow("new participant joined",
"pID", participant.ID(),
"participant", participant.Identity(),
"protocol", participant.ProtocolVersion(),
@@ -370,7 +372,7 @@ func (r *Room) CloseIfEmpty() {
func (r *Room) Close() {
r.closeOnce.Do(func() {
close(r.closed)
logger.Infow("closing room", "roomID", r.Room.Sid, "room", r.Room.Name)
r.Logger.Infow("closing room", "roomID", r.Room.Sid, "room", r.Room.Name)
r.statsReporter.RoomEnded()
if r.onClose != nil {
@@ -416,7 +418,7 @@ func (r *Room) SetMetadata(metadata string) {
err := p.SendRoomUpdate(r.Room)
if err != nil {
logger.Warnw("failed to send room update", err, "room", r.Room.Name, "participant", p.Identity())
r.Logger.Warnw("failed to send room update", err, "room", r.Room.Name, "participant", p.Identity())
}
}
@@ -465,12 +467,12 @@ func (r *Room) onTrackPublished(participant types.Participant, track types.Publi
continue
}
logger.Debugw("subscribing to new track",
r.Logger.Debugw("subscribing to new track",
"participants", []string{participant.Identity(), existingParticipant.Identity()},
"pIDs", []string{participant.ID(), existingParticipant.ID()},
"track", track.ID())
if err := track.AddSubscriber(existingParticipant); err != nil {
logger.Errorw("could not subscribe to remoteTrack", err,
r.Logger.Errorw("could not subscribe to remoteTrack", err,
"participants", []string{participant.Identity(), existingParticipant.Identity()},
"pIDs", []string{participant.ID(), existingParticipant.ID()},
"track", track.ID())
@@ -543,7 +545,7 @@ func (r *Room) subscribeToExistingTracks(p types.Participant) {
}
if n, err := op.AddSubscriber(p); err != nil {
// TODO: log error? or disconnect?
logger.Errorw("could not subscribe to participant", err,
r.Logger.Errorw("could not subscribe to participant", err,
"participants", []string{op.Identity(), p.Identity()},
"pIDs", []string{op.ID(), p.ID()})
} else {
@@ -551,7 +553,7 @@ func (r *Room) subscribeToExistingTracks(p types.Participant) {
}
}
if tracksAdded > 0 {
logger.Debugw("subscribed participants to existing tracks", "tracks", tracksAdded)
r.Logger.Debugw("subscribed participants to existing tracks", "tracks", tracksAdded)
}
}
@@ -566,7 +568,7 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
// send update only to hidden participant
err := p.SendParticipantUpdate(updates, updatedAt)
if err != nil {
logger.Errorw("could not send update to participant", err,
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
}
}
@@ -582,7 +584,7 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
err := op.SendParticipantUpdate(updates, updatedAt)
if err != nil {
logger.Errorw("could not send update to participant", err,
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
}
}

View File

@@ -36,6 +36,7 @@ type PCTransport struct {
onOffer func(offer webrtc.SessionDescription)
restartAfterGathering bool
negotiationState int
logger logger.Logger
}
type TransportParams struct {
@@ -43,6 +44,7 @@ type TransportParams struct {
Config *WebRTCConfig
Stats *stats.RoomStatsReporter
EnabledCodecs []*livekit.Codec
Logger logger.Logger
}
func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
@@ -92,6 +94,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
me: me,
debouncedNegotiate: debounce.New(negotiationFrequency),
negotiationState: negotiationStateNone,
logger: params.Logger,
}
t.pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
if state == webrtc.ICEGathererStateComplete {
@@ -99,9 +102,9 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
t.lock.Lock()
defer t.lock.Unlock()
if t.restartAfterGathering {
logger.Debugw("restarting ICE after ICE gathering")
params.Logger.Debugw("restarting ICE after ICE gathering")
if err := t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil {
logger.Warnw("could not restart ICE", err)
params.Logger.Warnw("could not restart ICE", err)
}
}
}()
@@ -151,9 +154,9 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
// only initiate when we are the offerer
if lastState == negotiationRetry && sd.Type == webrtc.SDPTypeAnswer {
logger.Debugw("re-negotiate after answering")
t.logger.Debugw("re-negotiate after answering")
if err := t.createAndSendOffer(nil); err != nil {
logger.Errorw("could not negotiate", err)
t.logger.Errorw("could not negotiate", err)
}
}
return nil
@@ -167,7 +170,7 @@ func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) {
func (t *PCTransport) Negotiate() {
t.debouncedNegotiate(func() {
if err := t.CreateAndSendOffer(nil); err != nil {
logger.Errorw("could not negotiate", err)
t.logger.Errorw("could not negotiate", err)
}
})
}
@@ -192,24 +195,24 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
// if restart is requested, and we are not ready, then continue afterwards
if iceRestart {
if t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
logger.Debugw("restart ICE after gathering")
t.logger.Debugw("restart ICE after gathering")
t.restartAfterGathering = true
return nil
}
logger.Debugw("restarting ICE")
t.logger.Debugw("restarting ICE")
}
// when there's an ongoing negotiation, let it finish and not disrupt its state
if t.negotiationState == negotiationStateClient {
currentSD := t.pc.CurrentRemoteDescription()
if iceRestart && currentSD != nil {
logger.Debugw("recovering from client negotiation state")
t.logger.Debugw("recovering from client negotiation state")
if err := t.pc.SetRemoteDescription(*currentSD); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
return err
}
} else {
logger.Debugw("skipping negotiation, trying again later")
t.logger.Debugw("skipping negotiation, trying again later")
t.negotiationState = negotiationRetry
return nil
}
@@ -221,14 +224,14 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
offer, err := t.pc.CreateOffer(options)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
logger.Errorw("could not create offer", err)
t.logger.Errorw("could not create offer", err)
return err
}
err = t.pc.SetLocalDescription(offer)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
logger.Errorw("could not set local description", err)
t.logger.Errorw("could not set local description", err)
return err
}

View File

@@ -232,6 +232,7 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi
ThrottleConfig: r.config.RTC.PLIThrottle,
EnabledCodecs: room.Room.EnabledCodecs,
Hidden: pi.Hidden,
Logger: room.Logger,
})
if err != nil {
logger.Errorw("could not create participant", err)
@@ -368,29 +369,45 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
return
}
case *livekit.SignalRequest_AddTrack:
logger.Debugw("add track request", "participant", participant.Identity(), "pID", participant.ID(),
logger.Debugw("add track request",
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
"track", msg.AddTrack.Cid)
participant.AddTrack(msg.AddTrack)
case *livekit.SignalRequest_Answer:
sd := rtc.FromProtoSessionDescription(msg.Answer)
if err := participant.HandleAnswer(sd); err != nil {
logger.Errorw("could not handle answer", err, "participant", participant.Identity(), "pID", participant.ID())
logger.Errorw("could not handle answer", err,
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
)
}
case *livekit.SignalRequest_Trickle:
candidateInit, err := rtc.FromProtoTrickle(msg.Trickle)
if err != nil {
logger.Errorw("could not decode trickle", err, "participant", participant.Identity(), "pID", participant.ID())
logger.Errorw("could not decode trickle", err,
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
)
break
}
// logger.Debugw("adding peer candidate", "participant", participant.Identity())
if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil {
logger.Errorw("could not handle trickle", err, "participant", participant.Identity(), "pID", participant.ID())
logger.Errorw("could not handle trickle", err,
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
)
}
case *livekit.SignalRequest_Mute:
participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted, false)
case *livekit.SignalRequest_Subscription:
if err := room.UpdateSubscriptions(participant, msg.Subscription.TrackSids, msg.Subscription.Subscribe); err != nil {
logger.Warnw("could not update subscription", err,
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
"tracks", msg.Subscription.TrackSids,
@@ -400,8 +417,8 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
for _, sid := range msg.TrackSetting.TrackSids {
subTrack := participant.GetSubscribedTrack(sid)
if subTrack == nil {
logger.Warnw("unable to find SubscribedTrack",
nil,
logger.Warnw("unable to find SubscribedTrack", nil,
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
"track", sid)
@@ -411,8 +428,8 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
// find the source PublishedTrack
publisher := room.GetParticipant(subTrack.PublisherIdentity())
if publisher == nil {
logger.Warnw("unable to find publisher of SubscribedTrack",
nil,
logger.Warnw("unable to find publisher of SubscribedTrack", nil,
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
"publisher", subTrack.PublisherIdentity(),
@@ -422,8 +439,7 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
pubTrack := publisher.GetPublishedTrack(sid)
if pubTrack == nil {
logger.Warnw("unable to find PublishedTrack",
nil,
logger.Warnw("unable to find PublishedTrack", nil,
"participant", publisher.Identity(),
"pID", publisher.ID(),
"track", sid)
@@ -435,6 +451,7 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
// find quality for published track
logger.Debugw("updating track settings",
"room", room.Room.Name,
"participant", participant.Identity(),
"pID", participant.ID(),
"settings", msg.TrackSetting)