From 31d6dd710778b281762c191429a040be57914176 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 21 Nov 2024 18:41:33 +0530 Subject: [PATCH] Set down track connected flag in one-shot-signalling mode. (#3191) * Set down track connected flag in one-shot-signalling mode. Also, added maintaing ICE candidates for info purposes. And doing analytics events (have to maintain the subscription inside subscriptionmanager to get list of subscribed tracks, so added enough bits from the async path into sync path to get the analytics bits also) * comment typo --- pkg/rtc/participant.go | 23 +++++++--- pkg/rtc/subscriptionmanager.go | 81 +++++++++++++++++++++++++++++----- pkg/rtc/transport.go | 48 +++++++++++++++++++- 3 files changed, 134 insertions(+), 18 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d7ef6062d..ed610fe1a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1388,12 +1388,21 @@ func (p *ParticipantImpl) onTrackSubscribed(subTrack types.SubscribedTrack) { if err != nil { return } - if p.TransportManager.HasSubscriberEverConnected() { - dt := subTrack.DownTrack() - dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) - dt.SetConnected() + if p.params.UseOneShotSignallingMode { + if p.TransportManager.HasPublisherEverConnected() { + dt := subTrack.DownTrack() + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) + dt.SetConnected() + } + // ONE-SHOT-SIGNALLING-MODE-TODO: video support should add to publisher PC for congestion control + } else { + if p.TransportManager.HasSubscriberEverConnected() { + dt := subTrack.DownTrack() + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) + dt.SetConnected() + } + p.TransportManager.AddSubscribedTrack(subTrack) } - p.TransportManager.AddSubscribedTrack(subTrack) }) } @@ -1882,6 +1891,10 @@ func (p *ParticipantImpl) onPublisherInitialConnected() { p.supervisor.SetPublisherPeerConnectionConnected(true) } + if p.params.UseOneShotSignallingMode { + p.setDownTracksConnected() + } + p.pubRTCPQueue.Start() } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 2f0f25038..5d2c939e2 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -168,11 +168,7 @@ func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) { func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) { if m.params.UseOneShotSignallingMode { - // ONE-SHOT-SIGNALLING-MODE-TODO - // 1. Remove from peer connection - // 2. Analytics events for `TrackUnsubscribed` and `RTPStats`. - // Note that these are sent only if track was bound. - // So, maybe one shot mode also should maintain subscribed tracks? + m.unsubscribeSynchronous(trackID) return } @@ -626,6 +622,17 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro return ErrTrackNotFound } + m.lock.Lock() + sub, ok := m.subscriptions[trackID] + if !ok { + sLogger := m.params.Logger.WithValues( + "trackID", trackID, + ) + sub = newTrackSubscription(m.params.Participant.ID(), trackID, sLogger) + m.subscriptions[trackID] = sub + } + m.lock.Unlock() + subTrack, err := track.AddSubscriber(m.params.Participant) if err != nil && !errors.Is(err, errAlreadySubscribed) { return err @@ -639,6 +646,35 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro ) } if err == nil && subTrack != nil { // subTrack could be nil if already subscribed + subTrack.OnClose(func(isExpectedToResume bool) { + m.handleSubscribedTrackClose(sub, isExpectedToResume) + + m.lock.Lock() + delete(m.subscriptions, trackID) + m.lock.Unlock() + }) + subTrack.AddOnBind(func(err error) { + if err != nil { + sub.logger.Infow("failed to bind track", "err", err) + sub.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true) + m.UnsubscribeFromTrack(trackID) + m.params.OnSubscriptionError(trackID, false, err) + return + } + sub.setBound() + sub.maybeRecordSuccess(m.params.Telemetry, m.params.Participant.ID()) + }) + sub.setSubscribedTrack(subTrack) + + switch track.Kind() { + case livekit.TrackType_VIDEO: + m.subscribedVideoCount.Inc() + case livekit.TrackType_AUDIO: + m.subscribedAudioCount.Inc() + } + + go m.params.OnTrackSubscribed(subTrack) + m.params.Logger.Debugw( "subscribed to track", "trackID", trackID, @@ -646,8 +682,6 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro "subscribedVideoCount", m.subscribedVideoCount.Load(), ) } - // ONE-SHOT-SIGNALLING-MODE-TODO - // 1. Analytics events for `TrackSubscribed` return nil } @@ -671,6 +705,29 @@ func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error { return nil } +func (m *SubscriptionManager) unsubscribeSynchronous(trackID livekit.TrackID) error { + m.lock.Lock() + sub := m.subscriptions[trackID] + delete(m.subscriptions, trackID) + m.lock.Unlock() + if sub == nil { + // already unsubscribed or not subscribed + return nil + } + + sub.logger.Debugw("executing unsubscribe synchronous") + + subTrack := sub.getSubscribedTrack() + if subTrack == nil { + // already unsubscribed + return nil + } + + track := subTrack.MediaTrack() + track.RemoveSubscriber(m.params.Participant.ID(), false) + return nil +} + func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) { m.lock.Lock() sub := m.subscriptions[trackID] @@ -780,10 +837,12 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i t := time.Now() s.subscribeAt.Store(&t) } - if relieveFromLimits { - m.queueReconcile(trackIDForReconcileSubscriptions) - } else { - m.queueReconcile(s.trackID) + if !m.params.UseOneShotSignallingMode { + if relieveFromLimits { + m.queueReconcile(trackIDForReconcileSubscriptions) + } else { + m.queueReconcile(s.trackID) + } } } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 97d13a1a7..d421753ee 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1025,7 +1025,28 @@ func (t *PCTransport) clearConnTimer() { func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) error { if t.params.UseOneShotSignallingMode { - err := t.pc.SetRemoteDescription(sd) + // add remote candidates to ICE connection details + parsed, err := sd.Unmarshal() + if err == nil { + addRemoteICECandidates := func(attrs []sdp.Attribute) { + for _, a := range attrs { + if a.IsICECandidate() { + c, err := ice.UnmarshalCandidate(a.Value) + if err != nil { + continue + } + t.connectionDetails.AddRemoteICECandidate(c, false, false, false) + } + } + } + + addRemoteICECandidates(parsed.Attributes) + for _, m := range parsed.MediaDescriptions { + addRemoteICECandidates(m.Attributes) + } + } + + err = t.pc.SetRemoteDescription(sd) if err != nil { t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err) } @@ -1061,7 +1082,30 @@ func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, error) { // wait for gathering to complete to include all candidates in the answer <-webrtc.GatheringCompletePromise(t.pc) - return *t.pc.CurrentLocalDescription(), nil + cld := t.pc.CurrentLocalDescription() + + // add local candidates to ICE connection details + parsed, err := cld.Unmarshal() + if err == nil { + addLocalICECandidates := func(attrs []sdp.Attribute) { + for _, a := range attrs { + if a.IsICECandidate() { + c, err := ice.UnmarshalCandidate(a.Value) + if err != nil { + continue + } + t.connectionDetails.AddLocalICECandidate(c, false, false) + } + } + } + + addLocalICECandidates(parsed.Attributes) + for _, m := range parsed.MediaDescriptions { + addLocalICECandidates(m.Attributes) + } + } + + return *cld, nil } func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) {