diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 0b797a294..b4e26d020 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -212,6 +212,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * addTrackParams := types.AddTrackParams{ Stereo: info.Stereo, } + sub.VerifySubscribeParticipantInfo(subTrack.PublisherID(), subTrack.PublisherVersion()) if sub.ProtocolVersion().SupportsTransceiverReuse() { // // AddTrack will create a new transceiver or re-use an unused one diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 376ddc0c9..1496f3790 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -86,6 +86,7 @@ type ParticipantParams struct { AdaptiveStream bool AllowTCPFallback bool TURNSEnabled bool + GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo } type ParticipantImpl struct { @@ -192,7 +193,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { var err error // keep last participants and when updates were sent - if p.updateCache, err = lru.New(32); err != nil { + if p.updateCache, err = lru.New(128); err != nil { return nil, err } @@ -858,6 +859,18 @@ func (p *ParticipantImpl) UpdateSubscribedTrackSettings(trackID livekit.TrackID, return nil } +func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) { + if v, ok := p.updateCache.Get(pID); ok && v.(uint32) >= version { + return + } + + if f := p.params.GetParticipantInfo; f != nil { + if info := f(pID); info != nil { + p.SendParticipantUpdate([]*livekit.ParticipantInfo{info}) + } + } +} + // AddSubscribedTrack adds a track to the participant's subscribed list func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { p.lock.Lock() diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 79e9649b1..83e2ceb4f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -273,6 +273,7 @@ type LocalParticipant interface { RemoveSubscribedTrack(st SubscribedTrack) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error GetSubscribedTracks() []SubscribedTrack + VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) // returns list of participant identities that the current participant is subscribed to GetSubscribedParticipants() []livekit.ParticipantID diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 33d8e9b63..c49818bd9 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -755,6 +755,12 @@ type FakeLocalParticipant struct { updateVideoLayersReturnsOnCall map[int]struct { result1 error } + VerifySubscribeParticipantInfoStub func(livekit.ParticipantID, uint32) + verifySubscribeParticipantInfoMutex sync.RWMutex + verifySubscribeParticipantInfoArgsForCall []struct { + arg1 livekit.ParticipantID + arg2 uint32 + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -4750,6 +4756,39 @@ func (fake *FakeLocalParticipant) UpdateVideoLayersReturnsOnCall(i int, result1 }{result1} } +func (fake *FakeLocalParticipant) VerifySubscribeParticipantInfo(arg1 livekit.ParticipantID, arg2 uint32) { + fake.verifySubscribeParticipantInfoMutex.Lock() + fake.verifySubscribeParticipantInfoArgsForCall = append(fake.verifySubscribeParticipantInfoArgsForCall, struct { + arg1 livekit.ParticipantID + arg2 uint32 + }{arg1, arg2}) + stub := fake.VerifySubscribeParticipantInfoStub + fake.recordInvocation("VerifySubscribeParticipantInfo", []interface{}{arg1, arg2}) + fake.verifySubscribeParticipantInfoMutex.Unlock() + if stub != nil { + fake.VerifySubscribeParticipantInfoStub(arg1, arg2) + } +} + +func (fake *FakeLocalParticipant) VerifySubscribeParticipantInfoCallCount() int { + fake.verifySubscribeParticipantInfoMutex.RLock() + defer fake.verifySubscribeParticipantInfoMutex.RUnlock() + return len(fake.verifySubscribeParticipantInfoArgsForCall) +} + +func (fake *FakeLocalParticipant) VerifySubscribeParticipantInfoCalls(stub func(livekit.ParticipantID, uint32)) { + fake.verifySubscribeParticipantInfoMutex.Lock() + defer fake.verifySubscribeParticipantInfoMutex.Unlock() + fake.VerifySubscribeParticipantInfoStub = stub +} + +func (fake *FakeLocalParticipant) VerifySubscribeParticipantInfoArgsForCall(i int) (livekit.ParticipantID, uint32) { + fake.verifySubscribeParticipantInfoMutex.RLock() + defer fake.verifySubscribeParticipantInfoMutex.RUnlock() + argsForCall := fake.verifySubscribeParticipantInfoArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -4923,6 +4962,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.updateSubscriptionPermissionMutex.RUnlock() fake.updateVideoLayersMutex.RLock() defer fake.updateVideoLayersMutex.RUnlock() + fake.verifySubscribeParticipantInfoMutex.RLock() + defer fake.verifySubscribeParticipantInfoMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 2a0f90406..4b9060c53 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -297,6 +297,12 @@ func (r *RoomManager) StartSession( AdaptiveStream: pi.AdaptiveStream, AllowTCPFallback: allowFallback, TURNSEnabled: r.config.IsTURNSEnabled(), + GetParticipantInfo: func(pID livekit.ParticipantID) *livekit.ParticipantInfo { + if p := room.GetParticipantBySid(pID); p != nil { + return p.ToProto() + } + return nil + }, }) if err != nil { return err