diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index ed76a8430..886520294 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -226,6 +226,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return err } subTrack := NewSubscribedTrack(SubscribedTrackParams{ + PublisherID: t.params.ParticipantID, PublisherIdentity: t.params.ParticipantIdentity, SubscriberID: sub.ID(), MediaTrack: t, diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 5501c805c..b03a5e05d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2,13 +2,14 @@ package rtc import ( "fmt" - "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "io" "strings" "sync" "sync/atomic" "time" + "github.com/livekit/livekit-server/pkg/sfu/connectionquality" + lru "github.com/hashicorp/golang-lru" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -546,10 +547,21 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) err return nil } + var scopedSpeakers []*livekit.SpeakerInfo + for _, s := range speakers { + if p.IsSubscribedTo(s.Sid) { + scopedSpeakers = append(scopedSpeakers, s) + } + } + + if len(scopedSpeakers) == 0 { + return nil + } + return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_SpeakersChanged{ SpeakersChanged: &livekit.SpeakersChanged{ - Speakers: speakers, + Speakers: scopedSpeakers, }, }, }) @@ -702,20 +714,20 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo } } -func (p *ParticipantImpl) IsSubscribedTo(identity string) bool { - _, ok := p.subscribedTo.Load(identity) +func (p *ParticipantImpl) IsSubscribedTo(participantSid string) bool { + _, ok := p.subscribedTo.Load(participantSid) return ok } func (p *ParticipantImpl) GetSubscribedParticipants() []string { - var identities []string + var participantSids []string p.subscribedTo.Range(func(key, _ interface{}) bool { - if identity, ok := key.(string); ok { - identities = append(identities, identity) + if participantSid, ok := key.(string); ok { + participantSids = append(participantSids, participantSid) } return true }) - return identities + return participantSids } func (p *ParticipantImpl) CanPublish() bool { @@ -781,7 +793,8 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { // AddSubscribedTrack adds a track to the participant's subscribed list func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { p.params.Logger.Debugw("added subscribedTrack", - "publisher", subTrack.PublisherIdentity(), + "publisherID", subTrack.PublisherID(), + "publisherIdentity", subTrack.PublisherIdentity(), "track", subTrack.ID()) p.lock.Lock() p.subscribedTracks[subTrack.ID()] = subTrack @@ -790,12 +803,14 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { subTrack.OnBind(func() { p.subscriber.AddTrack(subTrack) }) - p.subscribedTo.Store(subTrack.PublisherIdentity(), struct{}{}) + p.subscribedTo.Store(subTrack.PublisherID(), struct{}{}) } // RemoveSubscribedTrack removes a track to the participant's subscribed list func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) { - p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(), + p.params.Logger.Debugw("removed subscribedTrack", + "publisherID", subTrack.PublisherID(), + "publisherIdentity", subTrack.PublisherIdentity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind()) p.subscriber.RemoveTrack(subTrack) @@ -805,13 +820,13 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) // remove from subscribed map numRemaining := 0 for _, st := range p.subscribedTracks { - if st.PublisherIdentity() == subTrack.PublisherIdentity() { + if st.PublisherID() == subTrack.PublisherID() { numRemaining++ } } p.lock.Unlock() if numRemaining == 0 { - p.subscribedTo.Delete(subTrack.PublisherIdentity()) + p.subscribedTo.Delete(subTrack.PublisherID()) } } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index ccb977f7a..420b6a300 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -735,7 +735,7 @@ func (r *Room) connectionQualityWorker() { connectionInfos := make(map[string]*livekit.ConnectionQualityInfo, len(participants)) for _, p := range participants { - connectionInfos[p.Identity()] = p.GetConnectionQuality() + connectionInfos[p.ID()] = p.GetConnectionQuality() } for _, op := range participants { @@ -745,13 +745,13 @@ func (r *Room) connectionQualityWorker() { update := &livekit.ConnectionQualityUpdate{} // send to user itself - if info, ok := connectionInfos[op.Identity()]; ok { + if info, ok := connectionInfos[op.ID()]; ok { update.Updates = append(update.Updates, info) } - // send to other participants its subscribed to - for _, identity := range op.GetSubscribedParticipants() { - if info, ok := connectionInfos[identity]; ok { + // add connection quality of other participants its subscribed to + for _, sid := range op.GetSubscribedParticipants() { + if info, ok := connectionInfos[sid]; ok { update.Updates = append(update.Updates, info) } } diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index af65a8966..5d65cf5a4 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -18,6 +18,7 @@ const ( ) type SubscribedTrackParams struct { + PublisherID string PublisherIdentity string SubscriberID string MediaTrack types.MediaTrack @@ -55,6 +56,10 @@ func (t *SubscribedTrack) ID() string { return t.params.DownTrack.ID() } +func (t *SubscribedTrack) PublisherID() string { + return t.params.PublisherID +} + func (t *SubscribedTrack) PublisherIdentity() string { return t.params.PublisherIdentity } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 0390a0e09..9283bca58 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -56,7 +56,7 @@ type Participant interface { SetTrackMuted(trackId string, muted bool, fromAdmin bool) GetAudioLevel() (level uint8, active bool) GetConnectionQuality() *livekit.ConnectionQualityInfo - IsSubscribedTo(identity string) bool + IsSubscribedTo(participantSid string) bool // returns list of participant identities that the current participant is subscribed to GetSubscribedParticipants() []string @@ -146,6 +146,7 @@ type PublishedTrack interface { type SubscribedTrack interface { OnBind(f func()) ID() string + PublisherID() string PublisherIdentity() string DownTrack() *sfu.DownTrack MediaTrack() MediaTrack diff --git a/pkg/rtc/types/typesfakes/fake_subscribed_track.go b/pkg/rtc/types/typesfakes/fake_subscribed_track.go index 2caf275d2..512df3218 100644 --- a/pkg/rtc/types/typesfakes/fake_subscribed_track.go +++ b/pkg/rtc/types/typesfakes/fake_subscribed_track.go @@ -55,6 +55,16 @@ type FakeSubscribedTrack struct { onBindArgsForCall []struct { arg1 func() } + PublisherIDStub func() string + publisherIDMutex sync.RWMutex + publisherIDArgsForCall []struct { + } + publisherIDReturns struct { + result1 string + } + publisherIDReturnsOnCall map[int]struct { + result1 string + } PublisherIdentityStub func() string publisherIdentityMutex sync.RWMutex publisherIdentityArgsForCall []struct { @@ -327,6 +337,59 @@ func (fake *FakeSubscribedTrack) OnBindArgsForCall(i int) func() { return argsForCall.arg1 } +func (fake *FakeSubscribedTrack) PublisherID() string { + fake.publisherIDMutex.Lock() + ret, specificReturn := fake.publisherIDReturnsOnCall[len(fake.publisherIDArgsForCall)] + fake.publisherIDArgsForCall = append(fake.publisherIDArgsForCall, struct { + }{}) + stub := fake.PublisherIDStub + fakeReturns := fake.publisherIDReturns + fake.recordInvocation("PublisherID", []interface{}{}) + fake.publisherIDMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeSubscribedTrack) PublisherIDCallCount() int { + fake.publisherIDMutex.RLock() + defer fake.publisherIDMutex.RUnlock() + return len(fake.publisherIDArgsForCall) +} + +func (fake *FakeSubscribedTrack) PublisherIDCalls(stub func() string) { + fake.publisherIDMutex.Lock() + defer fake.publisherIDMutex.Unlock() + fake.PublisherIDStub = stub +} + +func (fake *FakeSubscribedTrack) PublisherIDReturns(result1 string) { + fake.publisherIDMutex.Lock() + defer fake.publisherIDMutex.Unlock() + fake.PublisherIDStub = nil + fake.publisherIDReturns = struct { + result1 string + }{result1} +} + +func (fake *FakeSubscribedTrack) PublisherIDReturnsOnCall(i int, result1 string) { + fake.publisherIDMutex.Lock() + defer fake.publisherIDMutex.Unlock() + fake.PublisherIDStub = nil + if fake.publisherIDReturnsOnCall == nil { + fake.publisherIDReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.publisherIDReturnsOnCall[i] = struct { + result1 string + }{result1} +} + func (fake *FakeSubscribedTrack) PublisherIdentity() string { fake.publisherIdentityMutex.Lock() ret, specificReturn := fake.publisherIdentityReturnsOnCall[len(fake.publisherIdentityArgsForCall)] @@ -481,6 +544,8 @@ func (fake *FakeSubscribedTrack) Invocations() map[string][][]interface{} { defer fake.mediaTrackMutex.RUnlock() fake.onBindMutex.RLock() defer fake.onBindMutex.RUnlock() + fake.publisherIDMutex.RLock() + defer fake.publisherIDMutex.RUnlock() fake.publisherIdentityMutex.RLock() defer fake.publisherIdentityMutex.RUnlock() fake.setPublisherMutedMutex.RLock()