diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 532030aac..cb9f68e33 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -131,6 +131,7 @@ type ParticipantImpl struct { // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo + requireBroadcast bool // queued participant updates before join response is sent // guarded by updateLock queuedUpdates []*livekit.ParticipantInfo @@ -324,6 +325,7 @@ func (p *ParticipantImpl) SetMetadata(metadata string) { } p.grants.Metadata = metadata + p.requireBroadcast = p.requireBroadcast || metadata != "" p.dirty.Store(true) onParticipantUpdate := p.onParticipantUpdate @@ -364,6 +366,9 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio canSubscribe := video.GetCanSubscribe() onParticipantUpdate := p.onParticipantUpdate onClaimsChanged := p.onClaimsChanged + + isPublisher := canPublish && p.TransportManager.IsPublisherEstablished() + p.requireBroadcast = p.requireBroadcast || isPublisher p.lock.Unlock() // publish permission has been revoked then remove offending tracks @@ -390,7 +395,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio } // update isPublisher attribute - p.isPublisher.Store(canPublish && p.TransportManager.IsPublisherEstablished()) + p.isPublisher.Store(isPublisher) if onParticipantUpdate != nil { onParticipantUpdate(p) @@ -401,6 +406,12 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio return true } +func (p *ParticipantImpl) CanSkipBroadcast() bool { + p.lock.RLock() + defer p.lock.RUnlock() + return !p.requireBroadcast +} + func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) { v := p.version.Load() piv := p.timedVersion.Load() @@ -1081,10 +1092,11 @@ func (p *ParticipantImpl) setupUpTrackManager() { }) p.UpTrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack) { - p.dirty.Store(true) p.lock.RLock() onTrackUpdated := p.onTrackUpdated p.lock.RUnlock() + + p.dirty.Store(true) if onTrackUpdated != nil { onTrackUpdated(p, track) } @@ -1134,6 +1146,10 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { func (p *ParticipantImpl) setIsPublisher(isPublisher bool) { if p.isPublisher.Swap(isPublisher) != isPublisher { + p.lock.Lock() + p.requireBroadcast = true + p.lock.Unlock() + p.dirty.Store(true) // trigger update as well if participant is already fully connected diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 1163cc1d2..625c3ecb0 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -176,6 +176,7 @@ type Participant interface { Identity() livekit.ParticipantIdentity State() livekit.ParticipantInfo_State + CanSkipBroadcast() bool ToProto() *livekit.ParticipantInfo SetName(name string) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index f4966a5c5..33416aa05 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -89,6 +89,16 @@ type FakeLocalParticipant struct { canPublishSourceReturnsOnCall map[int]struct { result1 bool } + CanSkipBroadcastStub func() bool + canSkipBroadcastMutex sync.RWMutex + canSkipBroadcastArgsForCall []struct { + } + canSkipBroadcastReturns struct { + result1 bool + } + canSkipBroadcastReturnsOnCall map[int]struct { + result1 bool + } CanSubscribeStub func() bool canSubscribeMutex sync.RWMutex canSubscribeArgsForCall []struct { @@ -340,10 +350,6 @@ type FakeLocalParticipant struct { identityReturnsOnCall map[int]struct { result1 livekit.ParticipantIdentity } - InvalidateVersionStub func() - invalidateVersionMutex sync.RWMutex - invalidateVersionArgsForCall []struct { - } IsClosedStub func() bool isClosedMutex sync.RWMutex isClosedArgsForCall []struct { @@ -1202,6 +1208,59 @@ func (fake *FakeLocalParticipant) CanPublishSourceReturnsOnCall(i int, result1 b }{result1} } +func (fake *FakeLocalParticipant) CanSkipBroadcast() bool { + fake.canSkipBroadcastMutex.Lock() + ret, specificReturn := fake.canSkipBroadcastReturnsOnCall[len(fake.canSkipBroadcastArgsForCall)] + fake.canSkipBroadcastArgsForCall = append(fake.canSkipBroadcastArgsForCall, struct { + }{}) + stub := fake.CanSkipBroadcastStub + fakeReturns := fake.canSkipBroadcastReturns + fake.recordInvocation("CanSkipBroadcast", []interface{}{}) + fake.canSkipBroadcastMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) CanSkipBroadcastCallCount() int { + fake.canSkipBroadcastMutex.RLock() + defer fake.canSkipBroadcastMutex.RUnlock() + return len(fake.canSkipBroadcastArgsForCall) +} + +func (fake *FakeLocalParticipant) CanSkipBroadcastCalls(stub func() bool) { + fake.canSkipBroadcastMutex.Lock() + defer fake.canSkipBroadcastMutex.Unlock() + fake.CanSkipBroadcastStub = stub +} + +func (fake *FakeLocalParticipant) CanSkipBroadcastReturns(result1 bool) { + fake.canSkipBroadcastMutex.Lock() + defer fake.canSkipBroadcastMutex.Unlock() + fake.CanSkipBroadcastStub = nil + fake.canSkipBroadcastReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) CanSkipBroadcastReturnsOnCall(i int, result1 bool) { + fake.canSkipBroadcastMutex.Lock() + defer fake.canSkipBroadcastMutex.Unlock() + fake.CanSkipBroadcastStub = nil + if fake.canSkipBroadcastReturnsOnCall == nil { + fake.canSkipBroadcastReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.canSkipBroadcastReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) CanSubscribe() bool { fake.canSubscribeMutex.Lock() ret, specificReturn := fake.canSubscribeReturnsOnCall[len(fake.canSubscribeArgsForCall)] @@ -2537,30 +2596,6 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P }{result1} } -func (fake *FakeLocalParticipant) InvalidateVersion() { - fake.invalidateVersionMutex.Lock() - fake.invalidateVersionArgsForCall = append(fake.invalidateVersionArgsForCall, struct { - }{}) - stub := fake.InvalidateVersionStub - fake.recordInvocation("InvalidateVersion", []interface{}{}) - fake.invalidateVersionMutex.Unlock() - if stub != nil { - fake.InvalidateVersionStub() - } -} - -func (fake *FakeLocalParticipant) InvalidateVersionCallCount() int { - fake.invalidateVersionMutex.RLock() - defer fake.invalidateVersionMutex.RUnlock() - return len(fake.invalidateVersionArgsForCall) -} - -func (fake *FakeLocalParticipant) InvalidateVersionCalls(stub func()) { - fake.invalidateVersionMutex.Lock() - defer fake.invalidateVersionMutex.Unlock() - fake.InvalidateVersionStub = stub -} - func (fake *FakeLocalParticipant) IsClosed() bool { fake.isClosedMutex.Lock() ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)] @@ -5405,6 +5440,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.canPublishDataMutex.RUnlock() fake.canPublishSourceMutex.RLock() defer fake.canPublishSourceMutex.RUnlock() + fake.canSkipBroadcastMutex.RLock() + defer fake.canSkipBroadcastMutex.RUnlock() fake.canSubscribeMutex.RLock() defer fake.canSubscribeMutex.RUnlock() fake.claimGrantsMutex.RLock() @@ -5457,8 +5494,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.identityMutex.RLock() defer fake.identityMutex.RUnlock() - fake.invalidateVersionMutex.RLock() - defer fake.invalidateVersionMutex.RUnlock() fake.isClosedMutex.RLock() defer fake.isClosedMutex.RUnlock() fake.isDisconnectedMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 404af0a17..c32b8a6ed 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -10,6 +10,16 @@ import ( ) type FakeParticipant struct { + CanSkipBroadcastStub func() bool + canSkipBroadcastMutex sync.RWMutex + canSkipBroadcastArgsForCall []struct { + } + canSkipBroadcastReturns struct { + result1 bool + } + canSkipBroadcastReturnsOnCall map[int]struct { + result1 bool + } CloseStub func(bool, types.ParticipantCloseReason) error closeMutex sync.RWMutex closeArgsForCall []struct { @@ -197,6 +207,59 @@ type FakeParticipant struct { invocationsMutex sync.RWMutex } +func (fake *FakeParticipant) CanSkipBroadcast() bool { + fake.canSkipBroadcastMutex.Lock() + ret, specificReturn := fake.canSkipBroadcastReturnsOnCall[len(fake.canSkipBroadcastArgsForCall)] + fake.canSkipBroadcastArgsForCall = append(fake.canSkipBroadcastArgsForCall, struct { + }{}) + stub := fake.CanSkipBroadcastStub + fakeReturns := fake.canSkipBroadcastReturns + fake.recordInvocation("CanSkipBroadcast", []interface{}{}) + fake.canSkipBroadcastMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) CanSkipBroadcastCallCount() int { + fake.canSkipBroadcastMutex.RLock() + defer fake.canSkipBroadcastMutex.RUnlock() + return len(fake.canSkipBroadcastArgsForCall) +} + +func (fake *FakeParticipant) CanSkipBroadcastCalls(stub func() bool) { + fake.canSkipBroadcastMutex.Lock() + defer fake.canSkipBroadcastMutex.Unlock() + fake.CanSkipBroadcastStub = stub +} + +func (fake *FakeParticipant) CanSkipBroadcastReturns(result1 bool) { + fake.canSkipBroadcastMutex.Lock() + defer fake.canSkipBroadcastMutex.Unlock() + fake.CanSkipBroadcastStub = nil + fake.canSkipBroadcastReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeParticipant) CanSkipBroadcastReturnsOnCall(i int, result1 bool) { + fake.canSkipBroadcastMutex.Lock() + defer fake.canSkipBroadcastMutex.Unlock() + fake.CanSkipBroadcastStub = nil + if fake.canSkipBroadcastReturnsOnCall == nil { + fake.canSkipBroadcastReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.canSkipBroadcastReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason) error { fake.closeMutex.Lock() ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] @@ -1165,6 +1228,8 @@ func (fake *FakeParticipant) UpdateVideoLayersReturnsOnCall(i int, result1 error func (fake *FakeParticipant) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.canSkipBroadcastMutex.RLock() + defer fake.canSkipBroadcastMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() fake.debugInfoMutex.RLock()