mirror of
https://github.com/livekit/livekit.git
synced 2026-04-26 13:07:39 +00:00
fixed deadlock when client disconnecting
This commit is contained in:
@@ -106,10 +106,13 @@ func (r *Room) RemoveParticipant(id string) {
|
||||
defer r.lock.Unlock()
|
||||
|
||||
if p, ok := r.participants[id]; ok {
|
||||
// also stop connection if needed
|
||||
p.Close()
|
||||
// update clients
|
||||
r.broadcastParticipantState(p)
|
||||
// avoid blocking lock
|
||||
go func() {
|
||||
// also stop connection if needed
|
||||
p.Close()
|
||||
// update clients
|
||||
r.broadcastParticipantState(p)
|
||||
}()
|
||||
}
|
||||
|
||||
delete(r.participants, id)
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/pion/webrtc/v3/pkg/rtcerr"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/logger"
|
||||
"github.com/livekit/livekit-server/pkg/utils"
|
||||
@@ -72,7 +73,7 @@ func (t *Track) AddSubscriber(participant *Participant) error {
|
||||
}
|
||||
|
||||
// pack ID to identify all tracks
|
||||
packedId := PackTrackId(t.participantId, t.mediaTrack.ID())
|
||||
packedId := PackTrackId(t.participantId, t.id)
|
||||
|
||||
// use existing SSRC with simple forwarders. adaptive forwarders require unique SSRC per layer
|
||||
outTrack, err := participant.peerConn.NewTrack(codecs[0].PayloadType, t.mediaTrack.SSRC(), packedId, t.mediaTrack.Label())
|
||||
@@ -95,9 +96,11 @@ func (t *Track) AddSubscriber(participant *Participant) error {
|
||||
return
|
||||
}
|
||||
if err := participant.peerConn.RemoveTrack(rtpSender); err != nil {
|
||||
logger.GetLogger().Warnw("could not remove mediaTrack from forwarder",
|
||||
"participant", participant.ID(),
|
||||
"err", err)
|
||||
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
|
||||
logger.GetLogger().Warnw("could not remove mediaTrack from forwarder",
|
||||
"participant", participant.ID(),
|
||||
"err", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package rtc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@@ -16,12 +18,14 @@ type SignalConnection interface {
|
||||
|
||||
type WSSignalConnection struct {
|
||||
conn *websocket.Conn
|
||||
mu sync.Mutex
|
||||
useJSON bool
|
||||
}
|
||||
|
||||
func NewWSSignalConnection(conn *websocket.Conn) *WSSignalConnection {
|
||||
return &WSSignalConnection{
|
||||
conn: conn,
|
||||
mu: sync.Mutex{},
|
||||
useJSON: true,
|
||||
}
|
||||
}
|
||||
@@ -71,5 +75,7 @@ func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error {
|
||||
return err
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.conn.WriteMessage(msgType, payload)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -95,11 +96,16 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
//ctx := context.Background()
|
||||
for {
|
||||
req, err := signalConn.ReadRequest()
|
||||
if err == io.EOF {
|
||||
// client disconnected from websocket
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.GetLogger().Errorw("error reading WS",
|
||||
"err", err,
|
||||
"participantName", pName,
|
||||
"roomId", roomId)
|
||||
// most of these are disconnection, just return vs clogging up logs
|
||||
//logger.GetLogger().Errorw("error reading WS",
|
||||
// "err", err,
|
||||
// "participantName", pName,
|
||||
// "roomId", roomId)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user