From ffb2c50a70597685b96cb0a32b6cbe4435455439 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 14 Nov 2021 11:18:01 -0800 Subject: [PATCH] Fixed room API breakage (#190) --- pkg/routing/redisrouter.go | 2 +- pkg/service/roommanager.go | 15 ++++++++++++--- pkg/service/roomservice.go | 7 +++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 999602160..b786efdbb 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -181,7 +181,7 @@ func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity st } func (r *RedisRouter) WriteRTCNodeMessage(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { - rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, "") + rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, msg.ParticipantKey) return r.writeRTCMessage(rtcSink, msg) } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index eda2a649c..eeabc725c 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -464,15 +464,18 @@ func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, ident } participant := room.GetParticipant(identity) - if participant == nil { - return - } switch rm := msg.Message.(type) { case *livekit.RTCNodeMessage_RemoveParticipant: + if participant == nil { + return + } logger.Infow("removing participant", "room", roomName, "participant", identity) room.RemoveParticipant(identity) case *livekit.RTCNodeMessage_MuteTrack: + if participant == nil { + return + } logger.Debugw("setting track muted", "room", roomName, "participant", identity, "track", rm.MuteTrack.TrackSid, "muted", rm.MuteTrack.Muted) if !rm.MuteTrack.Muted && !r.config.Room.EnableRemoteUnmute { @@ -481,6 +484,9 @@ func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, ident } participant.SetTrackMuted(rm.MuteTrack.TrackSid, rm.MuteTrack.Muted, true) case *livekit.RTCNodeMessage_UpdateParticipant: + if participant == nil { + return + } logger.Debugw("updating participant", "room", roomName, "participant", identity) if rm.UpdateParticipant.Metadata != "" { participant.SetMetadata(rm.UpdateParticipant.Metadata) @@ -494,6 +500,9 @@ func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, ident } room.Close() case *livekit.RTCNodeMessage_UpdateSubscriptions: + if participant == nil { + return + } logger.Debugw("updating participant subscriptions", "room", roomName, "participant", identity) if err := room.UpdateSubscriptions(participant, rm.UpdateSubscriptions.TrackSids, rm.UpdateSubscriptions.Subscribe); err != nil { logger.Warnw("could not update subscription", err, diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 02b80a91f..0330e9bc6 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -69,6 +69,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ + ParticipantKey: s.roomParticipantKey(req.Room), Message: &livekit.RTCNodeMessage_DeleteRoom{ DeleteRoom: req, }, @@ -198,6 +199,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest } err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ + ParticipantKey: s.roomParticipantKey(req.Room), Message: &livekit.RTCNodeMessage_SendData{ SendData: req, }, @@ -227,6 +229,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat } err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ + ParticipantKey: s.roomParticipantKey(req.Room), Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ UpdateRoomMetadata: req, }, @@ -250,3 +253,7 @@ func (s *RoomService) writeMessage(ctx context.Context, room, identity string, m return s.router.WriteRTCMessage(ctx, room, identity, msg) } + +func (s *RoomService) roomParticipantKey(room string) string { + return room + "|" +}