update participant to support signal broadcast skipping (#1657)

* update participant to support signal broadcast skipping

* cleanup

* lock

* feedback

* order

* update requireBroadcast in SetPermissions
This commit is contained in:
Paul Wells
2023-04-26 17:11:33 -07:00
committed by GitHub
parent 9db46bb866
commit 11eedf4514
4 changed files with 149 additions and 32 deletions

View File

@@ -131,6 +131,7 @@ type ParticipantImpl struct {
// keeps track of unpublished tracks in order to reuse trackID
unpublishedTracks []*livekit.TrackInfo
requireBroadcast bool
// queued participant updates before join response is sent
// guarded by updateLock
queuedUpdates []*livekit.ParticipantInfo
@@ -324,6 +325,7 @@ func (p *ParticipantImpl) SetMetadata(metadata string) {
}
p.grants.Metadata = metadata
p.requireBroadcast = p.requireBroadcast || metadata != ""
p.dirty.Store(true)
onParticipantUpdate := p.onParticipantUpdate
@@ -364,6 +366,9 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
canSubscribe := video.GetCanSubscribe()
onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
isPublisher := canPublish && p.TransportManager.IsPublisherEstablished()
p.requireBroadcast = p.requireBroadcast || isPublisher
p.lock.Unlock()
// publish permission has been revoked then remove offending tracks
@@ -390,7 +395,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
}
// update isPublisher attribute
p.isPublisher.Store(canPublish && p.TransportManager.IsPublisherEstablished())
p.isPublisher.Store(isPublisher)
if onParticipantUpdate != nil {
onParticipantUpdate(p)
@@ -401,6 +406,12 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
return true
}
func (p *ParticipantImpl) CanSkipBroadcast() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return !p.requireBroadcast
}
func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) {
v := p.version.Load()
piv := p.timedVersion.Load()
@@ -1081,10 +1092,11 @@ func (p *ParticipantImpl) setupUpTrackManager() {
})
p.UpTrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack) {
p.dirty.Store(true)
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
p.dirty.Store(true)
if onTrackUpdated != nil {
onTrackUpdated(p, track)
}
@@ -1134,6 +1146,10 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
if p.isPublisher.Swap(isPublisher) != isPublisher {
p.lock.Lock()
p.requireBroadcast = true
p.lock.Unlock()
p.dirty.Store(true)
// trigger update as well if participant is already fully connected

View File

@@ -176,6 +176,7 @@ type Participant interface {
Identity() livekit.ParticipantIdentity
State() livekit.ParticipantInfo_State
CanSkipBroadcast() bool
ToProto() *livekit.ParticipantInfo
SetName(name string)

View File

@@ -89,6 +89,16 @@ type FakeLocalParticipant struct {
canPublishSourceReturnsOnCall map[int]struct {
result1 bool
}
CanSkipBroadcastStub func() bool
canSkipBroadcastMutex sync.RWMutex
canSkipBroadcastArgsForCall []struct {
}
canSkipBroadcastReturns struct {
result1 bool
}
canSkipBroadcastReturnsOnCall map[int]struct {
result1 bool
}
CanSubscribeStub func() bool
canSubscribeMutex sync.RWMutex
canSubscribeArgsForCall []struct {
@@ -340,10 +350,6 @@ type FakeLocalParticipant struct {
identityReturnsOnCall map[int]struct {
result1 livekit.ParticipantIdentity
}
InvalidateVersionStub func()
invalidateVersionMutex sync.RWMutex
invalidateVersionArgsForCall []struct {
}
IsClosedStub func() bool
isClosedMutex sync.RWMutex
isClosedArgsForCall []struct {
@@ -1202,6 +1208,59 @@ func (fake *FakeLocalParticipant) CanPublishSourceReturnsOnCall(i int, result1 b
}{result1}
}
func (fake *FakeLocalParticipant) CanSkipBroadcast() bool {
fake.canSkipBroadcastMutex.Lock()
ret, specificReturn := fake.canSkipBroadcastReturnsOnCall[len(fake.canSkipBroadcastArgsForCall)]
fake.canSkipBroadcastArgsForCall = append(fake.canSkipBroadcastArgsForCall, struct {
}{})
stub := fake.CanSkipBroadcastStub
fakeReturns := fake.canSkipBroadcastReturns
fake.recordInvocation("CanSkipBroadcast", []interface{}{})
fake.canSkipBroadcastMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) CanSkipBroadcastCallCount() int {
fake.canSkipBroadcastMutex.RLock()
defer fake.canSkipBroadcastMutex.RUnlock()
return len(fake.canSkipBroadcastArgsForCall)
}
func (fake *FakeLocalParticipant) CanSkipBroadcastCalls(stub func() bool) {
fake.canSkipBroadcastMutex.Lock()
defer fake.canSkipBroadcastMutex.Unlock()
fake.CanSkipBroadcastStub = stub
}
func (fake *FakeLocalParticipant) CanSkipBroadcastReturns(result1 bool) {
fake.canSkipBroadcastMutex.Lock()
defer fake.canSkipBroadcastMutex.Unlock()
fake.CanSkipBroadcastStub = nil
fake.canSkipBroadcastReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) CanSkipBroadcastReturnsOnCall(i int, result1 bool) {
fake.canSkipBroadcastMutex.Lock()
defer fake.canSkipBroadcastMutex.Unlock()
fake.CanSkipBroadcastStub = nil
if fake.canSkipBroadcastReturnsOnCall == nil {
fake.canSkipBroadcastReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.canSkipBroadcastReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) CanSubscribe() bool {
fake.canSubscribeMutex.Lock()
ret, specificReturn := fake.canSubscribeReturnsOnCall[len(fake.canSubscribeArgsForCall)]
@@ -2537,30 +2596,6 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P
}{result1}
}
func (fake *FakeLocalParticipant) InvalidateVersion() {
fake.invalidateVersionMutex.Lock()
fake.invalidateVersionArgsForCall = append(fake.invalidateVersionArgsForCall, struct {
}{})
stub := fake.InvalidateVersionStub
fake.recordInvocation("InvalidateVersion", []interface{}{})
fake.invalidateVersionMutex.Unlock()
if stub != nil {
fake.InvalidateVersionStub()
}
}
func (fake *FakeLocalParticipant) InvalidateVersionCallCount() int {
fake.invalidateVersionMutex.RLock()
defer fake.invalidateVersionMutex.RUnlock()
return len(fake.invalidateVersionArgsForCall)
}
func (fake *FakeLocalParticipant) InvalidateVersionCalls(stub func()) {
fake.invalidateVersionMutex.Lock()
defer fake.invalidateVersionMutex.Unlock()
fake.InvalidateVersionStub = stub
}
func (fake *FakeLocalParticipant) IsClosed() bool {
fake.isClosedMutex.Lock()
ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)]
@@ -5405,6 +5440,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.canPublishDataMutex.RUnlock()
fake.canPublishSourceMutex.RLock()
defer fake.canPublishSourceMutex.RUnlock()
fake.canSkipBroadcastMutex.RLock()
defer fake.canSkipBroadcastMutex.RUnlock()
fake.canSubscribeMutex.RLock()
defer fake.canSubscribeMutex.RUnlock()
fake.claimGrantsMutex.RLock()
@@ -5457,8 +5494,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.iDMutex.RUnlock()
fake.identityMutex.RLock()
defer fake.identityMutex.RUnlock()
fake.invalidateVersionMutex.RLock()
defer fake.invalidateVersionMutex.RUnlock()
fake.isClosedMutex.RLock()
defer fake.isClosedMutex.RUnlock()
fake.isDisconnectedMutex.RLock()

View File

@@ -10,6 +10,16 @@ import (
)
type FakeParticipant struct {
CanSkipBroadcastStub func() bool
canSkipBroadcastMutex sync.RWMutex
canSkipBroadcastArgsForCall []struct {
}
canSkipBroadcastReturns struct {
result1 bool
}
canSkipBroadcastReturnsOnCall map[int]struct {
result1 bool
}
CloseStub func(bool, types.ParticipantCloseReason) error
closeMutex sync.RWMutex
closeArgsForCall []struct {
@@ -197,6 +207,59 @@ type FakeParticipant struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeParticipant) CanSkipBroadcast() bool {
fake.canSkipBroadcastMutex.Lock()
ret, specificReturn := fake.canSkipBroadcastReturnsOnCall[len(fake.canSkipBroadcastArgsForCall)]
fake.canSkipBroadcastArgsForCall = append(fake.canSkipBroadcastArgsForCall, struct {
}{})
stub := fake.CanSkipBroadcastStub
fakeReturns := fake.canSkipBroadcastReturns
fake.recordInvocation("CanSkipBroadcast", []interface{}{})
fake.canSkipBroadcastMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) CanSkipBroadcastCallCount() int {
fake.canSkipBroadcastMutex.RLock()
defer fake.canSkipBroadcastMutex.RUnlock()
return len(fake.canSkipBroadcastArgsForCall)
}
func (fake *FakeParticipant) CanSkipBroadcastCalls(stub func() bool) {
fake.canSkipBroadcastMutex.Lock()
defer fake.canSkipBroadcastMutex.Unlock()
fake.CanSkipBroadcastStub = stub
}
func (fake *FakeParticipant) CanSkipBroadcastReturns(result1 bool) {
fake.canSkipBroadcastMutex.Lock()
defer fake.canSkipBroadcastMutex.Unlock()
fake.CanSkipBroadcastStub = nil
fake.canSkipBroadcastReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) CanSkipBroadcastReturnsOnCall(i int, result1 bool) {
fake.canSkipBroadcastMutex.Lock()
defer fake.canSkipBroadcastMutex.Unlock()
fake.CanSkipBroadcastStub = nil
if fake.canSkipBroadcastReturnsOnCall == nil {
fake.canSkipBroadcastReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.canSkipBroadcastReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) Close(arg1 bool, arg2 types.ParticipantCloseReason) error {
fake.closeMutex.Lock()
ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)]
@@ -1165,6 +1228,8 @@ func (fake *FakeParticipant) UpdateVideoLayersReturnsOnCall(i int, result1 error
func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.canSkipBroadcastMutex.RLock()
defer fake.canSkipBroadcastMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.debugInfoMutex.RLock()