diff --git a/config-sample.yaml b/config-sample.yaml index 9370819a4..6a647823a 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -183,8 +183,6 @@ keys: # # allow tracks to be unmuted remotely, defaults to false # # tracks can always be muted from the Room Service APIs # enable_remote_unmute: true -# # limit size of room and participant's metadata, 0 for no limit -# max_metadata_size: 0 # # control playout delay in ms of video track (and associated audio track) # playout_delay: # enabled: true @@ -311,3 +309,11 @@ keys: # # value less or equal than 0 means no limit. # subscription_limit_video: 0 # subscription_limit_audio: 0 +# # limit size of room and participant's metadata, 0 for no limit +# max_metadata_size: 0 +# # limit size of participant attributes, 0 for no limit +# max_attributes_size: 0 +# # limit length of room names +# max_room_name_length: 0 +# # limit length of participant identity +# max_participant_identity_length: 0 diff --git a/go.mod b/go.mod index d80c3a640..ef8dc0491 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 - github.com/livekit/protocol v1.17.1-0.20240617184219-32c577d805ed + github.com/livekit/protocol v1.18.0 github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 061c3305a..f7d3af548 100644 --- a/go.sum +++ b/go.sum @@ -152,8 +152,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec= github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.17.1-0.20240617184219-32c577d805ed h1:S4avs1NKG6bBgHYuBOrQWnNxJSOdunGOB84BQfGzKmQ= -github.com/livekit/protocol v1.17.1-0.20240617184219-32c577d805ed/go.mod h1:cN8WmGQR+kWz1+UWcAQdFFUcbW76PnfZDdkLAbYIqd4= +github.com/livekit/protocol v1.18.0 h1:LLOjKBA8rtnGpVGjAmKUROy7bv/l9q1wyn9hNmj8Sdg= +github.com/livekit/protocol v1.18.0/go.mod h1:cN8WmGQR+kWz1+UWcAQdFFUcbW76PnfZDdkLAbYIqd4= github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto= github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/config/config.go b/pkg/config/config.go index c1b40cc12..306580414 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -231,17 +231,20 @@ type VideoConfig struct { type RoomConfig struct { // enable rooms to be automatically created - AutoCreate bool `yaml:"auto_create,omitempty"` - EnabledCodecs []CodecSpec `yaml:"enabled_codecs,omitempty"` - MaxParticipants uint32 `yaml:"max_participants,omitempty"` - EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"` - DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"` - EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"` - MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"` - PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"` - SyncStreams bool `yaml:"sync_streams,omitempty"` - MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"` - MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` + AutoCreate bool `yaml:"auto_create,omitempty"` + EnabledCodecs []CodecSpec `yaml:"enabled_codecs,omitempty"` + MaxParticipants uint32 `yaml:"max_participants,omitempty"` + EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"` + DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"` + EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"` + PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"` + SyncStreams bool `yaml:"sync_streams,omitempty"` + // deprecated, moved to limits + MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"` + // deprecated, moved to limits + MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"` + // deprecated, moved to limits + MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` } type CodecSpec struct { @@ -300,6 +303,11 @@ type LimitConfig struct { BytesPerSec float32 `yaml:"bytes_per_sec,omitempty"` SubscriptionLimitVideo int32 `yaml:"subscription_limit_video,omitempty"` SubscriptionLimitAudio int32 `yaml:"subscription_limit_audio,omitempty"` + MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"` + // total size of all attributes on a participant + MaxAttributesSize uint32 `yaml:"max_attributes_size,omitempty"` + MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"` + MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` } type IngressConfig struct { @@ -494,8 +502,12 @@ var DefaultConfig = Config{ {Mime: webrtc.MimeTypeVP9}, {Mime: webrtc.MimeTypeAV1}, }, - EmptyTimeout: 5 * 60, - DepartureTimeout: 20, + EmptyTimeout: 5 * 60, + DepartureTimeout: 20, + }, + Limit: LimitConfig{ + MaxMetadataSize: 64000, + MaxAttributesSize: 64000, MaxRoomNameLength: 256, MaxParticipantIdentityLength: 256, }, @@ -585,6 +597,17 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c conf.Logging.ComponentLevels["pion"] = conf.Logging.PionLevel } + // copy over legacy limits + if conf.Room.MaxMetadataSize != 0 { + conf.Limit.MaxMetadataSize = conf.Room.MaxMetadataSize + } + if conf.Room.MaxParticipantIdentityLength != 0 { + conf.Limit.MaxParticipantIdentityLength = conf.Room.MaxParticipantIdentityLength + } + if conf.Room.MaxRoomNameLength != 0 { + conf.Limit.MaxRoomNameLength = conf.Room.MaxRoomNameLength + } + return &conf, nil } diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index 32df7d91f..08e8ef33e 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -14,7 +14,9 @@ package rtc -import "errors" +import ( + "errors" +) var ( ErrRoomClosed = errors.New("room has already closed") @@ -29,6 +31,7 @@ var ( ErrEmptyParticipantID = errors.New("participant ID cannot be empty") ErrMissingGrants = errors.New("VideoGrant is missing") ErrInternalError = errors.New("internal error") + ErrAttributeExceedsLimits = errors.New("attribute size exceeds limits") // Track subscription related ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track") diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ed64cda3b..95636b1d9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -136,6 +136,7 @@ type ParticipantParams struct { VersionGenerator utils.TimedVersionGenerator TrackResolver types.MediaTrackResolver DisableDynacast bool + MaxAttributesSize uint32 SubscriberAllowPause bool SubscriptionLimitAudio int32 SubscriptionLimitVideo int32 @@ -459,6 +460,52 @@ func (p *ParticipantImpl) SetMetadata(metadata string) { } } +func (p *ParticipantImpl) SetAttributes(attrs map[string]string) error { + p.lock.Lock() + grants := p.grants.Load().Clone() + if grants.Attributes == nil { + grants.Attributes = make(map[string]string) + } + var keysToDelete []string + for k, v := range attrs { + if v == "" { + keysToDelete = append(keysToDelete, k) + } else { + grants.Attributes[k] = v + } + } + for _, k := range keysToDelete { + delete(grants.Attributes, k) + } + + maxAttributesSize := p.params.MaxAttributesSize + if maxAttributesSize > 0 { + total := 0 + for k, v := range grants.Attributes { + total += len(k) + len(v) + } + if uint32(total) > maxAttributesSize { + p.lock.Unlock() + return ErrAttributeExceedsLimits + } + } + + p.grants.Store(grants) + p.dirty.Store(true) + + onParticipantUpdate := p.onParticipantUpdate + onClaimsChanged := p.onClaimsChanged + p.lock.Unlock() + + if onParticipantUpdate != nil { + onParticipantUpdate(p) + } + if onClaimsChanged != nil { + onClaimsChanged(p) + } + return nil +} + func (p *ParticipantImpl) ClaimGrants() *auth.ClaimGrants { return p.grants.Load() } @@ -551,6 +598,7 @@ func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils. Version: v, Permission: grants.Video.ToPermission(), Metadata: grants.Metadata, + Attributes: grants.Attributes, Region: p.params.Region, IsPublisher: p.IsPublisher(), Kind: grants.GetParticipantKind(), diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 560dc06a1..9a5ecfc7b 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -819,13 +819,24 @@ func (r *Room) SetMetadata(metadata string) <-chan struct{} { return r.protoProxy.MarkDirty(true) } -func (r *Room) UpdateParticipantMetadata(participant types.LocalParticipant, name string, metadata string) { +func (r *Room) UpdateParticipantMetadata( + participant types.LocalParticipant, + name string, + metadata string, + attributes map[string]string, +) error { + if attributes != nil && len(attributes) > 0 { + if err := participant.SetAttributes(attributes); err != nil { + return err + } + } if metadata != "" { participant.SetMetadata(metadata) } if name != "" { participant.SetName(name) } + return nil } func (r *Room) sendRoomUpdate() { diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index d7d9c75ad..f9202e039 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -93,7 +93,15 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant case *livekit.SignalRequest_UpdateMetadata: if participant.ClaimGrants().Video.GetCanUpdateOwnMetadata() { - room.UpdateParticipantMetadata(participant, msg.UpdateMetadata.Name, msg.UpdateMetadata.Metadata) + err := room.UpdateParticipantMetadata( + participant, + msg.UpdateMetadata.Name, + msg.UpdateMetadata.Metadata, + msg.UpdateMetadata.Attributes, + ) + if err != nil { + pLogger.Warnw("could not update metadata", err) + } } case *livekit.SignalRequest_UpdateAudioTrack: diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 93bb32206..e548900e8 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -255,9 +255,6 @@ type Participant interface { CanSkipBroadcast() bool ToProto() *livekit.ParticipantInfo - SetName(name string) - SetMetadata(metadata string) - IsPublisher() bool GetPublishedTrack(trackID livekit.TrackID) MediaTrack GetPublishedTracks() []MediaTrack @@ -329,6 +326,11 @@ type LocalParticipant interface { SetSignalSourceValid(valid bool) HandleSignalSourceClose() + // updates + SetName(name string) + SetMetadata(metadata string) + SetAttributes(attributes map[string]string) error + // permissions ClaimGrants() *auth.ClaimGrants SetPermission(permission *livekit.ParticipantPermission) bool @@ -437,7 +439,7 @@ type Room interface { SimulateScenario(participant LocalParticipant, scenario *livekit.SimulateScenario) error ResolveMediaTrackForSubscriber(subIdentity livekit.ParticipantIdentity, trackID livekit.TrackID) MediaResolverResult GetLocalParticipants() []LocalParticipant - UpdateParticipantMetadata(participant LocalParticipant, name string, metadata string) + UpdateParticipantMetadata(participant LocalParticipant, name string, metadata string, attributes map[string]string) error } // MediaTrack represents a media track diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 8a4bf1b86..7c239d81e 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -733,6 +733,17 @@ type FakeLocalParticipant struct { sendSpeakerUpdateReturnsOnCall map[int]struct { result1 error } + SetAttributesStub func(map[string]string) error + setAttributesMutex sync.RWMutex + setAttributesArgsForCall []struct { + arg1 map[string]string + } + setAttributesReturns struct { + result1 error + } + setAttributesReturnsOnCall map[int]struct { + result1 error + } SetICEConfigStub func(*livekit.ICEConfig) setICEConfigMutex sync.RWMutex setICEConfigArgsForCall []struct { @@ -4881,6 +4892,67 @@ func (fake *FakeLocalParticipant) SendSpeakerUpdateReturnsOnCall(i int, result1 }{result1} } +func (fake *FakeLocalParticipant) SetAttributes(arg1 map[string]string) error { + fake.setAttributesMutex.Lock() + ret, specificReturn := fake.setAttributesReturnsOnCall[len(fake.setAttributesArgsForCall)] + fake.setAttributesArgsForCall = append(fake.setAttributesArgsForCall, struct { + arg1 map[string]string + }{arg1}) + stub := fake.SetAttributesStub + fakeReturns := fake.setAttributesReturns + fake.recordInvocation("SetAttributes", []interface{}{arg1}) + fake.setAttributesMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) SetAttributesCallCount() int { + fake.setAttributesMutex.RLock() + defer fake.setAttributesMutex.RUnlock() + return len(fake.setAttributesArgsForCall) +} + +func (fake *FakeLocalParticipant) SetAttributesCalls(stub func(map[string]string) error) { + fake.setAttributesMutex.Lock() + defer fake.setAttributesMutex.Unlock() + fake.SetAttributesStub = stub +} + +func (fake *FakeLocalParticipant) SetAttributesArgsForCall(i int) map[string]string { + fake.setAttributesMutex.RLock() + defer fake.setAttributesMutex.RUnlock() + argsForCall := fake.setAttributesArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) SetAttributesReturns(result1 error) { + fake.setAttributesMutex.Lock() + defer fake.setAttributesMutex.Unlock() + fake.SetAttributesStub = nil + fake.setAttributesReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) SetAttributesReturnsOnCall(i int, result1 error) { + fake.setAttributesMutex.Lock() + defer fake.setAttributesMutex.Unlock() + fake.SetAttributesStub = nil + if fake.setAttributesReturnsOnCall == nil { + fake.setAttributesReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.setAttributesReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) SetICEConfig(arg1 *livekit.ICEConfig) { fake.setICEConfigMutex.Lock() fake.setICEConfigArgsForCall = append(fake.setICEConfigArgsForCall, struct { @@ -6571,6 +6643,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.sendRoomUpdateMutex.RUnlock() fake.sendSpeakerUpdateMutex.RLock() defer fake.sendSpeakerUpdateMutex.RUnlock() + fake.setAttributesMutex.RLock() + defer fake.setAttributesMutex.RUnlock() fake.setICEConfigMutex.RLock() defer fake.setICEConfigMutex.RUnlock() fake.setMetadataMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 9494b87e6..9c74e6f63 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -175,16 +175,6 @@ type FakeParticipant struct { arg2 bool arg3 bool } - SetMetadataStub func(string) - setMetadataMutex sync.RWMutex - setMetadataArgsForCall []struct { - arg1 string - } - SetNameStub func(string) - setNameMutex sync.RWMutex - setNameArgsForCall []struct { - arg1 string - } StateStub func() livekit.ParticipantInfo_State stateMutex sync.RWMutex stateArgsForCall []struct { @@ -1115,70 +1105,6 @@ func (fake *FakeParticipant) RemovePublishedTrackArgsForCall(i int) (types.Media return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeParticipant) SetMetadata(arg1 string) { - fake.setMetadataMutex.Lock() - fake.setMetadataArgsForCall = append(fake.setMetadataArgsForCall, struct { - arg1 string - }{arg1}) - stub := fake.SetMetadataStub - fake.recordInvocation("SetMetadata", []interface{}{arg1}) - fake.setMetadataMutex.Unlock() - if stub != nil { - fake.SetMetadataStub(arg1) - } -} - -func (fake *FakeParticipant) SetMetadataCallCount() int { - fake.setMetadataMutex.RLock() - defer fake.setMetadataMutex.RUnlock() - return len(fake.setMetadataArgsForCall) -} - -func (fake *FakeParticipant) SetMetadataCalls(stub func(string)) { - fake.setMetadataMutex.Lock() - defer fake.setMetadataMutex.Unlock() - fake.SetMetadataStub = stub -} - -func (fake *FakeParticipant) SetMetadataArgsForCall(i int) string { - fake.setMetadataMutex.RLock() - defer fake.setMetadataMutex.RUnlock() - argsForCall := fake.setMetadataArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeParticipant) SetName(arg1 string) { - fake.setNameMutex.Lock() - fake.setNameArgsForCall = append(fake.setNameArgsForCall, struct { - arg1 string - }{arg1}) - stub := fake.SetNameStub - fake.recordInvocation("SetName", []interface{}{arg1}) - fake.setNameMutex.Unlock() - if stub != nil { - fake.SetNameStub(arg1) - } -} - -func (fake *FakeParticipant) SetNameCallCount() int { - fake.setNameMutex.RLock() - defer fake.setNameMutex.RUnlock() - return len(fake.setNameArgsForCall) -} - -func (fake *FakeParticipant) SetNameCalls(stub func(string)) { - fake.setNameMutex.Lock() - defer fake.setNameMutex.Unlock() - fake.SetNameStub = stub -} - -func (fake *FakeParticipant) SetNameArgsForCall(i int) string { - fake.setNameMutex.RLock() - defer fake.setNameMutex.RUnlock() - argsForCall := fake.setNameArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeParticipant) State() livekit.ParticipantInfo_State { fake.stateMutex.Lock() ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)] @@ -1561,10 +1487,6 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.kindMutex.RUnlock() fake.removePublishedTrackMutex.RLock() defer fake.removePublishedTrackMutex.RUnlock() - fake.setMetadataMutex.RLock() - defer fake.setMetadataMutex.RUnlock() - fake.setNameMutex.RLock() - defer fake.setNameMutex.RUnlock() fake.stateMutex.RLock() defer fake.stateMutex.RUnlock() fake.subscriptionPermissionMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_room.go b/pkg/rtc/types/typesfakes/fake_room.go index 0abe91d69..503bf73ad 100644 --- a/pkg/rtc/types/typesfakes/fake_room.go +++ b/pkg/rtc/types/typesfakes/fake_room.go @@ -82,12 +82,19 @@ type FakeRoom struct { syncStateReturnsOnCall map[int]struct { result1 error } - UpdateParticipantMetadataStub func(types.LocalParticipant, string, string) + UpdateParticipantMetadataStub func(types.LocalParticipant, string, string, map[string]string) error updateParticipantMetadataMutex sync.RWMutex updateParticipantMetadataArgsForCall []struct { arg1 types.LocalParticipant arg2 string arg3 string + arg4 map[string]string + } + updateParticipantMetadataReturns struct { + result1 error + } + updateParticipantMetadataReturnsOnCall map[int]struct { + result1 error } UpdateSubscriptionPermissionStub func(types.LocalParticipant, *livekit.SubscriptionPermission) error updateSubscriptionPermissionMutex sync.RWMutex @@ -492,19 +499,26 @@ func (fake *FakeRoom) SyncStateReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRoom) UpdateParticipantMetadata(arg1 types.LocalParticipant, arg2 string, arg3 string) { +func (fake *FakeRoom) UpdateParticipantMetadata(arg1 types.LocalParticipant, arg2 string, arg3 string, arg4 map[string]string) error { fake.updateParticipantMetadataMutex.Lock() + ret, specificReturn := fake.updateParticipantMetadataReturnsOnCall[len(fake.updateParticipantMetadataArgsForCall)] fake.updateParticipantMetadataArgsForCall = append(fake.updateParticipantMetadataArgsForCall, struct { arg1 types.LocalParticipant arg2 string arg3 string - }{arg1, arg2, arg3}) + arg4 map[string]string + }{arg1, arg2, arg3, arg4}) stub := fake.UpdateParticipantMetadataStub - fake.recordInvocation("UpdateParticipantMetadata", []interface{}{arg1, arg2, arg3}) + fakeReturns := fake.updateParticipantMetadataReturns + fake.recordInvocation("UpdateParticipantMetadata", []interface{}{arg1, arg2, arg3, arg4}) fake.updateParticipantMetadataMutex.Unlock() if stub != nil { - fake.UpdateParticipantMetadataStub(arg1, arg2, arg3) + return stub(arg1, arg2, arg3, arg4) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeRoom) UpdateParticipantMetadataCallCount() int { @@ -513,17 +527,40 @@ func (fake *FakeRoom) UpdateParticipantMetadataCallCount() int { return len(fake.updateParticipantMetadataArgsForCall) } -func (fake *FakeRoom) UpdateParticipantMetadataCalls(stub func(types.LocalParticipant, string, string)) { +func (fake *FakeRoom) UpdateParticipantMetadataCalls(stub func(types.LocalParticipant, string, string, map[string]string) error) { fake.updateParticipantMetadataMutex.Lock() defer fake.updateParticipantMetadataMutex.Unlock() fake.UpdateParticipantMetadataStub = stub } -func (fake *FakeRoom) UpdateParticipantMetadataArgsForCall(i int) (types.LocalParticipant, string, string) { +func (fake *FakeRoom) UpdateParticipantMetadataArgsForCall(i int) (types.LocalParticipant, string, string, map[string]string) { fake.updateParticipantMetadataMutex.RLock() defer fake.updateParticipantMetadataMutex.RUnlock() argsForCall := fake.updateParticipantMetadataArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeRoom) UpdateParticipantMetadataReturns(result1 error) { + fake.updateParticipantMetadataMutex.Lock() + defer fake.updateParticipantMetadataMutex.Unlock() + fake.UpdateParticipantMetadataStub = nil + fake.updateParticipantMetadataReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRoom) UpdateParticipantMetadataReturnsOnCall(i int, result1 error) { + fake.updateParticipantMetadataMutex.Lock() + defer fake.updateParticipantMetadataMutex.Unlock() + fake.UpdateParticipantMetadataStub = nil + if fake.updateParticipantMetadataReturnsOnCall == nil { + fake.updateParticipantMetadataReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateParticipantMetadataReturnsOnCall[i] = struct { + result1 error + }{result1} } func (fake *FakeRoom) UpdateSubscriptionPermission(arg1 types.LocalParticipant, arg2 *livekit.SubscriptionPermission) error { diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 26f24389d..4a5f4d37d 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -26,6 +26,7 @@ var ( ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist") ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified") ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits") + ErrAttributeExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "attribute size exceeds limits") ErrRoomNameExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "room name length exceeds limits") ErrParticipantIdentityExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "participant identity length exceeds limits") ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed") diff --git a/pkg/service/ioservice_sip.go b/pkg/service/ioservice_sip.go index e974833ac..f3996d9a8 100644 --- a/pkg/service/ioservice_sip.go +++ b/pkg/service/ioservice_sip.go @@ -74,7 +74,7 @@ func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.E return nil, err } log.Debugw("SIP dispatch rule matched", "sipRule", best.SipDispatchRuleId) - resp, err := sip.EvaluateDispatchRule(best, req) + resp, err := sip.EvaluateDispatchRule(trunkID, best, req) if err != nil { return nil, err } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index b4d180788..3ac7520de 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -433,6 +433,7 @@ func (r *RoomManager) StartSession( AdaptiveStream: pi.AdaptiveStream, AllowTCPFallback: allowFallback, TURNSEnabled: r.config.IsTURNSEnabled(), + MaxAttributesSize: r.config.Limit.MaxAttributesSize, GetParticipantInfo: func(pID livekit.ParticipantID) *livekit.ParticipantInfo { if p := room.GetParticipantByID(pID); p != nil { return p.ToProto() @@ -708,8 +709,14 @@ func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.Update } participant.GetLogger().Debugw("updating participant", - "metadata", req.Metadata, "permission", req.Permission) - room.UpdateParticipantMetadata(participant, req.Name, req.Metadata) + "metadata", req.Metadata, + "permission", req.Permission, + "attributes", req.Attributes, + ) + err = room.UpdateParticipantMetadata(participant, req.Name, req.Metadata, req.Attributes) + if err != nil { + return nil, err + } if req.Permission != nil { participant.SetPermission(req.Permission) } diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 0bae86fab..19657b09a 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -33,9 +33,8 @@ import ( "github.com/livekit/protocol/rpc" ) -// A rooms service that supports a single node type RoomService struct { - roomConf config.RoomConfig + limitConf config.LimitConfig apiConf config.APIConfig psrpcConf rpc.PSRPCConfig router routing.MessageRouter @@ -49,7 +48,7 @@ type RoomService struct { } func NewRoomService( - roomConf config.RoomConfig, + limitConf config.LimitConfig, apiConf config.APIConfig, psrpcConf rpc.PSRPCConfig, router routing.MessageRouter, @@ -62,7 +61,7 @@ func NewRoomService( participantClient rpc.TypedParticipantClient, ) (svc *RoomService, err error) { svc = &RoomService{ - roomConf: roomConf, + limitConf: limitConf, apiConf: apiConf, psrpcConf: psrpcConf, router: router, @@ -87,7 +86,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, ErrEgressNotConnected } - if limit := s.roomConf.MaxRoomNameLength; limit > 0 && len(req.Name) > limit { + if limit := s.limitConf.MaxRoomNameLength; limit > 0 && len(req.Name) > limit { return nil, fmt.Errorf("%w: max length %d", ErrRoomNameExceedsLimits, limit) } @@ -232,10 +231,20 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) - maxMetadataSize := int(s.roomConf.MaxMetadataSize) + maxMetadataSize := int(s.limitConf.MaxMetadataSize) if maxMetadataSize > 0 && len(req.Metadata) > maxMetadataSize { return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize)) } + maxAttributeSize := int(s.limitConf.MaxAttributesSize) + if maxAttributeSize > 0 { + total := 0 + for key, val := range req.Attributes { + total += len(key) + len(val) + } + if total > maxAttributeSize { + return nil, twirp.InvalidArgumentError(ErrAttributeExceedsLimits.Error(), strconv.Itoa(maxAttributeSize)) + } + } if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil { return nil, twirpAuthError(err) @@ -270,7 +279,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) { AppendLogFields(ctx, "room", req.Room, "size", len(req.Metadata)) - maxMetadataSize := int(s.roomConf.MaxMetadataSize) + maxMetadataSize := int(s.limitConf.MaxMetadataSize) if maxMetadataSize > 0 && len(req.Metadata) > maxMetadataSize { return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize)) } diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index aa10a6399..71a6a4f73 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -34,7 +34,7 @@ import ( func TestDeleteRoom(t *testing.T) { t.Run("missing permissions", func(t *testing.T) { - svc := newTestRoomService(config.RoomConfig{}) + svc := newTestRoomService(config.LimitConfig{}) grant := &auth.ClaimGrants{ Video: &auth.VideoGrant{}, } @@ -48,7 +48,7 @@ func TestDeleteRoom(t *testing.T) { func TestMetaDataLimits(t *testing.T) { t.Run("metadata exceed limits", func(t *testing.T) { - svc := newTestRoomService(config.RoomConfig{MaxMetadataSize: 5}) + svc := newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}) grant := &auth.ClaimGrants{ Video: &auth.VideoGrant{}, } @@ -72,8 +72,8 @@ func TestMetaDataLimits(t *testing.T) { }) notExceedsLimitsSvc := map[string]*TestRoomService{ - "metadata noe exceeds limits": newTestRoomService(config.RoomConfig{MaxMetadataSize: 5}), - "metadata no limits": newTestRoomService(config.RoomConfig{}), // no limits + "metadata noe exceeds limits": newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}), + "metadata no limits": newTestRoomService(config.LimitConfig{}), // no limits } for n, s := range notExceedsLimitsSvc { @@ -104,12 +104,12 @@ func TestMetaDataLimits(t *testing.T) { } } -func newTestRoomService(conf config.RoomConfig) *TestRoomService { +func newTestRoomService(limitConf config.LimitConfig) *TestRoomService { router := &routingfakes.FakeRouter{} allocator := &servicefakes.FakeRoomAllocator{} store := &servicefakes.FakeServiceStore{} svc, err := service.NewRoomService( - conf, + limitConf, config.APIConfig{ExecutionTimeout: 2}, rpc.PSRPCConfig{}, router, diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index eaf0b2e87..5843231e2 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -120,7 +120,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic if claims.Identity == "" { return "", pi, http.StatusBadRequest, ErrIdentityEmpty } - if limit := s.config.Room.MaxParticipantIdentityLength; limit > 0 && len(claims.Identity) > limit { + if limit := s.config.Limit.MaxParticipantIdentityLength; limit > 0 && len(claims.Identity) > limit { return "", pi, http.StatusBadRequest, fmt.Errorf("%w: max length %d", ErrParticipantIdentityExceedsLimits, limit) } @@ -136,7 +136,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic if onlyName != "" { roomName = onlyName } - if limit := s.config.Room.MaxRoomNameLength; limit > 0 && len(roomName) > limit { + if limit := s.config.Limit.MaxRoomNameLength; limit > 0 && len(roomName) > limit { return "", pi, http.StatusBadRequest, fmt.Errorf("%w: max length %d", ErrRoomNameExceedsLimits, limit) } @@ -508,7 +508,7 @@ func (s *RTCService) DrainConnections(interval time.Duration) { defer t.Stop() for c := range conns { - c.Close() + _ = c.Close() <-t.C } } diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 91d6df7d1..c856c7384 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -54,7 +54,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live createClientConfiguration, createForwardStats, routing.CreateRouter, - getRoomConf, + getLimitConf, config.DefaultAPIConfig, wire.Bind(new(routing.MessageRouter), new(routing.Router)), wire.Bind(new(livekit.RoomService), new(*RoomService)), @@ -221,8 +221,8 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } -func getRoomConf(config *config.Config) config.RoomConfig { - return config.Room +func getLimitConf(config *config.Config) config.LimitConfig { + return config.Limit } func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index f731de75e..de2ecf664 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -36,7 +36,7 @@ import ( // Injectors from wire.go: func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { - roomConfig := getRoomConf(conf) + limitConfig := getLimitConf(conf) apiConfig := config.DefaultAPIConfig() psrpcConfig := getPSRPCConfig(conf) universalClient, err := createRedisClient(conf) @@ -96,7 +96,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, client, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, client, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } @@ -272,8 +272,8 @@ func createClientConfiguration() clientconfiguration.ClientConfigurationManager return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } -func getRoomConf(config2 *config.Config) config.RoomConfig { - return config2.Room +func getLimitConf(config2 *config.Config) config.LimitConfig { + return config2.Limit } func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig { diff --git a/test/client/client.go b/test/client/client.go index 5d461a74a..1564b4b97 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -35,6 +35,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/livekit/mediatransportutil/pkg/rtcconfig" + "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -110,6 +111,7 @@ type Options struct { Publish string ClientInfo *livekit.ClientInfo DisabledCodecs []webrtc.RTPCodecCapability + TokenCustomizer func(token *auth.AccessToken, grants *auth.VideoGrant) SignalRequestInterceptor SignalRequestInterceptor SignalResponseInterceptor SignalResponseInterceptor } @@ -563,6 +565,16 @@ func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate, target livekit.Sig }) } +func (c *RTCClient) SetAttributes(attrs map[string]string) error { + return c.SendRequest(&livekit.SignalRequest{ + Message: &livekit.SignalRequest_UpdateMetadata{ + UpdateMetadata: &livekit.UpdateParticipantMetadata{ + Attributes: attrs, + }, + }, + }) +} + func (c *RTCClient) hasPrimaryEverConnected() bool { if c.subscriberAsPrimary.Load() { return c.subscriber.HasEverConnected() diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 8fffe6b81..69ea68291 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -202,7 +202,11 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer { // creates a client and runs against server func createRTCClient(name string, port int, opts *testclient.Options) *testclient.RTCClient { - token := joinToken(testRoom, name) + var customizer func(token *auth.AccessToken, grants *auth.VideoGrant) + if opts != nil { + customizer = opts.TokenCustomizer + } + token := joinToken(testRoom, name, customizer) ws, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", port), token, opts) if err != nil { panic(err) @@ -241,12 +245,16 @@ func redisClient() *redis.Client { }) } -func joinToken(room, name string) string { +func joinToken(room, name string, customFn func(token *auth.AccessToken, grants *auth.VideoGrant)) string { at := auth.NewAccessToken(testApiKey, testApiSecret). - AddGrant(&auth.VideoGrant{RoomJoin: true, Room: room}). SetIdentity(name). SetName(name). SetMetadata("metadata" + name) + grant := &auth.VideoGrant{RoomJoin: true, Room: room} + if customFn != nil { + customFn(at, grant) + } + at.AddGrant(grant) t, err := at.ToJWT() if err != nil { panic(err) diff --git a/test/multinode_test.go b/test/multinode_test.go index 8b1eb1d32..e964f3096 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -229,6 +229,81 @@ func TestMultiNodeRefreshToken(t *testing.T) { }) } +// ensure that token accurately reflects out of band updates +func TestMultiNodeUpdateAttributes(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + _, _, finish := setupMultiNodeTest("TestMultiNodeUpdateAttributes") + defer finish() + + c1 := createRTCClient("au1", defaultServerPort, &client.Options{ + TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { + token.SetAttributes(map[string]string{ + "mykey": "au1", + }) + }, + }) + c2 := createRTCClient("au2", secondServerPort, &client.Options{ + TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) { + token.SetAttributes(map[string]string{ + "mykey": "au2", + }) + grants.SetCanUpdateOwnMetadata(true) + }, + }) + waitUntilConnected(t, c1, c2) + + testutils.WithTimeout(t, func() string { + rc2 := c1.GetRemoteParticipant(c2.ID()) + rc1 := c2.GetRemoteParticipant(c1.ID()) + if rc2 == nil || rc1 == nil { + return "participants could not see each other" + } + if rc1.Attributes == nil || rc1.Attributes["mykey"] != "au1" { + return "rc1's initial attributes are incorrect" + } + if rc2.Attributes == nil || rc2.Attributes["mykey"] != "au2" { + return "rc2's initial attributes are incorrect" + } + return "" + }) + + // this one should not go through + _ = c1.SetAttributes(map[string]string{"mykey": "shouldnotchange"}) + _ = c2.SetAttributes(map[string]string{"secondkey": "au2"}) + + // updates using room API should succeed + _, err := roomClient.UpdateParticipant(contextWithToken(adminRoomToken(testRoom)), &livekit.UpdateParticipantRequest{ + Room: testRoom, + Identity: "au1", + Attributes: map[string]string{ + "secondkey": "au1", + }, + }) + require.NoError(t, err) + + testutils.WithTimeout(t, func() string { + rc1 := c2.GetRemoteParticipant(c1.ID()) + rc2 := c1.GetRemoteParticipant(c2.ID()) + if rc1.Attributes["secondkey"] != "au1" { + return "au1's attribute update failed" + } + if rc2.Attributes["secondkey"] != "au2" { + return "au2's attribute update failed" + } + if rc1.Attributes["mykey"] != "au1" { + return "au1's mykey should not change" + } + if rc2.Attributes["mykey"] != "au2" { + return "au2's mykey should not change" + } + return "" + }) +} + func TestMultiNodeRevokePublishPermission(t *testing.T) { _, _, finish := setupMultiNodeTest("TestMultiNodeRevokePublishPermission") defer finish() diff --git a/test/singlenode_test.go b/test/singlenode_test.go index fe2e55b07..e136a8e68 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -408,12 +408,12 @@ func TestAutoCreate(t *testing.T) { waitForServerToStart(s) - token := joinToken(testRoom, "start-before-create") + token := joinToken(testRoom, "start-before-create", nil) _, err := testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil) require.Error(t, err) // second join should also fail - token = joinToken(testRoom, "start-before-create-2") + token = joinToken(testRoom, "start-before-create-2", nil) _, err = testclient.NewWebSocketConn(fmt.Sprintf("ws://localhost:%d", defaultServerPort), token, nil) require.Error(t, err) })