unify resume/restart in single/multi node mode (#946)

This commit is contained in:
cnderrauber
2022-08-23 18:22:44 +08:00
committed by GitHub
parent 0b42a8f2c6
commit f365481dde
3 changed files with 19 additions and 13 deletions
+5 -1
View File
@@ -10,6 +10,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
// aggregated channel for all participants
@@ -87,8 +88,11 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livek
return
}
// create a new connection id
connectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
// index channels by roomName | identity
key := participantKey(roomName, pi.Identity)
key = key + "|" + livekit.ParticipantKey(connectionID)
// close older channels if one already exists
reqChan := r.getMessageChannel(r.requestChannels, string(key))
@@ -121,7 +125,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livek
)
}
}()
return livekit.ConnectionID(pi.Identity), reqChan, resChan, nil
return connectionID, reqChan, resChan, nil
}
func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
+8 -10
View File
@@ -230,16 +230,14 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
return ErrHandlerNotDefined
}
if !ss.Reconnect {
// when it's not reconnecting, we do not want to re-use the same response sink
// the previous rtc worker thread is still consuming off of it.
// we'll want to sever the connection and switch to the new one
r.lock.RLock()
requestChan, ok := r.requestChannels[string(participantKey)]
r.lock.RUnlock()
if ok {
requestChan.Close()
}
// we do not want to re-use the same response sink
// the previous rtc worker thread is still consuming off of it.
// we'll want to sever the connection and switch to the new one
r.lock.RLock()
requestChan, ok := r.requestChannels[string(participantKey)]
r.lock.RUnlock()
if ok {
requestChan.Close()
}
pi, err := ParticipantInitFromStartSession(ss, r.currentNode.Region)
+6 -2
View File
@@ -226,7 +226,12 @@ func (r *RoomManager) StartSession(
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
)
return room.ResumeParticipant(participant, responseSink)
if err = room.ResumeParticipant(participant, responseSink); err != nil {
logger.Warnw("could not resume participant", err, "participant", pi.Identity)
return err
}
go r.rtcSessionWorker(room, participant, requestSource)
return nil
} else {
participant.GetLogger().Infow("removing duplicate participant")
// we need to clean up the existing participant, so a new one can join
@@ -426,7 +431,6 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
"room", room.Name(),
"roomID", room.ID(),
)
_ = participant.Close(true, types.ParticipantCloseReasonRTCSessionFinish)
requestSource.Close()
}()
defer rtc.Recover()