Better tracking of signalling connection. (#1794)

* Better tracking of signalling connection.

- Reason for closing signaling channel.
- ConnectionID attached to request source/response sink

* Tests
This commit is contained in:
Raja Subramanian
2023-06-15 12:53:34 +05:30
committed by GitHub
parent 41bb1d8745
commit 12db469297
18 changed files with 275 additions and 49 deletions
+2
View File
@@ -23,6 +23,7 @@ type MessageSink interface {
WriteMessage(msg proto.Message) error
IsClosed() bool
Close()
ConnectionID() livekit.ConnectionID
}
//counterfeiter:generate . MessageSource
@@ -31,6 +32,7 @@ type MessageSource interface {
ReadChan() <-chan proto.Message
IsClosed() bool
Close()
ConnectionID() livekit.ConnectionID
}
type ParticipantInit struct {
+5 -5
View File
@@ -38,7 +38,7 @@ func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRout
signalClient: signalClient,
requestChannels: make(map[string]*MessageChannel),
responseChannels: make(map[string]*MessageChannel),
rtcMessageChan: NewMessageChannel(localRTCChannelSize),
rtcMessageChan: NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize),
}
}
@@ -93,7 +93,7 @@ func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, room
logger.Errorw("could not handle new participant", err,
"room", roomName,
"participant", pi.Identity,
"connectionID", connectionID,
"connID", connectionID,
)
}
return
@@ -103,7 +103,7 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro
r.lock.Lock()
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel(localRTCChannelSize)
r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize)
}
r.lock.Unlock()
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity))
@@ -121,7 +121,7 @@ func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTC
r.lock.Lock()
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel(localRTCChannelSize)
r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize)
}
r.lock.Unlock()
return r.writeRTCMessage(r.rtcMessageChan, msg)
@@ -254,7 +254,7 @@ func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChanne
return mc
}
mc = NewMessageChannel(DefaultMessageChannelSize)
mc = NewMessageChannel(livekit.ConnectionID(key), DefaultMessageChannelSize)
mc.OnClose(func() {
r.lock.Lock()
delete(target, key)
+14 -7
View File
@@ -3,24 +3,27 @@ package routing
import (
"sync"
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/proto"
)
const DefaultMessageChannelSize = 200
type MessageChannel struct {
msgChan chan proto.Message
onClose func()
isClosed bool
lock sync.RWMutex
connectionID livekit.ConnectionID
msgChan chan proto.Message
onClose func()
isClosed bool
lock sync.RWMutex
}
func NewDefaultMessageChannel() *MessageChannel {
return NewMessageChannel(DefaultMessageChannelSize)
func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel {
return NewMessageChannel(connectionID, DefaultMessageChannelSize)
}
func NewMessageChannel(size int) *MessageChannel {
func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel {
return &MessageChannel{
connectionID: connectionID,
// allow some buffer to avoid blocked writes
msgChan: make(chan proto.Message, size),
}
@@ -71,3 +74,7 @@ func (m *MessageChannel) Close() {
m.onClose()
}
}
func (m *MessageChannel) ConnectionID() livekit.ConnectionID {
return m.connectionID
}
+1 -1
View File
@@ -11,7 +11,7 @@ import (
func TestMessageChannel_WriteMessageClosed(t *testing.T) {
// ensure it doesn't panic when written to after closing
m := routing.NewMessageChannel(routing.DefaultMessageChannelSize)
m := routing.NewMessageChannel(livekit.ConnectionID("test"), routing.DefaultMessageChannelSize)
go func() {
for msg := range m.ReadChan() {
if msg == nil {
+19 -1
View File
@@ -98,16 +98,24 @@ func publishSignalMessage(rc redis.UniversalClient, nodeID livekit.NodeID, conne
type RTCNodeSink struct {
rc redis.UniversalClient
nodeID livekit.NodeID
connectionID livekit.ConnectionID
participantKey livekit.ParticipantKey
participantKeyB62 livekit.ParticipantKey
isClosed atomic.Bool
onClose func()
}
func NewRTCNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) *RTCNodeSink {
func NewRTCNodeSink(
rc redis.UniversalClient,
nodeID livekit.NodeID,
connectionID livekit.ConnectionID,
participantKey livekit.ParticipantKey,
participantKeyB62 livekit.ParticipantKey,
) *RTCNodeSink {
return &RTCNodeSink{
rc: rc,
nodeID: nodeID,
connectionID: connectionID,
participantKey: participantKey,
participantKeyB62: participantKeyB62,
}
@@ -137,6 +145,12 @@ func (s *RTCNodeSink) OnClose(f func()) {
s.onClose = f
}
func (s *RTCNodeSink) ConnectionID() livekit.ConnectionID {
return s.connectionID
}
// ----------------------------------------------------------------------
type SignalNodeSink struct {
rc redis.UniversalClient
nodeID livekit.NodeID
@@ -177,3 +191,7 @@ func (s *SignalNodeSink) IsClosed() bool {
func (s *SignalNodeSink) OnClose(f func()) {
s.onClose = f
}
func (s *SignalNodeSink) ConnectionID() livekit.ConnectionID {
return s.connectionID
}
+3 -3
View File
@@ -174,7 +174,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek
// set up response channel before sending StartSession and be ready to receive responses.
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(connectionID))
sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), pKey, pKeyB62)
sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), connectionID, pKey, pKeyB62)
// serialize claims
ss, err := pi.ToStartSession(roomName, connectionID)
@@ -199,7 +199,7 @@ func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro
return err
}
rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), pkey, pkeyB62)
rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), livekit.ConnectionID("ephemeral"), pkey, pkeyB62)
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity))
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity))
return r.writeRTCMessage(rtcSink, msg)
@@ -216,7 +216,7 @@ func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam
}
func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error {
rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNodeID), livekit.ParticipantKey(msg.ParticipantKey), livekit.ParticipantKey(msg.ParticipantKeyB62))
rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNodeID), livekit.ConnectionID("ephemeral"), livekit.ParticipantKey(msg.ParticipantKey), livekit.ParticipantKey(msg.ParticipantKeyB62))
return r.writeRTCMessage(rtcSink, msg)
}
@@ -5,6 +5,7 @@ import (
"sync"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/reflect/protoreflect"
)
@@ -13,6 +14,16 @@ type FakeMessageSink struct {
closeMutex sync.RWMutex
closeArgsForCall []struct {
}
ConnectionIDStub func() livekit.ConnectionID
connectionIDMutex sync.RWMutex
connectionIDArgsForCall []struct {
}
connectionIDReturns struct {
result1 livekit.ConnectionID
}
connectionIDReturnsOnCall map[int]struct {
result1 livekit.ConnectionID
}
IsClosedStub func() bool
isClosedMutex sync.RWMutex
isClosedArgsForCall []struct {
@@ -62,6 +73,59 @@ func (fake *FakeMessageSink) CloseCalls(stub func()) {
fake.CloseStub = stub
}
func (fake *FakeMessageSink) ConnectionID() livekit.ConnectionID {
fake.connectionIDMutex.Lock()
ret, specificReturn := fake.connectionIDReturnsOnCall[len(fake.connectionIDArgsForCall)]
fake.connectionIDArgsForCall = append(fake.connectionIDArgsForCall, struct {
}{})
stub := fake.ConnectionIDStub
fakeReturns := fake.connectionIDReturns
fake.recordInvocation("ConnectionID", []interface{}{})
fake.connectionIDMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeMessageSink) ConnectionIDCallCount() int {
fake.connectionIDMutex.RLock()
defer fake.connectionIDMutex.RUnlock()
return len(fake.connectionIDArgsForCall)
}
func (fake *FakeMessageSink) ConnectionIDCalls(stub func() livekit.ConnectionID) {
fake.connectionIDMutex.Lock()
defer fake.connectionIDMutex.Unlock()
fake.ConnectionIDStub = stub
}
func (fake *FakeMessageSink) ConnectionIDReturns(result1 livekit.ConnectionID) {
fake.connectionIDMutex.Lock()
defer fake.connectionIDMutex.Unlock()
fake.ConnectionIDStub = nil
fake.connectionIDReturns = struct {
result1 livekit.ConnectionID
}{result1}
}
func (fake *FakeMessageSink) ConnectionIDReturnsOnCall(i int, result1 livekit.ConnectionID) {
fake.connectionIDMutex.Lock()
defer fake.connectionIDMutex.Unlock()
fake.ConnectionIDStub = nil
if fake.connectionIDReturnsOnCall == nil {
fake.connectionIDReturnsOnCall = make(map[int]struct {
result1 livekit.ConnectionID
})
}
fake.connectionIDReturnsOnCall[i] = struct {
result1 livekit.ConnectionID
}{result1}
}
func (fake *FakeMessageSink) IsClosed() bool {
fake.isClosedMutex.Lock()
ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)]
@@ -181,6 +245,8 @@ func (fake *FakeMessageSink) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.connectionIDMutex.RLock()
defer fake.connectionIDMutex.RUnlock()
fake.isClosedMutex.RLock()
defer fake.isClosedMutex.RUnlock()
fake.writeMessageMutex.RLock()
@@ -5,6 +5,7 @@ import (
"sync"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/reflect/protoreflect"
)
@@ -13,6 +14,16 @@ type FakeMessageSource struct {
closeMutex sync.RWMutex
closeArgsForCall []struct {
}
ConnectionIDStub func() livekit.ConnectionID
connectionIDMutex sync.RWMutex
connectionIDArgsForCall []struct {
}
connectionIDReturns struct {
result1 livekit.ConnectionID
}
connectionIDReturnsOnCall map[int]struct {
result1 livekit.ConnectionID
}
IsClosedStub func() bool
isClosedMutex sync.RWMutex
isClosedArgsForCall []struct {
@@ -61,6 +72,59 @@ func (fake *FakeMessageSource) CloseCalls(stub func()) {
fake.CloseStub = stub
}
func (fake *FakeMessageSource) ConnectionID() livekit.ConnectionID {
fake.connectionIDMutex.Lock()
ret, specificReturn := fake.connectionIDReturnsOnCall[len(fake.connectionIDArgsForCall)]
fake.connectionIDArgsForCall = append(fake.connectionIDArgsForCall, struct {
}{})
stub := fake.ConnectionIDStub
fakeReturns := fake.connectionIDReturns
fake.recordInvocation("ConnectionID", []interface{}{})
fake.connectionIDMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeMessageSource) ConnectionIDCallCount() int {
fake.connectionIDMutex.RLock()
defer fake.connectionIDMutex.RUnlock()
return len(fake.connectionIDArgsForCall)
}
func (fake *FakeMessageSource) ConnectionIDCalls(stub func() livekit.ConnectionID) {
fake.connectionIDMutex.Lock()
defer fake.connectionIDMutex.Unlock()
fake.ConnectionIDStub = stub
}
func (fake *FakeMessageSource) ConnectionIDReturns(result1 livekit.ConnectionID) {
fake.connectionIDMutex.Lock()
defer fake.connectionIDMutex.Unlock()
fake.ConnectionIDStub = nil
fake.connectionIDReturns = struct {
result1 livekit.ConnectionID
}{result1}
}
func (fake *FakeMessageSource) ConnectionIDReturnsOnCall(i int, result1 livekit.ConnectionID) {
fake.connectionIDMutex.Lock()
defer fake.connectionIDMutex.Unlock()
fake.ConnectionIDStub = nil
if fake.connectionIDReturnsOnCall == nil {
fake.connectionIDReturnsOnCall = make(map[int]struct {
result1 livekit.ConnectionID
})
}
fake.connectionIDReturnsOnCall[i] = struct {
result1 livekit.ConnectionID
}{result1}
}
func (fake *FakeMessageSource) IsClosed() bool {
fake.isClosedMutex.Lock()
ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)]
@@ -172,6 +236,8 @@ func (fake *FakeMessageSource) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.connectionIDMutex.RLock()
defer fake.connectionIDMutex.RUnlock()
fake.isClosedMutex.RLock()
defer fake.isClosedMutex.RUnlock()
fake.readChanMutex.RLock()
+7 -1
View File
@@ -105,8 +105,9 @@ func (r *signalClient) StartParticipantSignal(
Writer: signalRequestMessageWriter{},
CloseOnFailure: true,
BlockOnClose: true,
ConnectionID: connectionID,
})
resChan := NewDefaultMessageChannel()
resChan := NewDefaultMessageChannel(connectionID)
go func() {
r.active.Inc()
@@ -230,6 +231,7 @@ type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct {
Writer SignalMessageWriter[SendType]
CloseOnFailure bool
BlockOnClose bool
ConnectionID livekit.ConnectionID
}
func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink {
@@ -348,3 +350,7 @@ func (s *signalMessageSink[SendType, RecvType]) WriteMessage(msg proto.Message)
}
return nil
}
func (s *signalMessageSink[SendType, RecvType]) ConnectionID() livekit.ConnectionID {
return s.SignalSinkParams.ConnectionID
}
+15 -5
View File
@@ -688,7 +688,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.updateState(livekit.ParticipantInfo_DISCONNECTED)
// ensure this is synchronized
p.CloseSignalConnection()
p.CloseSignalConnection(types.SignallingCloseReasonParticipantClose)
p.lock.RLock()
onClose := p.onClose
p.lock.RUnlock()
@@ -741,7 +741,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
onStart()
}
p.CloseSignalConnection()
p.CloseSignalConnection(types.SignallingCloseReasonMigration)
//
// On subscriber peer connection, remote side will try ICE on both
@@ -1349,7 +1349,7 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
func (p *ParticipantImpl) onAnyTransportFailed() {
// clients support resuming of connections when websocket becomes disconnected
p.CloseSignalConnection()
p.CloseSignalConnection(types.SignallingCloseReasonTransportFailure)
// detect when participant has actually left.
p.setupDisconnectTimer()
@@ -2068,7 +2068,17 @@ func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason
},
},
})
p.CloseSignalConnection()
scr := types.SignallingCloseReasonUnknown
switch reason {
case types.ParticipantCloseReasonPublicationError:
scr = types.SignallingCloseReasonFullReconnectPublicationError
case types.ParticipantCloseReasonSubscriptionError:
scr = types.SignallingCloseReasonFullReconnectSubscriptionError
case types.ParticipantCloseReasonNegotiateFailed:
scr = types.SignallingCloseReasonFullReconnectNegotiateFailed
}
p.CloseSignalConnection(scr)
// on a full reconnect, no need to supervise this participant anymore
p.supervisor.Stop()
@@ -2101,7 +2111,7 @@ func (p *ParticipantImpl) onSubscriptionError(trackID livekit.TrackID, fatal boo
if p.params.ReconnectOnSubscriptionError && fatal {
p.params.Logger.Infow("issuing full reconnect on subscription error", "trackID", trackID)
p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError)
p.IssueFullReconnect(types.ParticipantCloseReasonSubscriptionError)
}
}
+1 -1
View File
@@ -229,7 +229,7 @@ func TestOutOfOrderUpdates(t *testing.T) {
func TestDisconnectTiming(t *testing.T) {
t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) {
p := newParticipantForTest("test")
msg := routing.NewMessageChannel(routing.DefaultMessageChannelSize)
msg := routing.NewMessageChannel(livekit.ConnectionID("test"), routing.DefaultMessageChannelSize)
p.params.Sink = msg
go func() {
for msg := range msg.ReadChan() {
+3 -2
View File
@@ -11,6 +11,7 @@ import (
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
func (p *ParticipantImpl) getResponseSink() routing.MessageSink {
@@ -280,10 +281,10 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
}
// closes signal connection to notify client to resume/reconnect
func (p *ParticipantImpl) CloseSignalConnection() {
func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) {
sink := p.getResponseSink()
if sink != nil {
p.params.Logger.Infow("closing signal connection")
p.params.Logger.Infow("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
sink.Close()
p.SetResponseSink(nil)
}
+1 -1
View File
@@ -400,7 +400,7 @@ func (r *Room) GetParticipantRequestSource(identity livekit.ParticipantIdentity)
func (r *Room) ResumeParticipant(p types.LocalParticipant, requestSource routing.MessageSource, responseSink routing.MessageSink, iceServers []*livekit.ICEServer, reason livekit.ReconnectReason) error {
r.ReplaceParticipantRequestSource(p.Identity(), requestSource)
// close previous sink, and link to new one
p.CloseSignalConnection()
p.CloseSignalConnection(types.SignallingCloseReasonResume)
p.SetResponseSink(responseSink)
p.SetSignalSourceValid(true)
+43 -2
View File
@@ -86,6 +86,7 @@ const (
ParticipantCloseReasonMigrationRequested
ParticipantCloseReasonOvercommitted
ParticipantCloseReasonPublicationError
ParticipantCloseReasonSubscriptionError
)
func (p ParticipantCloseReason) String() string {
@@ -130,6 +131,8 @@ func (p ParticipantCloseReason) String() string {
return "OVERCOMMITTED"
case ParticipantCloseReasonPublicationError:
return "PUBLICATION_ERROR"
case ParticipantCloseReasonSubscriptionError:
return "SUBSCRIPTION_ERROR"
default:
return fmt.Sprintf("%d", int(p))
}
@@ -160,7 +163,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonOvercommitted:
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError:
case ParticipantCloseReasonNegotiateFailed, ParticipantCloseReasonPublicationError, ParticipantCloseReasonSubscriptionError:
return livekit.DisconnectReason_STATE_MISMATCH
default:
// the other types will map to unknown reason
@@ -170,6 +173,44 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
// ---------------------------------------------
type SignallingCloseReason int
const (
SignallingCloseReasonUnknown SignallingCloseReason = iota
SignallingCloseReasonMigration
SignallingCloseReasonResume
SignallingCloseReasonTransportFailure
SignallingCloseReasonFullReconnectPublicationError
SignallingCloseReasonFullReconnectSubscriptionError
SignallingCloseReasonFullReconnectNegotiateFailed
SignallingCloseReasonParticipantClose
)
func (s SignallingCloseReason) String() string {
switch s {
case SignallingCloseReasonUnknown:
return "UNKNOWN"
case SignallingCloseReasonMigration:
return "MIGRATION"
case SignallingCloseReasonResume:
return "RESUME"
case SignallingCloseReasonTransportFailure:
return "TRANSPORT_FAILURE"
case SignallingCloseReasonFullReconnectPublicationError:
return "FULL_RECONNECT_PUBLICATION_ERROR"
case SignallingCloseReasonFullReconnectSubscriptionError:
return "FULL_RECONNECT_SUBSCRIPTION_ERROR"
case SignallingCloseReasonFullReconnectNegotiateFailed:
return "FULL_RECONNECT_NEGOTIATE_FAILED"
case SignallingCloseReasonParticipantClose:
return "PARTICIPANT_CLOSE"
default:
return fmt.Sprintf("%d", int(s))
}
}
// ---------------------------------------------
//counterfeiter:generate . Participant
type Participant interface {
ID() livekit.ParticipantID
@@ -249,7 +290,7 @@ type LocalParticipant interface {
GetBufferFactory() *buffer.Factory
SetResponseSink(sink routing.MessageSink)
CloseSignalConnection()
CloseSignalConnection(reason SignallingCloseReason)
UpdateLastSeenSignal()
SetSignalSourceValid(valid bool)
@@ -131,9 +131,10 @@ type FakeLocalParticipant struct {
closeReturnsOnCall map[int]struct {
result1 error
}
CloseSignalConnectionStub func()
CloseSignalConnectionStub func(types.SignallingCloseReason)
closeSignalConnectionMutex sync.RWMutex
closeSignalConnectionArgsForCall []struct {
arg1 types.SignallingCloseReason
}
ConnectedAtStub func() time.Time
connectedAtMutex sync.RWMutex
@@ -1429,15 +1430,16 @@ func (fake *FakeLocalParticipant) CloseReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeLocalParticipant) CloseSignalConnection() {
func (fake *FakeLocalParticipant) CloseSignalConnection(arg1 types.SignallingCloseReason) {
fake.closeSignalConnectionMutex.Lock()
fake.closeSignalConnectionArgsForCall = append(fake.closeSignalConnectionArgsForCall, struct {
}{})
arg1 types.SignallingCloseReason
}{arg1})
stub := fake.CloseSignalConnectionStub
fake.recordInvocation("CloseSignalConnection", []interface{}{})
fake.recordInvocation("CloseSignalConnection", []interface{}{arg1})
fake.closeSignalConnectionMutex.Unlock()
if stub != nil {
fake.CloseSignalConnectionStub()
fake.CloseSignalConnectionStub(arg1)
}
}
@@ -1447,12 +1449,19 @@ func (fake *FakeLocalParticipant) CloseSignalConnectionCallCount() int {
return len(fake.closeSignalConnectionArgsForCall)
}
func (fake *FakeLocalParticipant) CloseSignalConnectionCalls(stub func()) {
func (fake *FakeLocalParticipant) CloseSignalConnectionCalls(stub func(types.SignallingCloseReason)) {
fake.closeSignalConnectionMutex.Lock()
defer fake.closeSignalConnectionMutex.Unlock()
fake.CloseSignalConnectionStub = stub
}
func (fake *FakeLocalParticipant) CloseSignalConnectionArgsForCall(i int) types.SignallingCloseReason {
fake.closeSignalConnectionMutex.RLock()
defer fake.closeSignalConnectionMutex.RUnlock()
argsForCall := fake.closeSignalConnectionArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) ConnectedAt() time.Time {
fake.connectedAtMutex.Lock()
ret, specificReturn := fake.connectedAtReturnsOnCall[len(fake.connectedAtArgsForCall)]
+2 -2
View File
@@ -485,7 +485,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
false,
)
defer func() {
pLogger.Debugw("RTC session finishing")
pLogger.Debugw("RTC session finishing", "connID", requestSource.ConnectionID())
requestSource.Close()
}()
@@ -511,7 +511,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
case <-tokenTicker.C:
// refresh token with the first API Key/secret pair
if err := r.refreshToken(participant); err != nil {
pLogger.Errorw("could not refresh token", err)
pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID())
}
case obj := <-requestSource.ReadChan():
// In single node mode, the request source is directly tied to the signal message channel
+5 -6
View File
@@ -285,6 +285,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
case msg := <-cr.ResponseSource.ReadChan():
if msg == nil {
pLogger.Infow("nothing to read from response source", "connID", cr.ConnectionID)
return
}
res, ok := msg.(*livekit.SignalResponse)
@@ -323,16 +324,15 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// handle incoming requests from websocket
for {
req, count, err := sigConn.ReadRequest()
// normal closure
if err != nil {
// normal/expected closure
if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
pLogger.Debugw("exit ws read loop for closed connection", "connID", cr.ConnectionID)
return
pLogger.Infow("exit ws read loop for closed connection", "connID", cr.ConnectionID, "wsError", err)
} else {
pLogger.Errorw("error reading from websocket", err)
return
}
return
}
if signalStats != nil {
signalStats.AddBytes(uint64(count), false)
@@ -374,8 +374,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if err := cr.RequestSink.WriteMessage(req); err != nil {
pLogger.Warnw("error writing to request sink", err,
"connID", cr.ConnectionID)
pLogger.Warnw("error writing to request sink", err, "connID", cr.ConnectionID)
}
}
}
+7 -6
View File
@@ -136,17 +136,18 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
l := logger.GetLogger().WithValues(
"room", ss.RoomName,
"participant", ss.Identity,
"connectionID", ss.ConnectionId,
"connID", ss.ConnectionId,
)
reqChan := routing.NewDefaultMessageChannel()
reqChan := routing.NewDefaultMessageChannel(livekit.ConnectionID(ss.ConnectionId))
defer reqChan.Close()
sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{
Logger: l,
Stream: stream,
Config: r.config,
Writer: signalResponseMessageWriter{},
Logger: l,
Stream: stream,
Config: r.config,
Writer: signalResponseMessageWriter{},
ConnectionID: livekit.ConnectionID(ss.ConnectionId),
})
err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink)