Support for participant attributes (#2806)

* Support for participant attributes

* move metadata setters to LocalParticipant

* address feedback

* forward error

* update go mod

* update attributes first
This commit is contained in:
David Zhao
2024-06-20 02:14:19 -04:00
committed by GitHub
parent d4e50b633f
commit 7a774cc82a
24 changed files with 388 additions and 142 deletions
+8 -2
View File
@@ -183,8 +183,6 @@ keys:
# # allow tracks to be unmuted remotely, defaults to false
# # tracks can always be muted from the Room Service APIs
# enable_remote_unmute: true
# # limit size of room and participant's metadata, 0 for no limit
# max_metadata_size: 0
# # control playout delay in ms of video track (and associated audio track)
# playout_delay:
# enabled: true
@@ -311,3 +309,11 @@ keys:
# # value less or equal than 0 means no limit.
# subscription_limit_video: 0
# subscription_limit_audio: 0
# # limit size of room and participant's metadata, 0 for no limit
# max_metadata_size: 0
# # limit size of participant attributes, 0 for no limit
# max_attributes_size: 0
# # limit length of room names
# max_room_name_length: 0
# # limit length of participant identity
# max_participant_identity_length: 0
+1 -1
View File
@@ -20,7 +20,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75
github.com/livekit/protocol v1.17.1-0.20240617184219-32c577d805ed
github.com/livekit/protocol v1.18.0
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -152,8 +152,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec=
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.17.1-0.20240617184219-32c577d805ed h1:S4avs1NKG6bBgHYuBOrQWnNxJSOdunGOB84BQfGzKmQ=
github.com/livekit/protocol v1.17.1-0.20240617184219-32c577d805ed/go.mod h1:cN8WmGQR+kWz1+UWcAQdFFUcbW76PnfZDdkLAbYIqd4=
github.com/livekit/protocol v1.18.0 h1:LLOjKBA8rtnGpVGjAmKUROy7bv/l9q1wyn9hNmj8Sdg=
github.com/livekit/protocol v1.18.0/go.mod h1:cN8WmGQR+kWz1+UWcAQdFFUcbW76PnfZDdkLAbYIqd4=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
+36 -13
View File
@@ -231,17 +231,20 @@ type VideoConfig struct {
type RoomConfig struct {
// enable rooms to be automatically created
AutoCreate bool `yaml:"auto_create,omitempty"`
EnabledCodecs []CodecSpec `yaml:"enabled_codecs,omitempty"`
MaxParticipants uint32 `yaml:"max_participants,omitempty"`
EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"`
DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"`
EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"`
MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"`
PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"`
SyncStreams bool `yaml:"sync_streams,omitempty"`
MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"`
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
AutoCreate bool `yaml:"auto_create,omitempty"`
EnabledCodecs []CodecSpec `yaml:"enabled_codecs,omitempty"`
MaxParticipants uint32 `yaml:"max_participants,omitempty"`
EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"`
DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"`
EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"`
PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"`
SyncStreams bool `yaml:"sync_streams,omitempty"`
// deprecated, moved to limits
MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"`
// deprecated, moved to limits
MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"`
// deprecated, moved to limits
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
}
type CodecSpec struct {
@@ -300,6 +303,11 @@ type LimitConfig struct {
BytesPerSec float32 `yaml:"bytes_per_sec,omitempty"`
SubscriptionLimitVideo int32 `yaml:"subscription_limit_video,omitempty"`
SubscriptionLimitAudio int32 `yaml:"subscription_limit_audio,omitempty"`
MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"`
// total size of all attributes on a participant
MaxAttributesSize uint32 `yaml:"max_attributes_size,omitempty"`
MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"`
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
}
type IngressConfig struct {
@@ -494,8 +502,12 @@ var DefaultConfig = Config{
{Mime: webrtc.MimeTypeVP9},
{Mime: webrtc.MimeTypeAV1},
},
EmptyTimeout: 5 * 60,
DepartureTimeout: 20,
EmptyTimeout: 5 * 60,
DepartureTimeout: 20,
},
Limit: LimitConfig{
MaxMetadataSize: 64000,
MaxAttributesSize: 64000,
MaxRoomNameLength: 256,
MaxParticipantIdentityLength: 256,
},
@@ -585,6 +597,17 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c
conf.Logging.ComponentLevels["pion"] = conf.Logging.PionLevel
}
// copy over legacy limits
if conf.Room.MaxMetadataSize != 0 {
conf.Limit.MaxMetadataSize = conf.Room.MaxMetadataSize
}
if conf.Room.MaxParticipantIdentityLength != 0 {
conf.Limit.MaxParticipantIdentityLength = conf.Room.MaxParticipantIdentityLength
}
if conf.Room.MaxRoomNameLength != 0 {
conf.Limit.MaxRoomNameLength = conf.Room.MaxRoomNameLength
}
return &conf, nil
}
+4 -1
View File
@@ -14,7 +14,9 @@
package rtc
import "errors"
import (
"errors"
)
var (
ErrRoomClosed = errors.New("room has already closed")
@@ -29,6 +31,7 @@ var (
ErrEmptyParticipantID = errors.New("participant ID cannot be empty")
ErrMissingGrants = errors.New("VideoGrant is missing")
ErrInternalError = errors.New("internal error")
ErrAttributeExceedsLimits = errors.New("attribute size exceeds limits")
// Track subscription related
ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track")
+48
View File
@@ -136,6 +136,7 @@ type ParticipantParams struct {
VersionGenerator utils.TimedVersionGenerator
TrackResolver types.MediaTrackResolver
DisableDynacast bool
MaxAttributesSize uint32
SubscriberAllowPause bool
SubscriptionLimitAudio int32
SubscriptionLimitVideo int32
@@ -459,6 +460,52 @@ func (p *ParticipantImpl) SetMetadata(metadata string) {
}
}
func (p *ParticipantImpl) SetAttributes(attrs map[string]string) error {
p.lock.Lock()
grants := p.grants.Load().Clone()
if grants.Attributes == nil {
grants.Attributes = make(map[string]string)
}
var keysToDelete []string
for k, v := range attrs {
if v == "" {
keysToDelete = append(keysToDelete, k)
} else {
grants.Attributes[k] = v
}
}
for _, k := range keysToDelete {
delete(grants.Attributes, k)
}
maxAttributesSize := p.params.MaxAttributesSize
if maxAttributesSize > 0 {
total := 0
for k, v := range grants.Attributes {
total += len(k) + len(v)
}
if uint32(total) > maxAttributesSize {
p.lock.Unlock()
return ErrAttributeExceedsLimits
}
}
p.grants.Store(grants)
p.dirty.Store(true)
onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()
if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
if onClaimsChanged != nil {
onClaimsChanged(p)
}
return nil
}
func (p *ParticipantImpl) ClaimGrants() *auth.ClaimGrants {
return p.grants.Load()
}
@@ -551,6 +598,7 @@ func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils.
Version: v,
Permission: grants.Video.ToPermission(),
Metadata: grants.Metadata,
Attributes: grants.Attributes,
Region: p.params.Region,
IsPublisher: p.IsPublisher(),
Kind: grants.GetParticipantKind(),
+12 -1
View File
@@ -819,13 +819,24 @@ func (r *Room) SetMetadata(metadata string) <-chan struct{} {
return r.protoProxy.MarkDirty(true)
}
func (r *Room) UpdateParticipantMetadata(participant types.LocalParticipant, name string, metadata string) {
func (r *Room) UpdateParticipantMetadata(
participant types.LocalParticipant,
name string,
metadata string,
attributes map[string]string,
) error {
if attributes != nil && len(attributes) > 0 {
if err := participant.SetAttributes(attributes); err != nil {
return err
}
}
if metadata != "" {
participant.SetMetadata(metadata)
}
if name != "" {
participant.SetName(name)
}
return nil
}
func (r *Room) sendRoomUpdate() {
+9 -1
View File
@@ -93,7 +93,15 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
case *livekit.SignalRequest_UpdateMetadata:
if participant.ClaimGrants().Video.GetCanUpdateOwnMetadata() {
room.UpdateParticipantMetadata(participant, msg.UpdateMetadata.Name, msg.UpdateMetadata.Metadata)
err := room.UpdateParticipantMetadata(
participant,
msg.UpdateMetadata.Name,
msg.UpdateMetadata.Metadata,
msg.UpdateMetadata.Attributes,
)
if err != nil {
pLogger.Warnw("could not update metadata", err)
}
}
case *livekit.SignalRequest_UpdateAudioTrack:
+6 -4
View File
@@ -255,9 +255,6 @@ type Participant interface {
CanSkipBroadcast() bool
ToProto() *livekit.ParticipantInfo
SetName(name string)
SetMetadata(metadata string)
IsPublisher() bool
GetPublishedTrack(trackID livekit.TrackID) MediaTrack
GetPublishedTracks() []MediaTrack
@@ -329,6 +326,11 @@ type LocalParticipant interface {
SetSignalSourceValid(valid bool)
HandleSignalSourceClose()
// updates
SetName(name string)
SetMetadata(metadata string)
SetAttributes(attributes map[string]string) error
// permissions
ClaimGrants() *auth.ClaimGrants
SetPermission(permission *livekit.ParticipantPermission) bool
@@ -437,7 +439,7 @@ type Room interface {
SimulateScenario(participant LocalParticipant, scenario *livekit.SimulateScenario) error
ResolveMediaTrackForSubscriber(subIdentity livekit.ParticipantIdentity, trackID livekit.TrackID) MediaResolverResult
GetLocalParticipants() []LocalParticipant
UpdateParticipantMetadata(participant LocalParticipant, name string, metadata string)
UpdateParticipantMetadata(participant LocalParticipant, name string, metadata string, attributes map[string]string) error
}
// MediaTrack represents a media track
@@ -733,6 +733,17 @@ type FakeLocalParticipant struct {
sendSpeakerUpdateReturnsOnCall map[int]struct {
result1 error
}
SetAttributesStub func(map[string]string) error
setAttributesMutex sync.RWMutex
setAttributesArgsForCall []struct {
arg1 map[string]string
}
setAttributesReturns struct {
result1 error
}
setAttributesReturnsOnCall map[int]struct {
result1 error
}
SetICEConfigStub func(*livekit.ICEConfig)
setICEConfigMutex sync.RWMutex
setICEConfigArgsForCall []struct {
@@ -4881,6 +4892,67 @@ func (fake *FakeLocalParticipant) SendSpeakerUpdateReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeLocalParticipant) SetAttributes(arg1 map[string]string) error {
fake.setAttributesMutex.Lock()
ret, specificReturn := fake.setAttributesReturnsOnCall[len(fake.setAttributesArgsForCall)]
fake.setAttributesArgsForCall = append(fake.setAttributesArgsForCall, struct {
arg1 map[string]string
}{arg1})
stub := fake.SetAttributesStub
fakeReturns := fake.setAttributesReturns
fake.recordInvocation("SetAttributes", []interface{}{arg1})
fake.setAttributesMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) SetAttributesCallCount() int {
fake.setAttributesMutex.RLock()
defer fake.setAttributesMutex.RUnlock()
return len(fake.setAttributesArgsForCall)
}
func (fake *FakeLocalParticipant) SetAttributesCalls(stub func(map[string]string) error) {
fake.setAttributesMutex.Lock()
defer fake.setAttributesMutex.Unlock()
fake.SetAttributesStub = stub
}
func (fake *FakeLocalParticipant) SetAttributesArgsForCall(i int) map[string]string {
fake.setAttributesMutex.RLock()
defer fake.setAttributesMutex.RUnlock()
argsForCall := fake.setAttributesArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetAttributesReturns(result1 error) {
fake.setAttributesMutex.Lock()
defer fake.setAttributesMutex.Unlock()
fake.SetAttributesStub = nil
fake.setAttributesReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) SetAttributesReturnsOnCall(i int, result1 error) {
fake.setAttributesMutex.Lock()
defer fake.setAttributesMutex.Unlock()
fake.SetAttributesStub = nil
if fake.setAttributesReturnsOnCall == nil {
fake.setAttributesReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.setAttributesReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) SetICEConfig(arg1 *livekit.ICEConfig) {
fake.setICEConfigMutex.Lock()
fake.setICEConfigArgsForCall = append(fake.setICEConfigArgsForCall, struct {
@@ -6571,6 +6643,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.sendRoomUpdateMutex.RUnlock()
fake.sendSpeakerUpdateMutex.RLock()
defer fake.sendSpeakerUpdateMutex.RUnlock()
fake.setAttributesMutex.RLock()
defer fake.setAttributesMutex.RUnlock()
fake.setICEConfigMutex.RLock()
defer fake.setICEConfigMutex.RUnlock()
fake.setMetadataMutex.RLock()
@@ -175,16 +175,6 @@ type FakeParticipant struct {
arg2 bool
arg3 bool
}
SetMetadataStub func(string)
setMetadataMutex sync.RWMutex
setMetadataArgsForCall []struct {
arg1 string
}
SetNameStub func(string)
setNameMutex sync.RWMutex
setNameArgsForCall []struct {
arg1 string
}
StateStub func() livekit.ParticipantInfo_State
stateMutex sync.RWMutex
stateArgsForCall []struct {
@@ -1115,70 +1105,6 @@ func (fake *FakeParticipant) RemovePublishedTrackArgsForCall(i int) (types.Media
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeParticipant) SetMetadata(arg1 string) {
fake.setMetadataMutex.Lock()
fake.setMetadataArgsForCall = append(fake.setMetadataArgsForCall, struct {
arg1 string
}{arg1})
stub := fake.SetMetadataStub
fake.recordInvocation("SetMetadata", []interface{}{arg1})
fake.setMetadataMutex.Unlock()
if stub != nil {
fake.SetMetadataStub(arg1)
}
}
func (fake *FakeParticipant) SetMetadataCallCount() int {
fake.setMetadataMutex.RLock()
defer fake.setMetadataMutex.RUnlock()
return len(fake.setMetadataArgsForCall)
}
func (fake *FakeParticipant) SetMetadataCalls(stub func(string)) {
fake.setMetadataMutex.Lock()
defer fake.setMetadataMutex.Unlock()
fake.SetMetadataStub = stub
}
func (fake *FakeParticipant) SetMetadataArgsForCall(i int) string {
fake.setMetadataMutex.RLock()
defer fake.setMetadataMutex.RUnlock()
argsForCall := fake.setMetadataArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeParticipant) SetName(arg1 string) {
fake.setNameMutex.Lock()
fake.setNameArgsForCall = append(fake.setNameArgsForCall, struct {
arg1 string
}{arg1})
stub := fake.SetNameStub
fake.recordInvocation("SetName", []interface{}{arg1})
fake.setNameMutex.Unlock()
if stub != nil {
fake.SetNameStub(arg1)
}
}
func (fake *FakeParticipant) SetNameCallCount() int {
fake.setNameMutex.RLock()
defer fake.setNameMutex.RUnlock()
return len(fake.setNameArgsForCall)
}
func (fake *FakeParticipant) SetNameCalls(stub func(string)) {
fake.setNameMutex.Lock()
defer fake.setNameMutex.Unlock()
fake.SetNameStub = stub
}
func (fake *FakeParticipant) SetNameArgsForCall(i int) string {
fake.setNameMutex.RLock()
defer fake.setNameMutex.RUnlock()
argsForCall := fake.setNameArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeParticipant) State() livekit.ParticipantInfo_State {
fake.stateMutex.Lock()
ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)]
@@ -1561,10 +1487,6 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.kindMutex.RUnlock()
fake.removePublishedTrackMutex.RLock()
defer fake.removePublishedTrackMutex.RUnlock()
fake.setMetadataMutex.RLock()
defer fake.setMetadataMutex.RUnlock()
fake.setNameMutex.RLock()
defer fake.setNameMutex.RUnlock()
fake.stateMutex.RLock()
defer fake.stateMutex.RUnlock()
fake.subscriptionPermissionMutex.RLock()
+45 -8
View File
@@ -82,12 +82,19 @@ type FakeRoom struct {
syncStateReturnsOnCall map[int]struct {
result1 error
}
UpdateParticipantMetadataStub func(types.LocalParticipant, string, string)
UpdateParticipantMetadataStub func(types.LocalParticipant, string, string, map[string]string) error
updateParticipantMetadataMutex sync.RWMutex
updateParticipantMetadataArgsForCall []struct {
arg1 types.LocalParticipant
arg2 string
arg3 string
arg4 map[string]string
}
updateParticipantMetadataReturns struct {
result1 error
}
updateParticipantMetadataReturnsOnCall map[int]struct {
result1 error
}
UpdateSubscriptionPermissionStub func(types.LocalParticipant, *livekit.SubscriptionPermission) error
updateSubscriptionPermissionMutex sync.RWMutex
@@ -492,19 +499,26 @@ func (fake *FakeRoom) SyncStateReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRoom) UpdateParticipantMetadata(arg1 types.LocalParticipant, arg2 string, arg3 string) {
func (fake *FakeRoom) UpdateParticipantMetadata(arg1 types.LocalParticipant, arg2 string, arg3 string, arg4 map[string]string) error {
fake.updateParticipantMetadataMutex.Lock()
ret, specificReturn := fake.updateParticipantMetadataReturnsOnCall[len(fake.updateParticipantMetadataArgsForCall)]
fake.updateParticipantMetadataArgsForCall = append(fake.updateParticipantMetadataArgsForCall, struct {
arg1 types.LocalParticipant
arg2 string
arg3 string
}{arg1, arg2, arg3})
arg4 map[string]string
}{arg1, arg2, arg3, arg4})
stub := fake.UpdateParticipantMetadataStub
fake.recordInvocation("UpdateParticipantMetadata", []interface{}{arg1, arg2, arg3})
fakeReturns := fake.updateParticipantMetadataReturns
fake.recordInvocation("UpdateParticipantMetadata", []interface{}{arg1, arg2, arg3, arg4})
fake.updateParticipantMetadataMutex.Unlock()
if stub != nil {
fake.UpdateParticipantMetadataStub(arg1, arg2, arg3)
return stub(arg1, arg2, arg3, arg4)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoom) UpdateParticipantMetadataCallCount() int {
@@ -513,17 +527,40 @@ func (fake *FakeRoom) UpdateParticipantMetadataCallCount() int {
return len(fake.updateParticipantMetadataArgsForCall)
}
func (fake *FakeRoom) UpdateParticipantMetadataCalls(stub func(types.LocalParticipant, string, string)) {
func (fake *FakeRoom) UpdateParticipantMetadataCalls(stub func(types.LocalParticipant, string, string, map[string]string) error) {
fake.updateParticipantMetadataMutex.Lock()
defer fake.updateParticipantMetadataMutex.Unlock()
fake.UpdateParticipantMetadataStub = stub
}
func (fake *FakeRoom) UpdateParticipantMetadataArgsForCall(i int) (types.LocalParticipant, string, string) {
func (fake *FakeRoom) UpdateParticipantMetadataArgsForCall(i int) (types.LocalParticipant, string, string, map[string]string) {
fake.updateParticipantMetadataMutex.RLock()
defer fake.updateParticipantMetadataMutex.RUnlock()
argsForCall := fake.updateParticipantMetadataArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeRoom) UpdateParticipantMetadataReturns(result1 error) {
fake.updateParticipantMetadataMutex.Lock()
defer fake.updateParticipantMetadataMutex.Unlock()
fake.UpdateParticipantMetadataStub = nil
fake.updateParticipantMetadataReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) UpdateParticipantMetadataReturnsOnCall(i int, result1 error) {
fake.updateParticipantMetadataMutex.Lock()
defer fake.updateParticipantMetadataMutex.Unlock()
fake.UpdateParticipantMetadataStub = nil
if fake.updateParticipantMetadataReturnsOnCall == nil {
fake.updateParticipantMetadataReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.updateParticipantMetadataReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) UpdateSubscriptionPermission(arg1 types.LocalParticipant, arg2 *livekit.SubscriptionPermission) error {
+1
View File
@@ -26,6 +26,7 @@ var (
ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist")
ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified")
ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits")
ErrAttributeExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "attribute size exceeds limits")
ErrRoomNameExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "room name length exceeds limits")
ErrParticipantIdentityExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "participant identity length exceeds limits")
ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed")
+1 -1
View File
@@ -74,7 +74,7 @@ func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.E
return nil, err
}
log.Debugw("SIP dispatch rule matched", "sipRule", best.SipDispatchRuleId)
resp, err := sip.EvaluateDispatchRule(best, req)
resp, err := sip.EvaluateDispatchRule(trunkID, best, req)
if err != nil {
return nil, err
}
+9 -2
View File
@@ -433,6 +433,7 @@ func (r *RoomManager) StartSession(
AdaptiveStream: pi.AdaptiveStream,
AllowTCPFallback: allowFallback,
TURNSEnabled: r.config.IsTURNSEnabled(),
MaxAttributesSize: r.config.Limit.MaxAttributesSize,
GetParticipantInfo: func(pID livekit.ParticipantID) *livekit.ParticipantInfo {
if p := room.GetParticipantByID(pID); p != nil {
return p.ToProto()
@@ -708,8 +709,14 @@ func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.Update
}
participant.GetLogger().Debugw("updating participant",
"metadata", req.Metadata, "permission", req.Permission)
room.UpdateParticipantMetadata(participant, req.Name, req.Metadata)
"metadata", req.Metadata,
"permission", req.Permission,
"attributes", req.Attributes,
)
err = room.UpdateParticipantMetadata(participant, req.Name, req.Metadata, req.Attributes)
if err != nil {
return nil, err
}
if req.Permission != nil {
participant.SetPermission(req.Permission)
}
+16 -7
View File
@@ -33,9 +33,8 @@ import (
"github.com/livekit/protocol/rpc"
)
// A rooms service that supports a single node
type RoomService struct {
roomConf config.RoomConfig
limitConf config.LimitConfig
apiConf config.APIConfig
psrpcConf rpc.PSRPCConfig
router routing.MessageRouter
@@ -49,7 +48,7 @@ type RoomService struct {
}
func NewRoomService(
roomConf config.RoomConfig,
limitConf config.LimitConfig,
apiConf config.APIConfig,
psrpcConf rpc.PSRPCConfig,
router routing.MessageRouter,
@@ -62,7 +61,7 @@ func NewRoomService(
participantClient rpc.TypedParticipantClient,
) (svc *RoomService, err error) {
svc = &RoomService{
roomConf: roomConf,
limitConf: limitConf,
apiConf: apiConf,
psrpcConf: psrpcConf,
router: router,
@@ -87,7 +86,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, ErrEgressNotConnected
}
if limit := s.roomConf.MaxRoomNameLength; limit > 0 && len(req.Name) > limit {
if limit := s.limitConf.MaxRoomNameLength; limit > 0 && len(req.Name) > limit {
return nil, fmt.Errorf("%w: max length %d", ErrRoomNameExceedsLimits, limit)
}
@@ -232,10 +231,20 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity)
maxMetadataSize := int(s.roomConf.MaxMetadataSize)
maxMetadataSize := int(s.limitConf.MaxMetadataSize)
if maxMetadataSize > 0 && len(req.Metadata) > maxMetadataSize {
return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize))
}
maxAttributeSize := int(s.limitConf.MaxAttributesSize)
if maxAttributeSize > 0 {
total := 0
for key, val := range req.Attributes {
total += len(key) + len(val)
}
if total > maxAttributeSize {
return nil, twirp.InvalidArgumentError(ErrAttributeExceedsLimits.Error(), strconv.Itoa(maxAttributeSize))
}
}
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
@@ -270,7 +279,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest
func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) {
AppendLogFields(ctx, "room", req.Room, "size", len(req.Metadata))
maxMetadataSize := int(s.roomConf.MaxMetadataSize)
maxMetadataSize := int(s.limitConf.MaxMetadataSize)
if maxMetadataSize > 0 && len(req.Metadata) > maxMetadataSize {
return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize))
}
+6 -6
View File
@@ -34,7 +34,7 @@ import (
func TestDeleteRoom(t *testing.T) {
t.Run("missing permissions", func(t *testing.T) {
svc := newTestRoomService(config.RoomConfig{})
svc := newTestRoomService(config.LimitConfig{})
grant := &auth.ClaimGrants{
Video: &auth.VideoGrant{},
}
@@ -48,7 +48,7 @@ func TestDeleteRoom(t *testing.T) {
func TestMetaDataLimits(t *testing.T) {
t.Run("metadata exceed limits", func(t *testing.T) {
svc := newTestRoomService(config.RoomConfig{MaxMetadataSize: 5})
svc := newTestRoomService(config.LimitConfig{MaxMetadataSize: 5})
grant := &auth.ClaimGrants{
Video: &auth.VideoGrant{},
}
@@ -72,8 +72,8 @@ func TestMetaDataLimits(t *testing.T) {
})
notExceedsLimitsSvc := map[string]*TestRoomService{
"metadata noe exceeds limits": newTestRoomService(config.RoomConfig{MaxMetadataSize: 5}),
"metadata no limits": newTestRoomService(config.RoomConfig{}), // no limits
"metadata noe exceeds limits": newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}),
"metadata no limits": newTestRoomService(config.LimitConfig{}), // no limits
}
for n, s := range notExceedsLimitsSvc {
@@ -104,12 +104,12 @@ func TestMetaDataLimits(t *testing.T) {
}
}
func newTestRoomService(conf config.RoomConfig) *TestRoomService {
func newTestRoomService(limitConf config.LimitConfig) *TestRoomService {
router := &routingfakes.FakeRouter{}
allocator := &servicefakes.FakeRoomAllocator{}
store := &servicefakes.FakeServiceStore{}
svc, err := service.NewRoomService(
conf,
limitConf,
config.APIConfig{ExecutionTimeout: 2},
rpc.PSRPCConfig{},
router,
+3 -3
View File
@@ -120,7 +120,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
if claims.Identity == "" {
return "", pi, http.StatusBadRequest, ErrIdentityEmpty
}
if limit := s.config.Room.MaxParticipantIdentityLength; limit > 0 && len(claims.Identity) > limit {
if limit := s.config.Limit.MaxParticipantIdentityLength; limit > 0 && len(claims.Identity) > limit {
return "", pi, http.StatusBadRequest, fmt.Errorf("%w: max length %d", ErrParticipantIdentityExceedsLimits, limit)
}
@@ -136,7 +136,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
if onlyName != "" {
roomName = onlyName
}
if limit := s.config.Room.MaxRoomNameLength; limit > 0 && len(roomName) > limit {
if limit := s.config.Limit.MaxRoomNameLength; limit > 0 && len(roomName) > limit {
return "", pi, http.StatusBadRequest, fmt.Errorf("%w: max length %d", ErrRoomNameExceedsLimits, limit)
}
@@ -508,7 +508,7 @@ func (s *RTCService) DrainConnections(interval time.Duration) {
defer t.Stop()
for c := range conns {
c.Close()
_ = c.Close()
<-t.C
}
}
+3 -3
View File
@@ -54,7 +54,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
createClientConfiguration,
createForwardStats,
routing.CreateRouter,
getRoomConf,
getLimitConf,
config.DefaultAPIConfig,
wire.Bind(new(routing.MessageRouter), new(routing.Router)),
wire.Bind(new(livekit.RoomService), new(*RoomService)),
@@ -221,8 +221,8 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}
func getRoomConf(config *config.Config) config.RoomConfig {
return config.Room
func getLimitConf(config *config.Config) config.LimitConfig {
return config.Limit
}
func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig {
+4 -4
View File
@@ -36,7 +36,7 @@ import (
// Injectors from wire.go:
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
roomConfig := getRoomConf(conf)
limitConfig := getLimitConf(conf)
apiConfig := config.DefaultAPIConfig()
psrpcConfig := getPSRPCConfig(conf)
universalClient, err := createRedisClient(conf)
@@ -96,7 +96,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, client, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
roomService, err := NewRoomService(limitConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, client, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
@@ -272,8 +272,8 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}
func getRoomConf(config2 *config.Config) config.RoomConfig {
return config2.Room
func getLimitConf(config2 *config.Config) config.LimitConfig {
return config2.Limit
}
func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig {
+12
View File
@@ -35,6 +35,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -110,6 +111,7 @@ type Options struct {
Publish string
ClientInfo *livekit.ClientInfo
DisabledCodecs []webrtc.RTPCodecCapability
TokenCustomizer func(token *auth.AccessToken, grants *auth.VideoGrant)
SignalRequestInterceptor SignalRequestInterceptor
SignalResponseInterceptor SignalResponseInterceptor
}
@@ -563,6 +565,16 @@ func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate, target livekit.Sig
})
}
func (c *RTCClient) SetAttributes(attrs map[string]string) error {
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_UpdateMetadata{
UpdateMetadata: &livekit.UpdateParticipantMetadata{
Attributes: attrs,
},
},
})
}
func (c *RTCClient) hasPrimaryEverConnected() bool {
if c.subscriberAsPrimary.Load() {
return c.subscriber.HasEverConnected()
+11 -3
View File
@@ -202,7 +202,11 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer {
// creates a client and runs against server
func createRTCClient(name string, port int, opts *testclient.Options) *testclient.RTCClient {
token := joinToken(testRoom, name)
var customizer func(token *auth.AccessToken, grants *auth.VideoGrant)
if opts != nil {
customizer = opts.TokenCustomizer
}
token := joinToken(testRoom, name, customizer)
ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, opts)
if err != nil {
panic(err)
@@ -241,12 +245,16 @@ func redisClient() *redis.Client {
})
}
func joinToken(room, name string) string {
func joinToken(room, name string, customFn func(token *auth.AccessToken, grants *auth.VideoGrant)) string {
at := auth.NewAccessToken(testApiKey, testApiSecret).
AddGrant(&auth.VideoGrant{RoomJoin: true, Room: room}).
SetIdentity(name).
SetName(name).
SetMetadata("metadata" + name)
grant := &auth.VideoGrant{RoomJoin: true, Room: room}
if customFn != nil {
customFn(at, grant)
}
at.AddGrant(grant)
t, err := at.ToJWT()
if err != nil {
panic(err)
+75
View File
@@ -229,6 +229,81 @@ func TestMultiNodeRefreshToken(t *testing.T) {
})
}
// ensure that token accurately reflects out of band updates
func TestMultiNodeUpdateAttributes(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
_, _, finish := setupMultiNodeTest("TestMultiNodeUpdateAttributes")
defer finish()
c1 := createRTCClient("au1", defaultServerPort, &client.Options{
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
token.SetAttributes(map[string]string{
"mykey": "au1",
})
},
})
c2 := createRTCClient("au2", secondServerPort, &client.Options{
TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
token.SetAttributes(map[string]string{
"mykey": "au2",
})
grants.SetCanUpdateOwnMetadata(true)
},
})
waitUntilConnected(t, c1, c2)
testutils.WithTimeout(t, func() string {
rc2 := c1.GetRemoteParticipant(c2.ID())
rc1 := c2.GetRemoteParticipant(c1.ID())
if rc2 == nil || rc1 == nil {
return "participants could not see each other"
}
if rc1.Attributes == nil || rc1.Attributes["mykey"] != "au1" {
return "rc1's initial attributes are incorrect"
}
if rc2.Attributes == nil || rc2.Attributes["mykey"] != "au2" {
return "rc2's initial attributes are incorrect"
}
return ""
})
// this one should not go through
_ = c1.SetAttributes(map[string]string{"mykey": "shouldnotchange"})
_ = c2.SetAttributes(map[string]string{"secondkey": "au2"})
// updates using room API should succeed
_, err := roomClient.UpdateParticipant(contextWithToken(adminRoomToken(testRoom)), &livekit.UpdateParticipantRequest{
Room: testRoom,
Identity: "au1",
Attributes: map[string]string{
"secondkey": "au1",
},
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
rc1 := c2.GetRemoteParticipant(c1.ID())
rc2 := c1.GetRemoteParticipant(c2.ID())
if rc1.Attributes["secondkey"] != "au1" {
return "au1's attribute update failed"
}
if rc2.Attributes["secondkey"] != "au2" {
return "au2's attribute update failed"
}
if rc1.Attributes["mykey"] != "au1" {
return "au1's mykey should not change"
}
if rc2.Attributes["mykey"] != "au2" {
return "au2's mykey should not change"
}
return ""
})
}
func TestMultiNodeRevokePublishPermission(t *testing.T) {
_, _, finish := setupMultiNodeTest("TestMultiNodeRevokePublishPermission")
defer finish()
+2 -2
View File
@@ -408,12 +408,12 @@ func TestAutoCreate(t *testing.T) {
waitForServerToStart(s)
token := joinToken(testRoom, "start-before-create")
token := joinToken(testRoom, "start-before-create", nil)
_, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil)
require.Error(t, err)
// second join should also fail
token = joinToken(testRoom, "start-before-create-2")
token = joinToken(testRoom, "start-before-create-2", nil)
_, err = testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil)
require.Error(t, err)
})