diff --git a/cmd/server/commands.go b/cmd/server/commands.go index aa22f5e0a..d71c3860a 100644 --- a/cmd/server/commands.go +++ b/cmd/server/commands.go @@ -105,7 +105,8 @@ func createToken(c *cli.Context) error { } if c.Bool("recorder") { grant.Hidden = true - grant.CanSubscribe = true + grant.SetCanPublish(false) + grant.SetCanPublishData(false) } at := auth.NewAccessToken(apiKey, apiSecret). diff --git a/go.mod b/go.mod index a51c9446f..908eb0f98 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.7.2 + github.com/livekit/protocol v0.7.4 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 8009c296d..039c530e4 100644 --- a/go.sum +++ b/go.sum @@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.7 h1:aAkdDC/cL7oGAfhhqltTecARdEnyUYhdDlfyX4QESB0= github.com/livekit/ion-sfu v1.20.7/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= -github.com/livekit/protocol v0.7.2 h1:4qwCkIFKhDYeyzp79lwb09/nwkjyjql3/o/Viifnyig= -github.com/livekit/protocol v0.7.2/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74= +github.com/livekit/protocol v0.7.4 h1:t44jtmvYa2ENfwG/CxTqvgAmhDXeSMIysTk4HvSl3oU= +github.com/livekit/protocol v0.7.4/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/config/config.go b/pkg/config/config.go index 838a2092a..564d0a1d1 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -144,8 +144,6 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) { }, TURN: TURNConfig{ Enabled: false, - TLSPort: 5349, - UDPPort: 3478, }, Keys: map[string]string{}, } @@ -170,6 +168,7 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) { // set defaults for ports if none are set if conf.RTC.UDPPort == 0 && conf.RTC.ICEPortRangeStart == 0 { + // to make it easier to run in dev mode/docker, default to single port if conf.Development { conf.RTC.UDPPort = 7882 } else { diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index 4d281b220..410a346d7 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -16,6 +16,7 @@ func newMockParticipant(identity string, protocol types.ProtocolVersion, hidden p.ProtocolVersionReturns(protocol) p.CanSubscribeReturns(true) p.CanPublishReturns(!hidden) + p.CanPublishDataReturns(!hidden) p.HiddenReturns(hidden) p.SetMetadataStub = func(m string) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f7910e1d3..431e0da4d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -598,6 +598,10 @@ func (p *ParticipantImpl) CanSubscribe() bool { return p.permission == nil || p.permission.CanSubscribe } +func (p *ParticipantImpl) CanPublishData() bool { + return p.permission == nil || p.permission.CanPublishData +} + func (p *ParticipantImpl) Hidden() bool { return p.params.Hidden } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 81ce27314..ef053c994 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -352,6 +352,16 @@ func (r *Room) OnParticipantChanged(f func(participant types.Participant)) { r.onParticipantChanged = f } +func (r *Room) SendDataPacket(up *livekit.UserPacket, kind livekit.DataPacket_Kind) { + dp := &livekit.DataPacket{ + Kind: kind, + Value: &livekit.DataPacket_User{ + User: up, + }, + } + r.onDataPacket(nil, dp) +} + // checks if participant should be autosubscribed to new tracks, assumes lock is already acquired func (r *Room) autoSubscribe(participant types.Participant) bool { if !participant.CanSubscribe() { @@ -422,13 +432,17 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) { } func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) { + // don't forward if source isn't allowed to publish data + if source != nil && !source.CanPublishData() { + return + } dest := dp.GetUser().GetDestinationSids() for _, op := range r.GetParticipants() { if op.State() != livekit.ParticipantInfo_ACTIVE { continue } - if op.ID() == source.ID() { + if source != nil && op.ID() == source.ID() { continue } if len(dest) > 0 { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 2b2c86742..11a0a37ba 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -454,6 +454,30 @@ func TestDataChannel(t *testing.T) { require.Equal(t, 1, p1.SendDataPacketCallCount()) require.Equal(t, packet.Value, p1.SendDataPacketArgsForCall(0).Value) }) + + t.Run("publishing disallowed", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) + defer rm.Close() + participants := rm.GetParticipants() + p := participants[0].(*typesfakes.FakeParticipant) + p.CanPublishDataReturns(false) + + packet := livekit.DataPacket{ + Kind: livekit.DataPacket_RELIABLE, + Value: &livekit.DataPacket_User{ + User: &livekit.UserPacket{ + Payload: []byte{}, + }, + }, + } + p.OnDataPacketArgsForCall(0)(p, &packet) + + // no one should've been sent packet + for _, op := range participants { + fp := op.(*typesfakes.FakeParticipant) + require.Zero(t, fp.SendDataPacketCallCount()) + } + }) } func TestHiddenParticipants(t *testing.T) { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index a6b0e5151..c8e9274b8 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -57,6 +57,7 @@ type Participant interface { CanPublish() bool CanSubscribe() bool + CanPublishData() bool Hidden() bool Start() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 22e54235d..cb7989c55 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -59,6 +59,16 @@ type FakeParticipant struct { canPublishReturnsOnCall map[int]struct { result1 bool } + CanPublishDataStub func() bool + canPublishDataMutex sync.RWMutex + canPublishDataArgsForCall []struct { + } + canPublishDataReturns struct { + result1 bool + } + canPublishDataReturnsOnCall map[int]struct { + result1 bool + } CanSubscribeStub func() bool canSubscribeMutex sync.RWMutex canSubscribeArgsForCall []struct { @@ -649,6 +659,59 @@ func (fake *FakeParticipant) CanPublishReturnsOnCall(i int, result1 bool) { }{result1} } +func (fake *FakeParticipant) CanPublishData() bool { + fake.canPublishDataMutex.Lock() + ret, specificReturn := fake.canPublishDataReturnsOnCall[len(fake.canPublishDataArgsForCall)] + fake.canPublishDataArgsForCall = append(fake.canPublishDataArgsForCall, struct { + }{}) + stub := fake.CanPublishDataStub + fakeReturns := fake.canPublishDataReturns + fake.recordInvocation("CanPublishData", []interface{}{}) + fake.canPublishDataMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) CanPublishDataCallCount() int { + fake.canPublishDataMutex.RLock() + defer fake.canPublishDataMutex.RUnlock() + return len(fake.canPublishDataArgsForCall) +} + +func (fake *FakeParticipant) CanPublishDataCalls(stub func() bool) { + fake.canPublishDataMutex.Lock() + defer fake.canPublishDataMutex.Unlock() + fake.CanPublishDataStub = stub +} + +func (fake *FakeParticipant) CanPublishDataReturns(result1 bool) { + fake.canPublishDataMutex.Lock() + defer fake.canPublishDataMutex.Unlock() + fake.CanPublishDataStub = nil + fake.canPublishDataReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeParticipant) CanPublishDataReturnsOnCall(i int, result1 bool) { + fake.canPublishDataMutex.Lock() + defer fake.canPublishDataMutex.Unlock() + fake.CanPublishDataStub = nil + if fake.canPublishDataReturnsOnCall == nil { + fake.canPublishDataReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.canPublishDataReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeParticipant) CanSubscribe() bool { fake.canSubscribeMutex.Lock() ret, specificReturn := fake.canSubscribeReturnsOnCall[len(fake.canSubscribeArgsForCall)] @@ -2550,6 +2613,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.addTrackMutex.RUnlock() fake.canPublishMutex.RLock() defer fake.canPublishMutex.RUnlock() + fake.canPublishDataMutex.RLock() + defer fake.canPublishDataMutex.RUnlock() fake.canSubscribeMutex.RLock() defer fake.canSubscribeMutex.RUnlock() fake.closeMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_published_track.go b/pkg/rtc/types/typesfakes/fake_published_track.go index 87860cabf..0db086599 100644 --- a/pkg/rtc/types/typesfakes/fake_published_track.go +++ b/pkg/rtc/types/typesfakes/fake_published_track.go @@ -90,11 +90,6 @@ type FakePublishedTrack struct { setMutedArgsForCall []struct { arg1 bool } - SetSimulcastLayersStub func([]livekit.VideoQuality) - setSimulcastLayersMutex sync.RWMutex - setSimulcastLayersArgsForCall []struct { - arg1 []livekit.VideoQuality - } StartStub func() startMutex sync.RWMutex startArgsForCall []struct { @@ -567,43 +562,6 @@ func (fake *FakePublishedTrack) SetMutedArgsForCall(i int) bool { return argsForCall.arg1 } -func (fake *FakePublishedTrack) SetSimulcastLayers(arg1 []livekit.VideoQuality) { - var arg1Copy []livekit.VideoQuality - if arg1 != nil { - arg1Copy = make([]livekit.VideoQuality, len(arg1)) - copy(arg1Copy, arg1) - } - fake.setSimulcastLayersMutex.Lock() - fake.setSimulcastLayersArgsForCall = append(fake.setSimulcastLayersArgsForCall, struct { - arg1 []livekit.VideoQuality - }{arg1Copy}) - stub := fake.SetSimulcastLayersStub - fake.recordInvocation("SetSimulcastLayers", []interface{}{arg1Copy}) - fake.setSimulcastLayersMutex.Unlock() - if stub != nil { - fake.SetSimulcastLayersStub(arg1) - } -} - -func (fake *FakePublishedTrack) SetSimulcastLayersCallCount() int { - fake.setSimulcastLayersMutex.RLock() - defer fake.setSimulcastLayersMutex.RUnlock() - return len(fake.setSimulcastLayersArgsForCall) -} - -func (fake *FakePublishedTrack) SetSimulcastLayersCalls(stub func([]livekit.VideoQuality)) { - fake.setSimulcastLayersMutex.Lock() - defer fake.setSimulcastLayersMutex.Unlock() - fake.SetSimulcastLayersStub = stub -} - -func (fake *FakePublishedTrack) SetSimulcastLayersArgsForCall(i int) []livekit.VideoQuality { - fake.setSimulcastLayersMutex.RLock() - defer fake.setSimulcastLayersMutex.RUnlock() - argsForCall := fake.setSimulcastLayersArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakePublishedTrack) Start() { fake.startMutex.Lock() fake.startArgsForCall = append(fake.startArgsForCall, struct { @@ -704,8 +662,6 @@ func (fake *FakePublishedTrack) Invocations() map[string][][]interface{} { defer fake.removeSubscriberMutex.RUnlock() fake.setMutedMutex.RLock() defer fake.setMutedMutex.RUnlock() - fake.setSimulcastLayersMutex.RLock() - defer fake.setSimulcastLayersMutex.RUnlock() fake.startMutex.RLock() defer fake.startMutex.RUnlock() fake.toProtoMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 379c1a017..4c67baa87 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -527,6 +527,13 @@ func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.R "tracks", rm.UpdateSubscriptions.TrackSids, "subscribe", rm.UpdateSubscriptions.Subscribe) } + case *livekit.RTCNodeMessage_SendData: + logger.Debugw("SendData", "message", rm) + up := &livekit.UserPacket{ + Payload: rm.SendData.Data, + DestinationSids: rm.SendData.DestinationSids, + } + room.SendDataPacket(up, rm.SendData.Kind) } } diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index b0b9ffa48..1c3c669bc 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -194,6 +194,27 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda return &livekit.UpdateSubscriptionsResponse{}, nil } +func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { + // here we are using any user's identity, due to how it works with routing + participants, err := s.roomManager.roomStore.ListParticipants(req.Room) + if err != nil { + return nil, err + } + + if len(participants) > 0 { + err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{ + Message: &livekit.RTCNodeMessage_SendData{ + SendData: req, + }, + }) + if err != nil { + return nil, err + } + } + + return &livekit.SendDataResponse{}, nil +} + func (s *RoomService) createRTCSink(ctx context.Context, room, identity string) (routing.MessageSink, error) { if err := EnsureAdminPermission(ctx, room); err != nil { return nil, twirpAuthError(err) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 93f20c3c0..bd111a201 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -89,14 +89,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, if pv, err := strconv.Atoi(protocolParam); err == nil { pi.ProtocolVersion = int32(pv) } - - // only use permissions if any of them are set, default permissive - if claims.Video.CanPublish || claims.Video.CanSubscribe { - pi.Permission = &livekit.ParticipantPermission{ - CanSubscribe: claims.Video.CanSubscribe, - CanPublish: claims.Video.CanPublish, - } - } + pi.Permission = permissionFromGrant(claims.Video) return roomName, pi, http.StatusOK, nil } diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 58c72f2d3..48272f057 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -92,3 +92,21 @@ func handleError(w http.ResponseWriter, status int, msg string) { func boolValue(s string) bool { return s == "1" || s == "true" } + +func permissionFromGrant(claim *auth.VideoGrant) *livekit.ParticipantPermission { + p := &livekit.ParticipantPermission{ + CanSubscribe: true, + CanPublish: true, + CanPublishData: true, + } + if claim.CanPublish != nil { + p.CanPublish = *claim.CanPublish + } + if claim.CanSubscribe != nil { + p.CanSubscribe = *claim.CanSubscribe + } + if claim.CanPublishData != nil { + p.CanPublishData = *claim.CanPublishData + } + return p +}