From 5589637152d1acbe7b7b0ea969cfdaaf3e89b367 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 14 Feb 2025 15:46:08 +0530 Subject: [PATCH] Seed on receiving forwarder state. (#3435) This is mostly to clean up forwarder state cache for already started tracks. A scenario like the following could apply the seed twice and end up with an incorrect state resulting in a large jump - Participant A let's say is the one showing the problem - Participant A migrates first. So, it tries to restore its down track states by querying state from the previous node. - But, its down tracks start before the response can be received. However, it remains in the cache. - Participant B migrates from a different node to where Participant A. So, the down track of Participant A gets switched from relay up track publisher -> local up track publisher. - I am guessing the seeding gets applied twice in this case and the cached value from step 3 above causes the huge jump. In those cases, the cache needs to be cleaned up. (NOTE: I think this seeding of down track on migration is not necessary as the SSRC of down track changes and the remote side seems to be treating it like a fresh start because of that. But, doing this step first and will remove the related parts after observing for a bit more) Also, moving fetching forwarder state to a goroutine as it involves a network call to the previous node via Director. --- pkg/rtc/participant.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 552f38e08..3410b6849 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1215,7 +1215,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) { case types.MigrateStateComplete: p.TransportManager.ProcessPendingPublisherDataChannels() - p.cacheForwarderState() + go p.cacheForwarderState() } if onMigrateStateChange := p.getOnMigrateStateChange(); onMigrateStateChange != nil { @@ -2826,6 +2826,12 @@ func (p *ParticipantImpl) cacheForwarderState() { p.lock.Lock() p.forwarderState = fs p.lock.Unlock() + + for _, t := range p.SubscriptionManager.GetSubscribedTracks() { + if dt := t.DownTrack(); dt != nil { + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(t.ID())}) + } + } } } }