From 281d2b674b12dde2c9a8a34aaf7efc0397f2b759 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 20 Sep 2023 14:13:48 +0530 Subject: [PATCH 1/2] Changing some log levels (#2094) Logging expected WS close at Infow to understand reasons for closure. Moving "read from ws" to Debugw as it happens when signalling closes. Also filter out a data channel abort chunk log as it shows a bunch of errors, but those are expected though. --- pkg/rtc/participant.go | 14 ++++++-------- pkg/rtc/room.go | 1 - pkg/rtc/transport.go | 2 +- pkg/service/rtcservice.go | 16 ++++++++-------- 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 07ae3036b..30dfea7d0 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -561,10 +561,9 @@ func (p *ParticipantImpl) HandleSignalSourceClose() { p.TransportManager.SetSignalSourceValid(false) if !p.TransportManager.HasPublisherEverConnected() && !p.TransportManager.HasSubscriberEverConnected() { - p.params.Logger.Infow("closing disconnected participant", - "reason", types.ParticipantCloseReasonJoinFailed, - ) - _ = p.Close(false, types.ParticipantCloseReasonJoinFailed, false) + reason := types.ParticipantCloseReasonJoinFailed + p.params.Logger.Infow("closing disconnected participant", "reason", reason) + _ = p.Close(false, reason, false) } } @@ -1404,10 +1403,9 @@ func (p *ParticipantImpl) setupDisconnectTimer() { if p.IsClosed() || p.IsDisconnected() { return } - p.params.Logger.Infow("closing disconnected participant", - "reason", types.ParticipantCloseReasonPeerConnectionDisconnected, - ) - _ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected, false) + reason := types.ParticipantCloseReasonPeerConnectionDisconnected + p.params.Logger.Infow("closing disconnected participant", "reason", reason) + _ = p.Close(true, reason, false) }) p.lock.Unlock() } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index cbc1cf54c..7d28fd0be 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -426,7 +426,6 @@ func (r *Room) ReplaceParticipantRequestSource(identity livekit.ParticipantIdent rs.Close() } r.participantRequestSources[identity] = reqSource - r.lock.Unlock() } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 33ba7ade2..517c90c1f 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -832,7 +832,7 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni } dcErrorHandler := func(err error) { - if !errors.Is(err, sctp.ErrResetPacketInStateNotExist) { + if !errors.Is(err, sctp.ErrResetPacketInStateNotExist) && !errors.Is(err, sctp.ErrChunk) { t.params.Logger.Errorw(dc.Label()+" data channel error", err) } } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ab49ba24a..dc203fec9 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -257,7 +257,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) done := make(chan struct{}) - // function exits when websocket terminates, it'll close the event reading off of response sink as well + // function exits when websocket terminates, it'll close the event reading off of request sink and response source as well defer func() { pLogger.Infow("finishing WS connection", "connID", cr.ConnectionID) cr.ResponseSource.Close() @@ -288,13 +288,13 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // websocket established sigConn := NewWSSignalConnection(conn) - if count, err := sigConn.WriteResponse(initialResponse); err != nil { + count, err := sigConn.WriteResponse(initialResponse) + if err != nil { pLogger.Warnw("could not write initial response", err) return - } else { - if signalStats != nil { - signalStats.AddBytes(uint64(count), true) - } + } + if signalStats != nil { + signalStats.AddBytes(uint64(count), true) } pLogger.Infow("new client WS connected", "connID", cr.ConnectionID, @@ -321,7 +321,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return case msg := <-cr.ResponseSource.ReadChan(): if msg == nil { - pLogger.Infow("nothing to read from response source", "connID", cr.ConnectionID) + pLogger.Debugw("nothing to read from response source", "connID", cr.ConnectionID) return } res, ok := msg.(*livekit.SignalResponse) @@ -372,7 +372,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { websocket.CloseNormalClosure, websocket.CloseNoStatusReceived, ) { - pLogger.Debugw("exit ws read loop for closed connection", "connID", cr.ConnectionID, "wsError", err) + pLogger.Infow("exit ws read loop for closed connection", "connID", cr.ConnectionID, "wsError", err) } else { pLogger.Errorw("error reading from websocket", err, "connID", cr.ConnectionID) } From 1200a960a283817c704c30afba674a64f7951017 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 20 Sep 2023 17:09:24 +0800 Subject: [PATCH 2/2] Use generic type cast for IDs (#2095) --- go.mod | 2 +- go.sum | 2 ++ pkg/rtc/room.go | 4 ++-- pkg/rtc/signalhandler.go | 4 ++-- pkg/service/redisstore.go | 2 +- pkg/service/roommanager.go | 2 +- pkg/service/roomservice.go | 2 +- 7 files changed, 10 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 8e426f067..bb0795cb2 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5 - github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84 + github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 2c28c4de1..757166a60 100644 --- a/go.sum +++ b/go.sum @@ -129,6 +129,8 @@ github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5 h1:CjXY github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84 h1:4WOaspDesbbrjGPsu6Vp1VjcWxTXVjyjdtQAzIUXn5s= github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2 h1:yIRqvyO3qDPO+4EdcHMjsINJYl6KE9AXJUgfChsw+0s= +github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 7d28fd0be..544713158 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -549,7 +549,7 @@ func (r *Room) UpdateSubscriptions( } for _, pt := range participantTracks { - for _, trackID := range livekit.StringsAsTrackIDs(pt.TrackSids) { + for _, trackID := range livekit.StringsAsIDs[livekit.TrackID](pt.TrackSids) { if subscribe { participant.SubscribeToTrack(trackID) } else { @@ -600,7 +600,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync r.UpdateSubscriptions( participant, - livekit.StringsAsTrackIDs(state.Subscription.TrackSids), + livekit.StringsAsIDs[livekit.TrackID](state.Subscription.TrackSids), state.Subscription.ParticipantTracks, state.Subscription.Subscribe, ) diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 3c90209b6..6f8ba407e 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -46,12 +46,12 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant // permission check happens later in SubscriptionManager room.UpdateSubscriptions( participant, - livekit.StringsAsTrackIDs(msg.Subscription.TrackSids), + livekit.StringsAsIDs[livekit.TrackID](msg.Subscription.TrackSids), msg.Subscription.ParticipantTracks, msg.Subscription.Subscribe, ) case *livekit.SignalRequest_TrackSetting: - for _, sid := range livekit.StringsAsTrackIDs(msg.TrackSetting.TrackSids) { + for _, sid := range livekit.StringsAsIDs[livekit.TrackID](msg.TrackSetting.TrackSids) { participant.UpdateSubscribedTrackSettings(sid, msg.TrackSetting) } case *livekit.SignalRequest_Leave: diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index df119a5e5..71fe77a9d 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -194,7 +194,7 @@ func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) return nil, errors.Wrap(err, "could not get rooms") } } else { - names := livekit.RoomNamesAsStrings(roomNames) + names := livekit.IDsAsStrings(roomNames) var results []interface{} results, err = s.rc.HMGet(s.ctx, RoomsKey, names...).Result() if err != nil && err != redis.Nil { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 2d7bdda81..b436c7bca 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -674,7 +674,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo pLogger.Debugw("updating participant subscriptions") room.UpdateSubscriptions( participant, - livekit.StringsAsTrackIDs(rm.UpdateSubscriptions.TrackSids), + livekit.StringsAsIDs[livekit.TrackID](rm.UpdateSubscriptions.TrackSids), rm.UpdateSubscriptions.ParticipantTracks, rm.UpdateSubscriptions.Subscribe, ) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 3e8997587..c81274093 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -122,7 +122,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque var names []livekit.RoomName if len(req.Names) > 0 { - names = livekit.StringsAsRoomNames(req.Names) + names = livekit.StringsAsIDs[livekit.RoomName](req.Names) } rooms, err := s.roomStore.ListRooms(ctx, names) if err != nil {