diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index d936fbd04..1b6640583 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -139,10 +139,12 @@ func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam } func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error { + r.lock.Lock() if r.rtcMessageChan.IsClosed() { // create a new one r.rtcMessageChan = NewMessageChannel(localRTCChannelSize) } + r.lock.Unlock() return r.writeRTCMessage(r.rtcMessageChan, msg) } @@ -206,13 +208,19 @@ func (r *LocalRouter) rtcMessageWorker() { go r.rtcMessageWorker() }() - if r.rtcMessageChan.IsClosed() { + r.lock.RLock() + isClosed := r.rtcMessageChan.IsClosed() + r.lock.RUnlock() + if isClosed { // sleep and retry time.Sleep(time.Second) } + r.lock.RLock() + msgChan := r.rtcMessageChan.ReadChan() + r.lock.RUnlock() // consume messages from - for msg := range r.rtcMessageChan.ReadChan() { + for msg := range msgChan { if rtcMsg, ok := msg.(*livekit.RTCNodeMessage); ok { room, identity, err := parseParticipantKey(livekit.ParticipantKey(rtcMsg.ParticipantKey)) if err != nil { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index de7e7b04d..f9ca84b7a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -435,14 +435,24 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa } // handles RTC messages resulted from Room API calls -func (r *RoomManager) handleRTCMessage(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) { +func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() if room == nil { - logger.Warnw("Could not find room", nil, "room", roomName) - return + if _, ok := msg.Message.(*livekit.RTCNodeMessage_DeleteRoom); ok { + // special case of a non-RTC room e.g. room created but no participants joined + logger.Debugw("Deleting non-rtc room, loading from roomstore") + err := r.roomStore.DeleteRoom(ctx, roomName) + if err != nil { + logger.Debugw("Error deleting non-rtc room", "err", err) + } + return + } else { + logger.Warnw("Could not find room", nil, "room", roomName) + return + } } participant := room.GetParticipant(identity) diff --git a/test/multinode_test.go b/test/multinode_test.go index 2c2ba2ef6..57a2ed57e 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -153,6 +153,18 @@ func TestMultiNodeJoinAfterClose(t *testing.T) { scenarioJoinClosedRoom(t) } +func TestMultiNodeCloseNonRTCRoom(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + _, _, finish := setupMultiNodeTest("closeNonRTCRoom") + defer finish() + + closeNonRTCRoom(t) +} + // ensure that token accurately reflects out of band updates func TestMultiNodeRefreshToken(t *testing.T) { _, _, finish := setupMultiNodeTest("TestMultiNodeJoinAfterClose") diff --git a/test/scenarios.go b/test/scenarios.go index c15a980e2..d5c7878e0 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -166,6 +166,20 @@ func scenarioJoinClosedRoom(t *testing.T) { stopClients(c2) } +// close a room that has been created, but no participant has joined +func closeNonRTCRoom(t *testing.T) { + createCtx := contextWithToken(createRoomToken()) + _, err := roomClient.CreateRoom(createCtx, &livekit.CreateRoomRequest{ + Name: testRoom, + }) + require.NoError(t, err) + + _, err = roomClient.DeleteRoom(createCtx, &livekit.DeleteRoomRequest{ + Room: testRoom, + }) + require.NoError(t, err) +} + func publishTracksForClients(t *testing.T, clients ...*testclient.RTCClient) []*testclient.TrackWriter { logger.Infow("publishing tracks for clients") var writers []*testclient.TrackWriter diff --git a/test/singlenode_test.go b/test/singlenode_test.go index dea40dace..b0a8e9fb4 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -340,6 +340,18 @@ func TestSingleNodeJoinAfterClose(t *testing.T) { scenarioJoinClosedRoom(t) } +func TestSingleNodeCloseNonRTCRoom(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + _, finish := setupSingleNodeTest("closeNonRTCRoom") + defer finish() + + closeNonRTCRoom(t) +} + func TestAutoCreate(t *testing.T) { if testing.Short() { t.SkipNow()