SendData API & publishData permission (#88)

* SendData Server API

* SendData Server-API adjustments based on suggestions

* Update proto version

* enforce publishData permission

* go mod tidy

* fix go.mod

* go mod tidy

Co-authored-by: ChesterMing <89124853+ChesterMing@users.noreply.github.com>
This commit is contained in:
David Zhao
2021-08-19 18:11:48 -07:00
committed by GitHub
parent 2ab37d8fcc
commit bcf6e15e07
15 changed files with 163 additions and 59 deletions
+2 -1
View File
@@ -105,7 +105,8 @@ func createToken(c *cli.Context) error {
}
if c.Bool("recorder") {
grant.Hidden = true
grant.CanSubscribe = true
grant.SetCanPublish(false)
grant.SetCanPublishData(false)
}
at := auth.NewAccessToken(apiKey, apiSecret).
+1 -1
View File
@@ -12,7 +12,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.7.2
github.com/livekit/protocol v0.7.4
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
+2 -2
View File
@@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.7 h1:aAkdDC/cL7oGAfhhqltTecARdEnyUYhdDlfyX4QESB0=
github.com/livekit/ion-sfu v1.20.7/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/protocol v0.7.2 h1:4qwCkIFKhDYeyzp79lwb09/nwkjyjql3/o/Viifnyig=
github.com/livekit/protocol v0.7.2/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74=
github.com/livekit/protocol v0.7.4 h1:t44jtmvYa2ENfwG/CxTqvgAmhDXeSMIysTk4HvSl3oU=
github.com/livekit/protocol v0.7.4/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
+1 -2
View File
@@ -144,8 +144,6 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) {
},
TURN: TURNConfig{
Enabled: false,
TLSPort: 5349,
UDPPort: 3478,
},
Keys: map[string]string{},
}
@@ -170,6 +168,7 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) {
// set defaults for ports if none are set
if conf.RTC.UDPPort == 0 && conf.RTC.ICEPortRangeStart == 0 {
// to make it easier to run in dev mode/docker, default to single port
if conf.Development {
conf.RTC.UDPPort = 7882
} else {
+1
View File
@@ -16,6 +16,7 @@ func newMockParticipant(identity string, protocol types.ProtocolVersion, hidden
p.ProtocolVersionReturns(protocol)
p.CanSubscribeReturns(true)
p.CanPublishReturns(!hidden)
p.CanPublishDataReturns(!hidden)
p.HiddenReturns(hidden)
p.SetMetadataStub = func(m string) {
+4
View File
@@ -598,6 +598,10 @@ func (p *ParticipantImpl) CanSubscribe() bool {
return p.permission == nil || p.permission.CanSubscribe
}
func (p *ParticipantImpl) CanPublishData() bool {
return p.permission == nil || p.permission.CanPublishData
}
func (p *ParticipantImpl) Hidden() bool {
return p.params.Hidden
}
+15 -1
View File
@@ -352,6 +352,16 @@ func (r *Room) OnParticipantChanged(f func(participant types.Participant)) {
r.onParticipantChanged = f
}
func (r *Room) SendDataPacket(up *livekit.UserPacket, kind livekit.DataPacket_Kind) {
dp := &livekit.DataPacket{
Kind: kind,
Value: &livekit.DataPacket_User{
User: up,
},
}
r.onDataPacket(nil, dp)
}
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
func (r *Room) autoSubscribe(participant types.Participant) bool {
if !participant.CanSubscribe() {
@@ -422,13 +432,17 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) {
}
func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) {
// don't forward if source isn't allowed to publish data
if source != nil && !source.CanPublishData() {
return
}
dest := dp.GetUser().GetDestinationSids()
for _, op := range r.GetParticipants() {
if op.State() != livekit.ParticipantInfo_ACTIVE {
continue
}
if op.ID() == source.ID() {
if source != nil && op.ID() == source.ID() {
continue
}
if len(dest) > 0 {
+24
View File
@@ -454,6 +454,30 @@ func TestDataChannel(t *testing.T) {
require.Equal(t, 1, p1.SendDataPacketCallCount())
require.Equal(t, packet.Value, p1.SendDataPacketArgsForCall(0).Value)
})
t.Run("publishing disallowed", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p.CanPublishDataReturns(false)
packet := livekit.DataPacket{
Kind: livekit.DataPacket_RELIABLE,
Value: &livekit.DataPacket_User{
User: &livekit.UserPacket{
Payload: []byte{},
},
},
}
p.OnDataPacketArgsForCall(0)(p, &packet)
// no one should've been sent packet
for _, op := range participants {
fp := op.(*typesfakes.FakeParticipant)
require.Zero(t, fp.SendDataPacketCallCount())
}
})
}
func TestHiddenParticipants(t *testing.T) {
+1
View File
@@ -57,6 +57,7 @@ type Participant interface {
CanPublish() bool
CanSubscribe() bool
CanPublishData() bool
Hidden() bool
Start()
@@ -59,6 +59,16 @@ type FakeParticipant struct {
canPublishReturnsOnCall map[int]struct {
result1 bool
}
CanPublishDataStub func() bool
canPublishDataMutex sync.RWMutex
canPublishDataArgsForCall []struct {
}
canPublishDataReturns struct {
result1 bool
}
canPublishDataReturnsOnCall map[int]struct {
result1 bool
}
CanSubscribeStub func() bool
canSubscribeMutex sync.RWMutex
canSubscribeArgsForCall []struct {
@@ -649,6 +659,59 @@ func (fake *FakeParticipant) CanPublishReturnsOnCall(i int, result1 bool) {
}{result1}
}
func (fake *FakeParticipant) CanPublishData() bool {
fake.canPublishDataMutex.Lock()
ret, specificReturn := fake.canPublishDataReturnsOnCall[len(fake.canPublishDataArgsForCall)]
fake.canPublishDataArgsForCall = append(fake.canPublishDataArgsForCall, struct {
}{})
stub := fake.CanPublishDataStub
fakeReturns := fake.canPublishDataReturns
fake.recordInvocation("CanPublishData", []interface{}{})
fake.canPublishDataMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) CanPublishDataCallCount() int {
fake.canPublishDataMutex.RLock()
defer fake.canPublishDataMutex.RUnlock()
return len(fake.canPublishDataArgsForCall)
}
func (fake *FakeParticipant) CanPublishDataCalls(stub func() bool) {
fake.canPublishDataMutex.Lock()
defer fake.canPublishDataMutex.Unlock()
fake.CanPublishDataStub = stub
}
func (fake *FakeParticipant) CanPublishDataReturns(result1 bool) {
fake.canPublishDataMutex.Lock()
defer fake.canPublishDataMutex.Unlock()
fake.CanPublishDataStub = nil
fake.canPublishDataReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) CanPublishDataReturnsOnCall(i int, result1 bool) {
fake.canPublishDataMutex.Lock()
defer fake.canPublishDataMutex.Unlock()
fake.CanPublishDataStub = nil
if fake.canPublishDataReturnsOnCall == nil {
fake.canPublishDataReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.canPublishDataReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) CanSubscribe() bool {
fake.canSubscribeMutex.Lock()
ret, specificReturn := fake.canSubscribeReturnsOnCall[len(fake.canSubscribeArgsForCall)]
@@ -2550,6 +2613,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.addTrackMutex.RUnlock()
fake.canPublishMutex.RLock()
defer fake.canPublishMutex.RUnlock()
fake.canPublishDataMutex.RLock()
defer fake.canPublishDataMutex.RUnlock()
fake.canSubscribeMutex.RLock()
defer fake.canSubscribeMutex.RUnlock()
fake.closeMutex.RLock()
@@ -90,11 +90,6 @@ type FakePublishedTrack struct {
setMutedArgsForCall []struct {
arg1 bool
}
SetSimulcastLayersStub func([]livekit.VideoQuality)
setSimulcastLayersMutex sync.RWMutex
setSimulcastLayersArgsForCall []struct {
arg1 []livekit.VideoQuality
}
StartStub func()
startMutex sync.RWMutex
startArgsForCall []struct {
@@ -567,43 +562,6 @@ func (fake *FakePublishedTrack) SetMutedArgsForCall(i int) bool {
return argsForCall.arg1
}
func (fake *FakePublishedTrack) SetSimulcastLayers(arg1 []livekit.VideoQuality) {
var arg1Copy []livekit.VideoQuality
if arg1 != nil {
arg1Copy = make([]livekit.VideoQuality, len(arg1))
copy(arg1Copy, arg1)
}
fake.setSimulcastLayersMutex.Lock()
fake.setSimulcastLayersArgsForCall = append(fake.setSimulcastLayersArgsForCall, struct {
arg1 []livekit.VideoQuality
}{arg1Copy})
stub := fake.SetSimulcastLayersStub
fake.recordInvocation("SetSimulcastLayers", []interface{}{arg1Copy})
fake.setSimulcastLayersMutex.Unlock()
if stub != nil {
fake.SetSimulcastLayersStub(arg1)
}
}
func (fake *FakePublishedTrack) SetSimulcastLayersCallCount() int {
fake.setSimulcastLayersMutex.RLock()
defer fake.setSimulcastLayersMutex.RUnlock()
return len(fake.setSimulcastLayersArgsForCall)
}
func (fake *FakePublishedTrack) SetSimulcastLayersCalls(stub func([]livekit.VideoQuality)) {
fake.setSimulcastLayersMutex.Lock()
defer fake.setSimulcastLayersMutex.Unlock()
fake.SetSimulcastLayersStub = stub
}
func (fake *FakePublishedTrack) SetSimulcastLayersArgsForCall(i int) []livekit.VideoQuality {
fake.setSimulcastLayersMutex.RLock()
defer fake.setSimulcastLayersMutex.RUnlock()
argsForCall := fake.setSimulcastLayersArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakePublishedTrack) Start() {
fake.startMutex.Lock()
fake.startArgsForCall = append(fake.startArgsForCall, struct {
@@ -704,8 +662,6 @@ func (fake *FakePublishedTrack) Invocations() map[string][][]interface{} {
defer fake.removeSubscriberMutex.RUnlock()
fake.setMutedMutex.RLock()
defer fake.setMutedMutex.RUnlock()
fake.setSimulcastLayersMutex.RLock()
defer fake.setSimulcastLayersMutex.RUnlock()
fake.startMutex.RLock()
defer fake.startMutex.RUnlock()
fake.toProtoMutex.RLock()
+7
View File
@@ -527,6 +527,13 @@ func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.R
"tracks", rm.UpdateSubscriptions.TrackSids,
"subscribe", rm.UpdateSubscriptions.Subscribe)
}
case *livekit.RTCNodeMessage_SendData:
logger.Debugw("SendData", "message", rm)
up := &livekit.UserPacket{
Payload: rm.SendData.Data,
DestinationSids: rm.SendData.DestinationSids,
}
room.SendDataPacket(up, rm.SendData.Kind)
}
}
+21
View File
@@ -194,6 +194,27 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
return &livekit.UpdateSubscriptionsResponse{}, nil
}
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomManager.roomStore.ListParticipants(req.Room)
if err != nil {
return nil, err
}
if len(participants) > 0 {
err := s.writeMessage(ctx, req.Room, participants[0].Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_SendData{
SendData: req,
},
})
if err != nil {
return nil, err
}
}
return &livekit.SendDataResponse{}, nil
}
func (s *RoomService) createRTCSink(ctx context.Context, room, identity string) (routing.MessageSink, error) {
if err := EnsureAdminPermission(ctx, room); err != nil {
return nil, twirpAuthError(err)
+1 -8
View File
@@ -89,14 +89,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
if pv, err := strconv.Atoi(protocolParam); err == nil {
pi.ProtocolVersion = int32(pv)
}
// only use permissions if any of them are set, default permissive
if claims.Video.CanPublish || claims.Video.CanSubscribe {
pi.Permission = &livekit.ParticipantPermission{
CanSubscribe: claims.Video.CanSubscribe,
CanPublish: claims.Video.CanPublish,
}
}
pi.Permission = permissionFromGrant(claims.Video)
return roomName, pi, http.StatusOK, nil
}
+18
View File
@@ -92,3 +92,21 @@ func handleError(w http.ResponseWriter, status int, msg string) {
func boolValue(s string) bool {
return s == "1" || s == "true"
}
func permissionFromGrant(claim *auth.VideoGrant) *livekit.ParticipantPermission {
p := &livekit.ParticipantPermission{
CanSubscribe: true,
CanPublish: true,
CanPublishData: true,
}
if claim.CanPublish != nil {
p.CanPublish = *claim.CanPublish
}
if claim.CanSubscribe != nil {
p.CanSubscribe = *claim.CanSubscribe
}
if claim.CanPublishData != nil {
p.CanPublishData = *claim.CanPublishData
}
return p
}