Scoped speaker update (#280)

* Scoped speaker update

Include only participants a participant is subscribed to.

NOTE: Not doing this for active speaker changes for Protocol < 3.

* correct comment spelling
This commit is contained in:
Raja Subramanian
2021-12-23 09:13:49 +05:30
committed by GitHub
parent 857ff7fae7
commit 42a9b6657d
6 changed files with 106 additions and 19 deletions
+1
View File
@@ -226,6 +226,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
return err
}
subTrack := NewSubscribedTrack(SubscribedTrackParams{
PublisherID: t.params.ParticipantID,
PublisherIdentity: t.params.ParticipantIdentity,
SubscriberID: sub.ID(),
MediaTrack: t,
+28 -13
View File
@@ -2,13 +2,14 @@ package rtc
import (
"fmt"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"io"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
lru "github.com/hashicorp/golang-lru"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -546,10 +547,21 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) err
return nil
}
var scopedSpeakers []*livekit.SpeakerInfo
for _, s := range speakers {
if p.IsSubscribedTo(s.Sid) {
scopedSpeakers = append(scopedSpeakers, s)
}
}
if len(scopedSpeakers) == 0 {
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_SpeakersChanged{
SpeakersChanged: &livekit.SpeakersChanged{
Speakers: speakers,
Speakers: scopedSpeakers,
},
},
})
@@ -702,20 +714,20 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
}
}
func (p *ParticipantImpl) IsSubscribedTo(identity string) bool {
_, ok := p.subscribedTo.Load(identity)
func (p *ParticipantImpl) IsSubscribedTo(participantSid string) bool {
_, ok := p.subscribedTo.Load(participantSid)
return ok
}
func (p *ParticipantImpl) GetSubscribedParticipants() []string {
var identities []string
var participantSids []string
p.subscribedTo.Range(func(key, _ interface{}) bool {
if identity, ok := key.(string); ok {
identities = append(identities, identity)
if participantSid, ok := key.(string); ok {
participantSids = append(participantSids, participantSid)
}
return true
})
return identities
return participantSids
}
func (p *ParticipantImpl) CanPublish() bool {
@@ -781,7 +793,8 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack {
// AddSubscribedTrack adds a track to the participant's subscribed list
func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("added subscribedTrack",
"publisher", subTrack.PublisherIdentity(),
"publisherID", subTrack.PublisherID(),
"publisherIdentity", subTrack.PublisherIdentity(),
"track", subTrack.ID())
p.lock.Lock()
p.subscribedTracks[subTrack.ID()] = subTrack
@@ -790,12 +803,14 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
subTrack.OnBind(func() {
p.subscriber.AddTrack(subTrack)
})
p.subscribedTo.Store(subTrack.PublisherIdentity(), struct{}{})
p.subscribedTo.Store(subTrack.PublisherID(), struct{}{})
}
// RemoveSubscribedTrack removes a track to the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(),
p.params.Logger.Debugw("removed subscribedTrack",
"publisherID", subTrack.PublisherID(),
"publisherIdentity", subTrack.PublisherIdentity(),
"track", subTrack.ID(), "kind", subTrack.DownTrack().Kind())
p.subscriber.RemoveTrack(subTrack)
@@ -805,13 +820,13 @@ func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack)
// remove from subscribed map
numRemaining := 0
for _, st := range p.subscribedTracks {
if st.PublisherIdentity() == subTrack.PublisherIdentity() {
if st.PublisherID() == subTrack.PublisherID() {
numRemaining++
}
}
p.lock.Unlock()
if numRemaining == 0 {
p.subscribedTo.Delete(subTrack.PublisherIdentity())
p.subscribedTo.Delete(subTrack.PublisherID())
}
}
+5 -5
View File
@@ -735,7 +735,7 @@ func (r *Room) connectionQualityWorker() {
connectionInfos := make(map[string]*livekit.ConnectionQualityInfo, len(participants))
for _, p := range participants {
connectionInfos[p.Identity()] = p.GetConnectionQuality()
connectionInfos[p.ID()] = p.GetConnectionQuality()
}
for _, op := range participants {
@@ -745,13 +745,13 @@ func (r *Room) connectionQualityWorker() {
update := &livekit.ConnectionQualityUpdate{}
// send to user itself
if info, ok := connectionInfos[op.Identity()]; ok {
if info, ok := connectionInfos[op.ID()]; ok {
update.Updates = append(update.Updates, info)
}
// send to other participants its subscribed to
for _, identity := range op.GetSubscribedParticipants() {
if info, ok := connectionInfos[identity]; ok {
// add connection quality of other participants its subscribed to
for _, sid := range op.GetSubscribedParticipants() {
if info, ok := connectionInfos[sid]; ok {
update.Updates = append(update.Updates, info)
}
}
+5
View File
@@ -18,6 +18,7 @@ const (
)
type SubscribedTrackParams struct {
PublisherID string
PublisherIdentity string
SubscriberID string
MediaTrack types.MediaTrack
@@ -55,6 +56,10 @@ func (t *SubscribedTrack) ID() string {
return t.params.DownTrack.ID()
}
func (t *SubscribedTrack) PublisherID() string {
return t.params.PublisherID
}
func (t *SubscribedTrack) PublisherIdentity() string {
return t.params.PublisherIdentity
}
+2 -1
View File
@@ -56,7 +56,7 @@ type Participant interface {
SetTrackMuted(trackId string, muted bool, fromAdmin bool)
GetAudioLevel() (level uint8, active bool)
GetConnectionQuality() *livekit.ConnectionQualityInfo
IsSubscribedTo(identity string) bool
IsSubscribedTo(participantSid string) bool
// returns list of participant identities that the current participant is subscribed to
GetSubscribedParticipants() []string
@@ -146,6 +146,7 @@ type PublishedTrack interface {
type SubscribedTrack interface {
OnBind(f func())
ID() string
PublisherID() string
PublisherIdentity() string
DownTrack() *sfu.DownTrack
MediaTrack() MediaTrack
@@ -55,6 +55,16 @@ type FakeSubscribedTrack struct {
onBindArgsForCall []struct {
arg1 func()
}
PublisherIDStub func() string
publisherIDMutex sync.RWMutex
publisherIDArgsForCall []struct {
}
publisherIDReturns struct {
result1 string
}
publisherIDReturnsOnCall map[int]struct {
result1 string
}
PublisherIdentityStub func() string
publisherIdentityMutex sync.RWMutex
publisherIdentityArgsForCall []struct {
@@ -327,6 +337,59 @@ func (fake *FakeSubscribedTrack) OnBindArgsForCall(i int) func() {
return argsForCall.arg1
}
func (fake *FakeSubscribedTrack) PublisherID() string {
fake.publisherIDMutex.Lock()
ret, specificReturn := fake.publisherIDReturnsOnCall[len(fake.publisherIDArgsForCall)]
fake.publisherIDArgsForCall = append(fake.publisherIDArgsForCall, struct {
}{})
stub := fake.PublisherIDStub
fakeReturns := fake.publisherIDReturns
fake.recordInvocation("PublisherID", []interface{}{})
fake.publisherIDMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeSubscribedTrack) PublisherIDCallCount() int {
fake.publisherIDMutex.RLock()
defer fake.publisherIDMutex.RUnlock()
return len(fake.publisherIDArgsForCall)
}
func (fake *FakeSubscribedTrack) PublisherIDCalls(stub func() string) {
fake.publisherIDMutex.Lock()
defer fake.publisherIDMutex.Unlock()
fake.PublisherIDStub = stub
}
func (fake *FakeSubscribedTrack) PublisherIDReturns(result1 string) {
fake.publisherIDMutex.Lock()
defer fake.publisherIDMutex.Unlock()
fake.PublisherIDStub = nil
fake.publisherIDReturns = struct {
result1 string
}{result1}
}
func (fake *FakeSubscribedTrack) PublisherIDReturnsOnCall(i int, result1 string) {
fake.publisherIDMutex.Lock()
defer fake.publisherIDMutex.Unlock()
fake.PublisherIDStub = nil
if fake.publisherIDReturnsOnCall == nil {
fake.publisherIDReturnsOnCall = make(map[int]struct {
result1 string
})
}
fake.publisherIDReturnsOnCall[i] = struct {
result1 string
}{result1}
}
func (fake *FakeSubscribedTrack) PublisherIdentity() string {
fake.publisherIdentityMutex.Lock()
ret, specificReturn := fake.publisherIdentityReturnsOnCall[len(fake.publisherIdentityArgsForCall)]
@@ -481,6 +544,8 @@ func (fake *FakeSubscribedTrack) Invocations() map[string][][]interface{} {
defer fake.mediaTrackMutex.RUnlock()
fake.onBindMutex.RLock()
defer fake.onBindMutex.RUnlock()
fake.publisherIDMutex.RLock()
defer fake.publisherIDMutex.RUnlock()
fake.publisherIdentityMutex.RLock()
defer fake.publisherIdentityMutex.RUnlock()
fake.setPublisherMutedMutex.RLock()