mirror of
https://github.com/livekit/livekit.git
synced 2026-04-05 08:05:41 +00:00
* add size limit to metadata of room & participant * description in config-sample * solve comments
367 lines
9.6 KiB
Go
367 lines
9.6 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/thoas/go-funk"
|
|
"github.com/twitchtv/twirp"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/routing"
|
|
)
|
|
|
|
const (
	// executionTimeout is the total time confirmExecution keeps re-running
	// its check before giving up and returning the last error.
	executionTimeout = 2 * time.Second
	// checkInterval is how long confirmExecution sleeps after a failed check
	// before trying again.
	checkInterval = 50 * time.Millisecond
)
|
|
|
|
// A rooms service that supports a single node
|
|
type RoomService struct {
|
|
router routing.MessageRouter
|
|
roomAllocator RoomAllocator
|
|
roomStore ServiceStore
|
|
conf config.RoomConfig
|
|
}
|
|
|
|
func NewRoomService(ra RoomAllocator, rs ServiceStore, router routing.MessageRouter, conf config.RoomConfig) (svc *RoomService, err error) {
|
|
svc = &RoomService{
|
|
router: router,
|
|
roomAllocator: ra,
|
|
roomStore: rs,
|
|
conf: conf,
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (rm *livekit.Room, err error) {
|
|
if err = EnsureCreatePermission(ctx); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
rm, err = s.roomAllocator.CreateRoom(ctx, req)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "could not create room")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (res *livekit.ListRoomsResponse, err error) {
|
|
err = EnsureListPermission(ctx)
|
|
if err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
var names []livekit.RoomName
|
|
if len(req.Names) > 0 {
|
|
names = livekit.StringsAsRoomNames(req.Names)
|
|
}
|
|
rooms, err := s.roomStore.ListRooms(ctx, names)
|
|
if err != nil {
|
|
// TODO: translate error codes to twirp
|
|
return
|
|
}
|
|
|
|
res = &livekit.ListRoomsResponse{
|
|
Rooms: rooms,
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) {
|
|
if err := EnsureCreatePermission(ctx); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
err := s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_DeleteRoom{
|
|
DeleteRoom: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// we should not return until when the room is confirmed deleted
|
|
err = confirmExecution(func() error {
|
|
_, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room))
|
|
if err == nil {
|
|
return ErrOperationFailed
|
|
} else if err != ErrRoomNotFound {
|
|
return err
|
|
} else {
|
|
return nil
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &livekit.DeleteRoomResponse{}, nil
|
|
}
|
|
|
|
func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (res *livekit.ListParticipantsResponse, err error) {
|
|
if err = EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
participants, err := s.roomStore.ListParticipants(ctx, livekit.RoomName(req.Room))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
res = &livekit.ListParticipantsResponse{
|
|
Participants: participants,
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (res *livekit.ParticipantInfo, err error) {
|
|
if err = EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
participant, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
res = participant
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (res *livekit.RemoveParticipantResponse, err error) {
|
|
err = s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_RemoveParticipant{
|
|
RemoveParticipant: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
err = confirmExecution(func() error {
|
|
_, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
|
if err == ErrParticipantNotFound {
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
} else {
|
|
return ErrOperationFailed
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res = &livekit.RemoveParticipantResponse{}
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (res *livekit.MuteRoomTrackResponse, err error) {
|
|
if err = EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
participant, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// find the track
|
|
t := funk.Find(participant.Tracks, func(t *livekit.TrackInfo) bool {
|
|
return t.Sid == req.TrackSid
|
|
})
|
|
if t == nil {
|
|
return nil, twirp.NotFoundError(ErrTrackNotFound.Error())
|
|
}
|
|
|
|
err = s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_MuteTrack{
|
|
MuteTrack: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var track *livekit.TrackInfo
|
|
err = confirmExecution(func() error {
|
|
p, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// ensure track is muted
|
|
t := funk.Find(p.Tracks, func(t *livekit.TrackInfo) bool {
|
|
return t.Sid == req.TrackSid
|
|
})
|
|
var ok bool
|
|
track, ok = t.(*livekit.TrackInfo)
|
|
if !ok {
|
|
return ErrTrackNotFound
|
|
}
|
|
if track.Muted != req.Muted {
|
|
return ErrOperationFailed
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res = &livekit.MuteRoomTrackResponse{
|
|
Track: track,
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
|
|
if s.conf.MaxMetadataSize > 0 && len(req.Metadata) > int(s.conf.MaxMetadataSize) {
|
|
return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(int(s.conf.MaxMetadataSize)))
|
|
}
|
|
|
|
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_UpdateParticipant{
|
|
UpdateParticipant: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var participant *livekit.ParticipantInfo
|
|
err = confirmExecution(func() error {
|
|
participant, err = s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if req.Metadata != "" && participant.Metadata != req.Metadata {
|
|
return ErrOperationFailed
|
|
}
|
|
if req.Permission != nil && !proto.Equal(req.Permission, participant.Permission) {
|
|
return ErrOperationFailed
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return participant, nil
|
|
}
|
|
|
|
func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
|
|
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_UpdateSubscriptions{
|
|
UpdateSubscriptions: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &livekit.UpdateSubscriptionsResponse{}, nil
|
|
}
|
|
|
|
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
|
|
roomName := livekit.RoomName(req.Room)
|
|
if err := EnsureAdminPermission(ctx, roomName); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_SendData{
|
|
SendData: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &livekit.SendDataResponse{}, nil
|
|
}
|
|
|
|
func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) {
|
|
if s.conf.MaxMetadataSize > 0 && len(req.Metadata) > int(s.conf.MaxMetadataSize) {
|
|
return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(int(s.conf.MaxMetadataSize)))
|
|
}
|
|
|
|
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
|
|
return nil, twirpAuthError(err)
|
|
}
|
|
|
|
room, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// no one has joined the room, would not have been created on an RTC node.
|
|
// in this case, we'd want to run create again
|
|
_, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{
|
|
Name: req.Room,
|
|
Metadata: req.Metadata,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
|
|
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
|
|
UpdateRoomMetadata: req,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = confirmExecution(func() error {
|
|
room, err = s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if room.Metadata != req.Metadata {
|
|
return ErrOperationFailed
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return room, nil
|
|
}
|
|
|
|
func (s *RoomService) writeParticipantMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
|
if err := EnsureAdminPermission(ctx, room); err != nil {
|
|
return twirpAuthError(err)
|
|
}
|
|
|
|
_, err := s.roomStore.LoadParticipant(ctx, room, identity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return s.router.WriteParticipantRTC(ctx, room, identity, msg)
|
|
}
|
|
|
|
func confirmExecution(f func() error) error {
|
|
expired := time.After(executionTimeout)
|
|
var err error
|
|
for {
|
|
select {
|
|
case <-expired:
|
|
return err
|
|
default:
|
|
err = f()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
time.Sleep(checkInterval)
|
|
}
|
|
}
|
|
}
|