Send active speaker update on subscription. (#676)

Newly joining participant does not get information about
currently active speaker till there is a speaker state change.
This addresses it by sending a speaker update on subscription
if the subscribed to participant is actively speaking.
This commit is contained in:
Raja Subramanian
2022-05-10 12:31:26 +05:30
committed by GitHub
parent bd7e3beda4
commit 9f37239af3
4 changed files with 65 additions and 1 deletions
+11 -1
View File
@@ -126,6 +126,7 @@ type ParticipantImpl struct {
onStateChange func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)
onParticipantUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, *livekit.DataPacket)
onSubscribedTo func(types.LocalParticipant, livekit.ParticipantID)
migrateState atomic.Value // types.MigrateState
pendingOffer *webrtc.SessionDescription
@@ -440,6 +441,10 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *li
p.onDataPacket = callback
}
func (p *ParticipantImpl) OnSubscribedTo(callback func(types.LocalParticipant, livekit.ParticipantID)) {
p.onSubscribedTo = callback
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) {
p.onClose = callback
}
@@ -815,7 +820,12 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
subTrack.UpdateSubscriberSettings(settings)
}
p.subscribedTo.Store(subTrack.PublisherID(), struct{}{})
publisherID := subTrack.PublisherID()
isAlreadySubscribed := p.isSubscribedTo(publisherID)
p.subscribedTo.Store(publisherID, struct{}{})
if !isAlreadySubscribed && p.onSubscribedTo != nil {
p.onSubscribedTo(p, publisherID)
}
}
// RemoveSubscribedTrack removes a track to the participant's subscribed list
+14
View File
@@ -225,6 +225,19 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
participant.OnTrackUpdated(r.onTrackUpdated)
participant.OnParticipantUpdate(r.onParticipantUpdate)
participant.OnDataPacket(r.onDataPacket)
participant.OnSubscribedTo(func(p types.LocalParticipant, publisherID livekit.ParticipantID) {
// when a participant subscribed to another participant,
// send speaker update if the subscribed to participant is active.
go func() {
speakers := r.GetActiveSpeakers()
for _, speaker := range speakers {
if livekit.ParticipantID(speaker.Sid) == publisherID {
p.SendSpeakerUpdate(speakers)
break
}
}
}()
})
r.Logger.Infow("new participant joined",
"pID", participant.ID(),
"participant", participant.Identity(),
@@ -332,6 +345,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) {
p.OnStateChange(nil)
p.OnParticipantUpdate(nil)
p.OnDataPacket(nil)
p.OnSubscribedTo(nil)
// close participant as well
r.Logger.Infow("closing participant for removal", "pID", p.ID(), "participant", p.Identity())
+1
View File
@@ -153,6 +153,7 @@ type LocalParticipant interface {
// OnParticipantUpdate - metadata or permission is updated
OnParticipantUpdate(callback func(LocalParticipant))
OnDataPacket(callback func(LocalParticipant, *livekit.DataPacket))
OnSubscribedTo(callback func(LocalParticipant, livekit.ParticipantID))
OnClose(_callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))
OnClaimsChanged(_callback func(LocalParticipant))
@@ -337,6 +337,11 @@ type FakeLocalParticipant struct {
onStateChangeArgsForCall []struct {
arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)
}
OnSubscribedToStub func(func(types.LocalParticipant, livekit.ParticipantID))
onSubscribedToMutex sync.RWMutex
onSubscribedToArgsForCall []struct {
arg1 func(types.LocalParticipant, livekit.ParticipantID)
}
OnTrackPublishedStub func(func(types.LocalParticipant, types.MediaTrack))
onTrackPublishedMutex sync.RWMutex
onTrackPublishedArgsForCall []struct {
@@ -2369,6 +2374,38 @@ func (fake *FakeLocalParticipant) OnStateChangeArgsForCall(i int) func(p types.L
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnSubscribedTo(arg1 func(types.LocalParticipant, livekit.ParticipantID)) {
fake.onSubscribedToMutex.Lock()
fake.onSubscribedToArgsForCall = append(fake.onSubscribedToArgsForCall, struct {
arg1 func(types.LocalParticipant, livekit.ParticipantID)
}{arg1})
stub := fake.OnSubscribedToStub
fake.recordInvocation("OnSubscribedTo", []interface{}{arg1})
fake.onSubscribedToMutex.Unlock()
if stub != nil {
fake.OnSubscribedToStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnSubscribedToCallCount() int {
fake.onSubscribedToMutex.RLock()
defer fake.onSubscribedToMutex.RUnlock()
return len(fake.onSubscribedToArgsForCall)
}
func (fake *FakeLocalParticipant) OnSubscribedToCalls(stub func(func(types.LocalParticipant, livekit.ParticipantID))) {
fake.onSubscribedToMutex.Lock()
defer fake.onSubscribedToMutex.Unlock()
fake.OnSubscribedToStub = stub
}
func (fake *FakeLocalParticipant) OnSubscribedToArgsForCall(i int) func(types.LocalParticipant, livekit.ParticipantID) {
fake.onSubscribedToMutex.RLock()
defer fake.onSubscribedToMutex.RUnlock()
argsForCall := fake.onSubscribedToArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnTrackPublished(arg1 func(types.LocalParticipant, types.MediaTrack)) {
fake.onTrackPublishedMutex.Lock()
fake.onTrackPublishedArgsForCall = append(fake.onTrackPublishedArgsForCall, struct {
@@ -4030,6 +4067,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onParticipantUpdateMutex.RUnlock()
fake.onStateChangeMutex.RLock()
defer fake.onStateChangeMutex.RUnlock()
fake.onSubscribedToMutex.RLock()
defer fake.onSubscribedToMutex.RUnlock()
fake.onTrackPublishedMutex.RLock()
defer fake.onTrackPublishedMutex.RUnlock()
fake.onTrackUpdatedMutex.RLock()