From df6c26dbf6361195a51bfa0e7b59331a4e881a59 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 12 Sep 2025 13:01:00 +0530 Subject: [PATCH] Subscrbed audio codecs - update from remote nodes. (#3921) --- pkg/rtc/mediatrack.go | 8 +- pkg/rtc/participant.go | 15 +++- pkg/rtc/types/interfaces.go | 4 +- .../typesfakes/fake_local_media_track.go | 80 +++++++++++++----- .../typesfakes/fake_local_participant.go | 81 +++++++++++++++++++ 5 files changed, 166 insertions(+), 22 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index d19e5740d..08becb859 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -219,7 +219,13 @@ func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quali } } -func (t *MediaTrack) ClearSubscriberNodesMaxQuality() { +func (t *MediaTrack) NotifySubscriptionNode(nodeID livekit.NodeID, codecs []*livekit.SubscribedAudioCodec) { + if t.dynacastManager != nil { + t.dynacastManager.NotifySubscriptionNode(nodeID, codecs) + } +} + +func (t *MediaTrack) ClearSubscriberNodes() { if t.dynacastManager != nil { t.dynacastManager.ClearSubscriberNodes() } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6f6de913c..bb4f4e9bc 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -3756,6 +3756,17 @@ func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID return nil } +func (p *ParticipantImpl) UpdateSubscribedAudioCodecs(nodeID livekit.NodeID, trackID livekit.TrackID, codecs []*livekit.SubscribedAudioCodec) error { + track := p.GetPublishedTrack(trackID) + if track == nil { + p.pubLogger.Debugw("could not find track", "trackID", trackID) + return errors.New("could not find published track") + } + + track.(types.LocalMediaTrack).NotifySubscriptionNode(nodeID, codecs) + return nil +} + func (p *ParticipantImpl) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error { track := p.GetPublishedTrack(trackID) if track == nil { @@ -4036,9 +4047,9 @@ func (p *ParticipantImpl) MoveToRoom(params types.MoveToRoomParams) { for _, track := range p.GetPublishedTracks() { for _, sub := range track.GetAllSubscribers() { track.RemoveSubscriber(sub, false) - // clear the subscriber node max quality as the remote quality notify + // clear the subscriber node max quality/audio codecs as the remote quality notify // from source room would not reach the moving out participant. - track.(types.LocalMediaTrack).ClearSubscriberNodesMaxQuality() + track.(types.LocalMediaTrack).ClearSubscriberNodes() } trackInfo := track.ToProto() p.params.Telemetry.TrackUnpublished( diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index e732672e9..91ad3ae6f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -518,6 +518,7 @@ type LocalParticipant interface { OnICEConfigChanged(callback func(participant LocalParticipant, iceConfig *livekit.ICEConfig)) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error + UpdateSubscribedAudioCodecs(nodeID livekit.NodeID, trackID livekit.TrackID, codecs []*livekit.SubscribedAudioCodec) error UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error // down stream bandwidth management @@ -627,7 +628,8 @@ type LocalMediaTrack interface { SetRTT(rtt uint32) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []SubscribedCodecQuality) - ClearSubscriberNodesMaxQuality() + NotifySubscriptionNode(nodeID livekit.NodeID, codecs []*livekit.SubscribedAudioCodec) + ClearSubscriberNodes() NotifySubscriberNodeMediaLoss(nodeID livekit.NodeID, fractionalLoss uint8) } diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 115e0ac45..a327509de 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -35,9 +35,9 @@ type FakeLocalMediaTrack struct { clearAllReceiversArgsForCall []struct { arg1 bool } - ClearSubscriberNodesMaxQualityStub func() - clearSubscriberNodesMaxQualityMutex sync.RWMutex - clearSubscriberNodesMaxQualityArgsForCall []struct { + ClearSubscriberNodesStub func() + clearSubscriberNodesMutex sync.RWMutex + clearSubscriberNodesArgsForCall []struct { } CloseStub func(bool) closeMutex sync.RWMutex @@ -239,6 +239,12 @@ type FakeLocalMediaTrack struct { arg1 livekit.NodeID arg2 uint8 } + NotifySubscriptionNodeStub func(livekit.NodeID, []*livekit.SubscribedAudioCodec) + notifySubscriptionNodeMutex sync.RWMutex + notifySubscriptionNodeArgsForCall []struct { + arg1 livekit.NodeID + arg2 []*livekit.SubscribedAudioCodec + } OnTrackSubscribedStub func() onTrackSubscribedMutex sync.RWMutex onTrackSubscribedArgsForCall []struct { @@ -491,28 +497,28 @@ func (fake *FakeLocalMediaTrack) ClearAllReceiversArgsForCall(i int) bool { return argsForCall.arg1 } -func (fake *FakeLocalMediaTrack) ClearSubscriberNodesMaxQuality() { - fake.clearSubscriberNodesMaxQualityMutex.Lock() - fake.clearSubscriberNodesMaxQualityArgsForCall = append(fake.clearSubscriberNodesMaxQualityArgsForCall, struct { +func (fake *FakeLocalMediaTrack) ClearSubscriberNodes() { + fake.clearSubscriberNodesMutex.Lock() + fake.clearSubscriberNodesArgsForCall = append(fake.clearSubscriberNodesArgsForCall, struct { }{}) - stub := fake.ClearSubscriberNodesMaxQualityStub - fake.recordInvocation("ClearSubscriberNodesMaxQuality", []interface{}{}) - fake.clearSubscriberNodesMaxQualityMutex.Unlock() + stub := fake.ClearSubscriberNodesStub + fake.recordInvocation("ClearSubscriberNodes", []interface{}{}) + fake.clearSubscriberNodesMutex.Unlock() if stub != nil { - fake.ClearSubscriberNodesMaxQualityStub() + fake.ClearSubscriberNodesStub() } } -func (fake *FakeLocalMediaTrack) ClearSubscriberNodesMaxQualityCallCount() int { - fake.clearSubscriberNodesMaxQualityMutex.RLock() - defer fake.clearSubscriberNodesMaxQualityMutex.RUnlock() - return len(fake.clearSubscriberNodesMaxQualityArgsForCall) +func (fake *FakeLocalMediaTrack) ClearSubscriberNodesCallCount() int { + fake.clearSubscriberNodesMutex.RLock() + defer fake.clearSubscriberNodesMutex.RUnlock() + return len(fake.clearSubscriberNodesArgsForCall) } -func (fake *FakeLocalMediaTrack) ClearSubscriberNodesMaxQualityCalls(stub func()) { - fake.clearSubscriberNodesMaxQualityMutex.Lock() - defer fake.clearSubscriberNodesMaxQualityMutex.Unlock() - fake.ClearSubscriberNodesMaxQualityStub = stub +func (fake *FakeLocalMediaTrack) ClearSubscriberNodesCalls(stub func()) { + fake.clearSubscriberNodesMutex.Lock() + defer fake.clearSubscriberNodesMutex.Unlock() + fake.ClearSubscriberNodesStub = stub } func (fake *FakeLocalMediaTrack) Close(arg1 bool) { @@ -1569,6 +1575,44 @@ func (fake *FakeLocalMediaTrack) NotifySubscriberNodeMediaLossArgsForCall(i int) return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeLocalMediaTrack) NotifySubscriptionNode(arg1 livekit.NodeID, arg2 []*livekit.SubscribedAudioCodec) { + var arg2Copy []*livekit.SubscribedAudioCodec + if arg2 != nil { + arg2Copy = make([]*livekit.SubscribedAudioCodec, len(arg2)) + copy(arg2Copy, arg2) + } + fake.notifySubscriptionNodeMutex.Lock() + fake.notifySubscriptionNodeArgsForCall = append(fake.notifySubscriptionNodeArgsForCall, struct { + arg1 livekit.NodeID + arg2 []*livekit.SubscribedAudioCodec + }{arg1, arg2Copy}) + stub := fake.NotifySubscriptionNodeStub + fake.recordInvocation("NotifySubscriptionNode", []interface{}{arg1, arg2Copy}) + fake.notifySubscriptionNodeMutex.Unlock() + if stub != nil { + fake.NotifySubscriptionNodeStub(arg1, arg2) + } +} + +func (fake *FakeLocalMediaTrack) NotifySubscriptionNodeCallCount() int { + fake.notifySubscriptionNodeMutex.RLock() + defer fake.notifySubscriptionNodeMutex.RUnlock() + return len(fake.notifySubscriptionNodeArgsForCall) +} + +func (fake *FakeLocalMediaTrack) NotifySubscriptionNodeCalls(stub func(livekit.NodeID, []*livekit.SubscribedAudioCodec)) { + fake.notifySubscriptionNodeMutex.Lock() + defer fake.notifySubscriptionNodeMutex.Unlock() + fake.NotifySubscriptionNodeStub = stub +} + +func (fake *FakeLocalMediaTrack) NotifySubscriptionNodeArgsForCall(i int) (livekit.NodeID, []*livekit.SubscribedAudioCodec) { + fake.notifySubscriptionNodeMutex.RLock() + defer fake.notifySubscriptionNodeMutex.RUnlock() + argsForCall := fake.notifySubscriptionNodeArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeLocalMediaTrack) OnTrackSubscribed() { fake.onTrackSubscribedMutex.Lock() fake.onTrackSubscribedArgsForCall = append(fake.onTrackSubscribedArgsForCall, struct { diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index abde499ad..a1dcca947 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -1332,6 +1332,19 @@ type FakeLocalParticipant struct { updateSignalingRTTArgsForCall []struct { arg1 uint32 } + UpdateSubscribedAudioCodecsStub func(livekit.NodeID, livekit.TrackID, []*livekit.SubscribedAudioCodec) error + updateSubscribedAudioCodecsMutex sync.RWMutex + updateSubscribedAudioCodecsArgsForCall []struct { + arg1 livekit.NodeID + arg2 livekit.TrackID + arg3 []*livekit.SubscribedAudioCodec + } + updateSubscribedAudioCodecsReturns struct { + result1 error + } + updateSubscribedAudioCodecsReturnsOnCall map[int]struct { + result1 error + } UpdateSubscribedQualityStub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error updateSubscribedQualityMutex sync.RWMutex updateSubscribedQualityArgsForCall []struct { @@ -8554,6 +8567,74 @@ func (fake *FakeLocalParticipant) UpdateSignalingRTTArgsForCall(i int) uint32 { return argsForCall.arg1 } +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecs(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 []*livekit.SubscribedAudioCodec) error { + var arg3Copy []*livekit.SubscribedAudioCodec + if arg3 != nil { + arg3Copy = make([]*livekit.SubscribedAudioCodec, len(arg3)) + copy(arg3Copy, arg3) + } + fake.updateSubscribedAudioCodecsMutex.Lock() + ret, specificReturn := fake.updateSubscribedAudioCodecsReturnsOnCall[len(fake.updateSubscribedAudioCodecsArgsForCall)] + fake.updateSubscribedAudioCodecsArgsForCall = append(fake.updateSubscribedAudioCodecsArgsForCall, struct { + arg1 livekit.NodeID + arg2 livekit.TrackID + arg3 []*livekit.SubscribedAudioCodec + }{arg1, arg2, arg3Copy}) + stub := fake.UpdateSubscribedAudioCodecsStub + fakeReturns := fake.updateSubscribedAudioCodecsReturns + fake.recordInvocation("UpdateSubscribedAudioCodecs", []interface{}{arg1, arg2, arg3Copy}) + fake.updateSubscribedAudioCodecsMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsCallCount() int { + fake.updateSubscribedAudioCodecsMutex.RLock() + defer fake.updateSubscribedAudioCodecsMutex.RUnlock() + return len(fake.updateSubscribedAudioCodecsArgsForCall) +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsCalls(stub func(livekit.NodeID, livekit.TrackID, []*livekit.SubscribedAudioCodec) error) { + fake.updateSubscribedAudioCodecsMutex.Lock() + defer fake.updateSubscribedAudioCodecsMutex.Unlock() + fake.UpdateSubscribedAudioCodecsStub = stub +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsArgsForCall(i int) (livekit.NodeID, livekit.TrackID, []*livekit.SubscribedAudioCodec) { + fake.updateSubscribedAudioCodecsMutex.RLock() + defer fake.updateSubscribedAudioCodecsMutex.RUnlock() + argsForCall := fake.updateSubscribedAudioCodecsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsReturns(result1 error) { + fake.updateSubscribedAudioCodecsMutex.Lock() + defer fake.updateSubscribedAudioCodecsMutex.Unlock() + fake.UpdateSubscribedAudioCodecsStub = nil + fake.updateSubscribedAudioCodecsReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsReturnsOnCall(i int, result1 error) { + fake.updateSubscribedAudioCodecsMutex.Lock() + defer fake.updateSubscribedAudioCodecsMutex.Unlock() + fake.UpdateSubscribedAudioCodecsStub = nil + if fake.updateSubscribedAudioCodecsReturnsOnCall == nil { + fake.updateSubscribedAudioCodecsReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateSubscribedAudioCodecsReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) UpdateSubscribedQuality(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 []types.SubscribedCodecQuality) error { var arg3Copy []types.SubscribedCodecQuality if arg3 != nil {