diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index bb6aa63f2..6796c9a99 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -106,10 +106,13 @@ func (r *Room) RemoveParticipant(id string) { defer r.lock.Unlock() if p, ok := r.participants[id]; ok { - // also stop connection if needed - p.Close() - // update clients - r.broadcastParticipantState(p) + // avoid blocking lock + go func() { + // also stop connection if needed + p.Close() + // update clients + r.broadcastParticipantState(p) + }() } delete(r.participants, id) diff --git a/pkg/rtc/track.go b/pkg/rtc/track.go index 8c67f6a01..ef5b0f0d6 100644 --- a/pkg/rtc/track.go +++ b/pkg/rtc/track.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/rtcerr" "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/utils" @@ -72,7 +73,7 @@ func (t *Track) AddSubscriber(participant *Participant) error { } // pack ID to identify all tracks - packedId := PackTrackId(t.participantId, t.mediaTrack.ID()) + packedId := PackTrackId(t.participantId, t.id) // use existing SSRC with simple forwarders. adaptive forwarders require unique SSRC per layer outTrack, err := participant.peerConn.NewTrack(codecs[0].PayloadType, t.mediaTrack.SSRC(), packedId, t.mediaTrack.Label()) @@ -95,9 +96,11 @@ func (t *Track) AddSubscriber(participant *Participant) error { return } if err := participant.peerConn.RemoveTrack(rtpSender); err != nil { - logger.GetLogger().Warnw("could not remove mediaTrack from forwarder", - "participant", participant.ID(), - "err", err) + if _, ok := err.(*rtcerr.InvalidStateError); !ok { + logger.GetLogger().Warnw("could not remove mediaTrack from forwarder", + "participant", participant.ID(), + "err", err) + } } }) diff --git a/pkg/rtc/wsprotocol.go b/pkg/rtc/wsprotocol.go index d6ff20251..d01843a28 100644 --- a/pkg/rtc/wsprotocol.go +++ b/pkg/rtc/wsprotocol.go @@ -1,6 +1,8 @@ package rtc import ( + "sync" + "github.com/gorilla/websocket" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -16,12 +18,14 @@ type SignalConnection interface { type WSSignalConnection struct { conn *websocket.Conn + mu sync.Mutex useJSON bool } func NewWSSignalConnection(conn *websocket.Conn) *WSSignalConnection { return &WSSignalConnection{ conn: conn, + mu: sync.Mutex{}, useJSON: true, } } @@ -71,5 +75,7 @@ func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error { return err } + c.mu.Lock() + defer c.mu.Unlock() return c.conn.WriteMessage(msgType, payload) } diff --git a/pkg/service/rtc.go b/pkg/service/rtc.go index 3569c5926..c2857ca30 100644 --- a/pkg/service/rtc.go +++ b/pkg/service/rtc.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "io" "net/http" "github.com/gorilla/websocket" @@ -95,11 +96,16 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { //ctx := context.Background() for { req, err := signalConn.ReadRequest() + if err == io.EOF { + // client disconnected from websocket + return + } if err != nil { - logger.GetLogger().Errorw("error reading WS", - "err", err, - "participantName", pName, - "roomId", roomId) + // most of these are disconnection, just return vs clogging up logs + //logger.GetLogger().Errorw("error reading WS", + // "err", err, + // "participantName", pName, + // "roomId", roomId) return }