mirror of
https://github.com/livekit/livekit.git
synced 2026-05-17 17:55:40 +00:00
@@ -75,7 +75,7 @@ type MessageRouter interface {
|
||||
|
||||
// Write a message to a participant or room
|
||||
WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
|
||||
WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
|
||||
WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
|
||||
}
|
||||
|
||||
func CreateRouter(rc *redis.Client, node LocalNode) Router {
|
||||
|
||||
@@ -119,8 +119,8 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro
|
||||
return r.writeRTCMessage(r.rtcMessageChan, msg)
|
||||
}
|
||||
|
||||
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
||||
msg.ParticipantKey = participantKey(roomName, identity)
|
||||
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error {
|
||||
msg.ParticipantKey = participantKey(roomName, "")
|
||||
return r.WriteNodeRTC(ctx, r.currentNode.Id, msg)
|
||||
}
|
||||
|
||||
|
||||
@@ -191,12 +191,12 @@ func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro
|
||||
return r.writeRTCMessage(rtcSink, msg)
|
||||
}
|
||||
|
||||
func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
||||
func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error {
|
||||
node, err := r.GetNodeForRoom(ctx, roomName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msg.ParticipantKey = participantKey(roomName, identity)
|
||||
msg.ParticipantKey = participantKey(roomName, "")
|
||||
return r.WriteNodeRTC(ctx, node.Id, msg)
|
||||
}
|
||||
|
||||
|
||||
@@ -152,13 +152,12 @@ type FakeRouter struct {
|
||||
writeParticipantRTCReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
WriteRoomRTCStub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) error
|
||||
WriteRoomRTCStub func(context.Context, livekit.RoomName, *livekit.RTCNodeMessage) error
|
||||
writeRoomRTCMutex sync.RWMutex
|
||||
writeRoomRTCArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 livekit.RoomName
|
||||
arg3 livekit.ParticipantIdentity
|
||||
arg4 *livekit.RTCNodeMessage
|
||||
arg3 *livekit.RTCNodeMessage
|
||||
}
|
||||
writeRoomRTCReturns struct {
|
||||
result1 error
|
||||
@@ -876,21 +875,20 @@ func (fake *FakeRouter) WriteParticipantRTCReturnsOnCall(i int, result1 error) {
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTC(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.ParticipantIdentity, arg4 *livekit.RTCNodeMessage) error {
|
||||
func (fake *FakeRouter) WriteRoomRTC(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RTCNodeMessage) error {
|
||||
fake.writeRoomRTCMutex.Lock()
|
||||
ret, specificReturn := fake.writeRoomRTCReturnsOnCall[len(fake.writeRoomRTCArgsForCall)]
|
||||
fake.writeRoomRTCArgsForCall = append(fake.writeRoomRTCArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 livekit.RoomName
|
||||
arg3 livekit.ParticipantIdentity
|
||||
arg4 *livekit.RTCNodeMessage
|
||||
}{arg1, arg2, arg3, arg4})
|
||||
arg3 *livekit.RTCNodeMessage
|
||||
}{arg1, arg2, arg3})
|
||||
stub := fake.WriteRoomRTCStub
|
||||
fakeReturns := fake.writeRoomRTCReturns
|
||||
fake.recordInvocation("WriteRoomRTC", []interface{}{arg1, arg2, arg3, arg4})
|
||||
fake.recordInvocation("WriteRoomRTC", []interface{}{arg1, arg2, arg3})
|
||||
fake.writeRoomRTCMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2, arg3, arg4)
|
||||
return stub(arg1, arg2, arg3)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
@@ -904,17 +902,17 @@ func (fake *FakeRouter) WriteRoomRTCCallCount() int {
|
||||
return len(fake.writeRoomRTCArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCCalls(stub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) error) {
|
||||
func (fake *FakeRouter) WriteRoomRTCCalls(stub func(context.Context, livekit.RoomName, *livekit.RTCNodeMessage) error) {
|
||||
fake.writeRoomRTCMutex.Lock()
|
||||
defer fake.writeRoomRTCMutex.Unlock()
|
||||
fake.WriteRoomRTCStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCArgsForCall(i int) (context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) {
|
||||
func (fake *FakeRouter) WriteRoomRTCArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RTCNodeMessage) {
|
||||
fake.writeRoomRTCMutex.RLock()
|
||||
defer fake.writeRoomRTCMutex.RUnlock()
|
||||
argsForCall := fake.writeRoomRTCArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
|
||||
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCReturns(result1 error) {
|
||||
|
||||
@@ -72,7 +72,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
|
||||
if err := EnsureCreatePermission(ctx); err != nil {
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
err := s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), "", &livekit.RTCNodeMessage{
|
||||
err := s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_DeleteRoom{
|
||||
DeleteRoom: req,
|
||||
},
|
||||
@@ -255,7 +255,12 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
|
||||
}
|
||||
|
||||
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
|
||||
err := s.writeRoomMessage(ctx, livekit.RoomName(req.Room), "", &livekit.RTCNodeMessage{
|
||||
roomName := livekit.RoomName(req.Room)
|
||||
if err := EnsureAdminPermission(ctx, roomName); err != nil {
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_SendData{
|
||||
SendData: req,
|
||||
},
|
||||
@@ -287,7 +292,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.writeRoomMessage(ctx, livekit.RoomName(req.Room), "", &livekit.RTCNodeMessage{
|
||||
err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
|
||||
UpdateRoomMetadata: req,
|
||||
},
|
||||
@@ -326,21 +331,6 @@ func (s *RoomService) writeParticipantMessage(ctx context.Context, room livekit.
|
||||
return s.router.WriteParticipantRTC(ctx, room, identity, msg)
|
||||
}
|
||||
|
||||
func (s *RoomService) writeRoomMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
||||
if err := EnsureAdminPermission(ctx, room); err != nil {
|
||||
return twirpAuthError(err)
|
||||
}
|
||||
|
||||
if identity != "" {
|
||||
_, err := s.roomStore.LoadParticipant(ctx, room, identity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return s.router.WriteRoomRTC(ctx, room, identity, msg)
|
||||
}
|
||||
|
||||
func confirmExecution(f func() error) error {
|
||||
expired := time.After(executionTimeout)
|
||||
var err error
|
||||
|
||||
Reference in New Issue
Block a user