From f365481dde7c445344c70094dffd43e2bb64e986 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 23 Aug 2022 18:22:44 +0800 Subject: [PATCH] unify resume/restart in single/multi node mode (#946) --- pkg/routing/localrouter.go | 6 +++++- pkg/routing/redisrouter.go | 18 ++++++++---------- pkg/service/roommanager.go | 8 ++++++-- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 1b6640583..b9efc81a0 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -10,6 +10,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) // aggregated channel for all participants @@ -87,8 +88,11 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livek return } + // create a new connection id + connectionID = livekit.ConnectionID(utils.NewGuid("CO_")) // index channels by roomName | identity key := participantKey(roomName, pi.Identity) + key = key + "|" + livekit.ParticipantKey(connectionID) // close older channels if one already exists reqChan := r.getMessageChannel(r.requestChannels, string(key)) @@ -121,7 +125,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livek ) } }() - return livekit.ConnectionID(pi.Identity), reqChan, resChan, nil + return connectionID, reqChan, resChan, nil } func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 424cb1d2e..86190a4e1 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -230,16 +230,14 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK return ErrHandlerNotDefined } - if !ss.Reconnect { - // when it's not reconnecting, we do not want to re-use the same response sink - // the previous rtc worker thread is still consuming off of it. - // we'll want to sever the connection and switch to the new one - r.lock.RLock() - requestChan, ok := r.requestChannels[string(participantKey)] - r.lock.RUnlock() - if ok { - requestChan.Close() - } + // we do not want to re-use the same response sink + // the previous rtc worker thread is still consuming off of it. + // we'll want to sever the connection and switch to the new one + r.lock.RLock() + requestChan, ok := r.requestChannels[string(participantKey)] + r.lock.RUnlock() + if ok { + requestChan.Close() } pi, err := ParticipantInitFromStartSession(ss, r.currentNode.Region) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 9ffd2069d..dd915eb83 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -226,7 +226,12 @@ func (r *RoomManager) StartSession( "nodeID", r.currentNode.Id, "participant", pi.Identity, ) - return room.ResumeParticipant(participant, responseSink) + if err = room.ResumeParticipant(participant, responseSink); err != nil { + logger.Warnw("could not resume participant", err, "participant", pi.Identity) + return err + } + go r.rtcSessionWorker(room, participant, requestSource) + return nil } else { participant.GetLogger().Infow("removing duplicate participant") // we need to clean up the existing participant, so a new one can join @@ -426,7 +431,6 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa "room", room.Name(), "roomID", room.ID(), ) - _ = participant.Close(true, types.ParticipantCloseReasonRTCSessionFinish) requestSource.Close() }() defer rtc.Recover()