diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index fe514ce84..34df1bdb4 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -126,6 +126,7 @@ type ParticipantImpl struct { onStateChange func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) onParticipantUpdate func(types.LocalParticipant) onDataPacket func(types.LocalParticipant, *livekit.DataPacket) + onSubscribedTo func(types.LocalParticipant, livekit.ParticipantID) migrateState atomic.Value // types.MigrateState pendingOffer *webrtc.SessionDescription @@ -440,6 +441,10 @@ func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *li p.onDataPacket = callback } +func (p *ParticipantImpl) OnSubscribedTo(callback func(types.LocalParticipant, livekit.ParticipantID)) { + p.onSubscribedTo = callback +} + func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { p.onClose = callback } @@ -815,7 +820,12 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { subTrack.UpdateSubscriberSettings(settings) } - p.subscribedTo.Store(subTrack.PublisherID(), struct{}{}) + publisherID := subTrack.PublisherID() + isAlreadySubscribed := p.isSubscribedTo(publisherID) + p.subscribedTo.Store(publisherID, struct{}{}) + if !isAlreadySubscribed && p.onSubscribedTo != nil { + p.onSubscribedTo(p, publisherID) + } } // RemoveSubscribedTrack removes a track to the participant's subscribed list diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 9059eee32..c0c890ba5 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -225,6 +225,19 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions participant.OnTrackUpdated(r.onTrackUpdated) participant.OnParticipantUpdate(r.onParticipantUpdate) participant.OnDataPacket(r.onDataPacket) + participant.OnSubscribedTo(func(p types.LocalParticipant, publisherID livekit.ParticipantID) { + // when a participant subscribed to another participant, + // send speaker update if the subscribed to participant is active. + go func() { + speakers := r.GetActiveSpeakers() + for _, speaker := range speakers { + if livekit.ParticipantID(speaker.Sid) == publisherID { + p.SendSpeakerUpdate(speakers) + break + } + } + }() + }) r.Logger.Infow("new participant joined", "pID", participant.ID(), "participant", participant.Identity(), @@ -332,6 +345,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) { p.OnStateChange(nil) p.OnParticipantUpdate(nil) p.OnDataPacket(nil) + p.OnSubscribedTo(nil) // close participant as well r.Logger.Infow("closing participant for removal", "pID", p.ID(), "participant", p.Identity()) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 151d6477a..a138709b2 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -153,6 +153,7 @@ type LocalParticipant interface { // OnParticipantUpdate - metadata or permission is updated OnParticipantUpdate(callback func(LocalParticipant)) OnDataPacket(callback func(LocalParticipant, *livekit.DataPacket)) + OnSubscribedTo(callback func(LocalParticipant, livekit.ParticipantID)) OnClose(_callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) OnClaimsChanged(_callback func(LocalParticipant)) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 7cbb148d5..f4081ea1c 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -337,6 +337,11 @@ type FakeLocalParticipant struct { onStateChangeArgsForCall []struct { arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) } + OnSubscribedToStub func(func(types.LocalParticipant, livekit.ParticipantID)) + onSubscribedToMutex sync.RWMutex + onSubscribedToArgsForCall []struct { + arg1 func(types.LocalParticipant, livekit.ParticipantID) + } OnTrackPublishedStub func(func(types.LocalParticipant, types.MediaTrack)) onTrackPublishedMutex sync.RWMutex onTrackPublishedArgsForCall []struct { @@ -2369,6 +2374,38 @@ func (fake *FakeLocalParticipant) OnStateChangeArgsForCall(i int) func(p types.L return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnSubscribedTo(arg1 func(types.LocalParticipant, livekit.ParticipantID)) { + fake.onSubscribedToMutex.Lock() + fake.onSubscribedToArgsForCall = append(fake.onSubscribedToArgsForCall, struct { + arg1 func(types.LocalParticipant, livekit.ParticipantID) + }{arg1}) + stub := fake.OnSubscribedToStub + fake.recordInvocation("OnSubscribedTo", []interface{}{arg1}) + fake.onSubscribedToMutex.Unlock() + if stub != nil { + fake.OnSubscribedToStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnSubscribedToCallCount() int { + fake.onSubscribedToMutex.RLock() + defer fake.onSubscribedToMutex.RUnlock() + return len(fake.onSubscribedToArgsForCall) +} + +func (fake *FakeLocalParticipant) OnSubscribedToCalls(stub func(func(types.LocalParticipant, livekit.ParticipantID))) { + fake.onSubscribedToMutex.Lock() + defer fake.onSubscribedToMutex.Unlock() + fake.OnSubscribedToStub = stub +} + +func (fake *FakeLocalParticipant) OnSubscribedToArgsForCall(i int) func(types.LocalParticipant, livekit.ParticipantID) { + fake.onSubscribedToMutex.RLock() + defer fake.onSubscribedToMutex.RUnlock() + argsForCall := fake.onSubscribedToArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) OnTrackPublished(arg1 func(types.LocalParticipant, types.MediaTrack)) { fake.onTrackPublishedMutex.Lock() fake.onTrackPublishedArgsForCall = append(fake.onTrackPublishedArgsForCall, struct { @@ -4030,6 +4067,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onParticipantUpdateMutex.RUnlock() fake.onStateChangeMutex.RLock() defer fake.onStateChangeMutex.RUnlock() + fake.onSubscribedToMutex.RLock() + defer fake.onSubscribedToMutex.RUnlock() fake.onTrackPublishedMutex.RLock() defer fake.onTrackPublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock()