deleting nonRtc rooms and fixing race when deleting rooms (#721)

This commit is contained in:
Dan McFaul
2022-05-27 09:45:22 -06:00
committed by David Zhao
parent 87dea25359
commit cbbd553f8e
5 changed files with 61 additions and 5 deletions
+10 -2
View File
@@ -139,10 +139,12 @@ func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam
}
func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error {
r.lock.Lock()
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel(localRTCChannelSize)
}
r.lock.Unlock()
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
@@ -206,13 +208,19 @@ func (r *LocalRouter) rtcMessageWorker() {
go r.rtcMessageWorker()
}()
if r.rtcMessageChan.IsClosed() {
r.lock.RLock()
isClosed := r.rtcMessageChan.IsClosed()
r.lock.RUnlock()
if isClosed {
// sleep and retry
time.Sleep(time.Second)
}
r.lock.RLock()
msgChan := r.rtcMessageChan.ReadChan()
r.lock.RUnlock()
// consume messages from
for msg := range r.rtcMessageChan.ReadChan() {
for msg := range msgChan {
if rtcMsg, ok := msg.(*livekit.RTCNodeMessage); ok {
room, identity, err := parseParticipantKey(livekit.ParticipantKey(rtcMsg.ParticipantKey))
if err != nil {
+13 -3
View File
@@ -435,14 +435,24 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
}
// handles RTC messages resulted from Room API calls
func (r *RoomManager) handleRTCMessage(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) {
func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) {
r.lock.RLock()
room := r.rooms[roomName]
r.lock.RUnlock()
if room == nil {
logger.Warnw("Could not find room", nil, "room", roomName)
return
if _, ok := msg.Message.(*livekit.RTCNodeMessage_DeleteRoom); ok {
// special case of a non-RTC room e.g. room created but no participants joined
logger.Debugw("Deleting non-rtc room, loading from roomstore")
err := r.roomStore.DeleteRoom(ctx, roomName)
if err != nil {
logger.Debugw("Error deleting non-rtc room", "err", err)
}
return
} else {
logger.Warnw("Could not find room", nil, "room", roomName)
return
}
}
participant := room.GetParticipant(identity)
+12
View File
@@ -153,6 +153,18 @@ func TestMultiNodeJoinAfterClose(t *testing.T) {
scenarioJoinClosedRoom(t)
}
func TestMultiNodeCloseNonRTCRoom(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, _, finish := setupMultiNodeTest("closeNonRTCRoom")
defer finish()
closeNonRTCRoom(t)
}
// ensure that token accurately reflects out of band updates
func TestMultiNodeRefreshToken(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeJoinAfterClose")
+14
View File
@@ -166,6 +166,20 @@ func scenarioJoinClosedRoom(t *testing.T) {
stopClients(c2)
}
// close a room that has been created, but no participant has joined
func closeNonRTCRoom(t *testing.T) {
createCtx := contextWithToken(createRoomToken())
_, err := roomClient.CreateRoom(createCtx, &livekit.CreateRoomRequest{
Name: testRoom,
})
require.NoError(t, err)
_, err = roomClient.DeleteRoom(createCtx, &livekit.DeleteRoomRequest{
Room: testRoom,
})
require.NoError(t, err)
}
func publishTracksForClients(t *testing.T, clients ...*testclient.RTCClient) []*testclient.TrackWriter {
logger.Infow("publishing tracks for clients")
var writers []*testclient.TrackWriter
+12
View File
@@ -340,6 +340,18 @@ func TestSingleNodeJoinAfterClose(t *testing.T) {
scenarioJoinClosedRoom(t)
}
func TestSingleNodeCloseNonRTCRoom(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, finish := setupSingleNodeTest("closeNonRTCRoom")
defer finish()
closeNonRTCRoom(t)
}
func TestAutoCreate(t *testing.T) {
if testing.Short() {
t.SkipNow()