Set down track connected flag in one-shot-signalling mode. (#3191)

* Set down track connected flag in one-shot-signalling mode.

Also, added maintaing ICE candidates for info purposes.
And doing analytics events (have to maintain the subscription inside
subscriptionmanager to get list of subscribed tracks, so added enough
bits from the async path into sync path to get the analytics bits also)

* comment typo
This commit is contained in:
Raja Subramanian
2024-11-21 18:41:33 +05:30
committed by GitHub
parent d5cc567140
commit 31d6dd7107
3 changed files with 134 additions and 18 deletions
+18 -5
View File
@@ -1388,12 +1388,21 @@ func (p *ParticipantImpl) onTrackSubscribed(subTrack types.SubscribedTrack) {
if err != nil {
return
}
if p.TransportManager.HasSubscriberEverConnected() {
dt := subTrack.DownTrack()
dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())})
dt.SetConnected()
if p.params.UseOneShotSignallingMode {
if p.TransportManager.HasPublisherEverConnected() {
dt := subTrack.DownTrack()
dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())})
dt.SetConnected()
}
// ONE-SHOT-SIGNALLING-MODE-TODO: video support should add to publisher PC for congestion control
} else {
if p.TransportManager.HasSubscriberEverConnected() {
dt := subTrack.DownTrack()
dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())})
dt.SetConnected()
}
p.TransportManager.AddSubscribedTrack(subTrack)
}
p.TransportManager.AddSubscribedTrack(subTrack)
})
}
@@ -1882,6 +1891,10 @@ func (p *ParticipantImpl) onPublisherInitialConnected() {
p.supervisor.SetPublisherPeerConnectionConnected(true)
}
if p.params.UseOneShotSignallingMode {
p.setDownTracksConnected()
}
p.pubRTCPQueue.Start()
}
+70 -11
View File
@@ -168,11 +168,7 @@ func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) {
func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) {
if m.params.UseOneShotSignallingMode {
// ONE-SHOT-SIGNALLING-MODE-TODO
// 1. Remove from peer connection
// 2. Analytics events for `TrackUnsubscribed` and `RTPStats`.
// Note that these are sent only if track was bound.
// So, maybe one shot mode also should maintain subscribed tracks?
m.unsubscribeSynchronous(trackID)
return
}
@@ -626,6 +622,17 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro
return ErrTrackNotFound
}
m.lock.Lock()
sub, ok := m.subscriptions[trackID]
if !ok {
sLogger := m.params.Logger.WithValues(
"trackID", trackID,
)
sub = newTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
m.subscriptions[trackID] = sub
}
m.lock.Unlock()
subTrack, err := track.AddSubscriber(m.params.Participant)
if err != nil && !errors.Is(err, errAlreadySubscribed) {
return err
@@ -639,6 +646,35 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro
)
}
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
subTrack.OnClose(func(isExpectedToResume bool) {
m.handleSubscribedTrackClose(sub, isExpectedToResume)
m.lock.Lock()
delete(m.subscriptions, trackID)
m.lock.Unlock()
})
subTrack.AddOnBind(func(err error) {
if err != nil {
sub.logger.Infow("failed to bind track", "err", err)
sub.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
m.UnsubscribeFromTrack(trackID)
m.params.OnSubscriptionError(trackID, false, err)
return
}
sub.setBound()
sub.maybeRecordSuccess(m.params.Telemetry, m.params.Participant.ID())
})
sub.setSubscribedTrack(subTrack)
switch track.Kind() {
case livekit.TrackType_VIDEO:
m.subscribedVideoCount.Inc()
case livekit.TrackType_AUDIO:
m.subscribedAudioCount.Inc()
}
go m.params.OnTrackSubscribed(subTrack)
m.params.Logger.Debugw(
"subscribed to track",
"trackID", trackID,
@@ -646,8 +682,6 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro
"subscribedVideoCount", m.subscribedVideoCount.Load(),
)
}
// ONE-SHOT-SIGNALLING-MODE-TODO
// 1. Analytics events for `TrackSubscribed`
return nil
}
@@ -671,6 +705,29 @@ func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error {
return nil
}
func (m *SubscriptionManager) unsubscribeSynchronous(trackID livekit.TrackID) error {
m.lock.Lock()
sub := m.subscriptions[trackID]
delete(m.subscriptions, trackID)
m.lock.Unlock()
if sub == nil {
// already unsubscribed or not subscribed
return nil
}
sub.logger.Debugw("executing unsubscribe synchronous")
subTrack := sub.getSubscribedTrack()
if subTrack == nil {
// already unsubscribed
return nil
}
track := subTrack.MediaTrack()
track.RemoveSubscriber(m.params.Participant.ID(), false)
return nil
}
func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) {
m.lock.Lock()
sub := m.subscriptions[trackID]
@@ -780,10 +837,12 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i
t := time.Now()
s.subscribeAt.Store(&t)
}
if relieveFromLimits {
m.queueReconcile(trackIDForReconcileSubscriptions)
} else {
m.queueReconcile(s.trackID)
if !m.params.UseOneShotSignallingMode {
if relieveFromLimits {
m.queueReconcile(trackIDForReconcileSubscriptions)
} else {
m.queueReconcile(s.trackID)
}
}
}
+46 -2
View File
@@ -1025,7 +1025,28 @@ func (t *PCTransport) clearConnTimer() {
func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) error {
if t.params.UseOneShotSignallingMode {
err := t.pc.SetRemoteDescription(sd)
// add remote candidates to ICE connection details
parsed, err := sd.Unmarshal()
if err == nil {
addRemoteICECandidates := func(attrs []sdp.Attribute) {
for _, a := range attrs {
if a.IsICECandidate() {
c, err := ice.UnmarshalCandidate(a.Value)
if err != nil {
continue
}
t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
}
}
}
addRemoteICECandidates(parsed.Attributes)
for _, m := range parsed.MediaDescriptions {
addRemoteICECandidates(m.Attributes)
}
}
err = t.pc.SetRemoteDescription(sd)
if err != nil {
t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err)
}
@@ -1061,7 +1082,30 @@ func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, error) {
// wait for gathering to complete to include all candidates in the answer
<-webrtc.GatheringCompletePromise(t.pc)
return *t.pc.CurrentLocalDescription(), nil
cld := t.pc.CurrentLocalDescription()
// add local candidates to ICE connection details
parsed, err := cld.Unmarshal()
if err == nil {
addLocalICECandidates := func(attrs []sdp.Attribute) {
for _, a := range attrs {
if a.IsICECandidate() {
c, err := ice.UnmarshalCandidate(a.Value)
if err != nil {
continue
}
t.connectionDetails.AddLocalICECandidate(c, false, false)
}
}
}
addLocalICECandidates(parsed.Attributes)
for _, m := range parsed.MediaDescriptions {
addLocalICECandidates(m.Attributes)
}
}
return *cld, nil
}
func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) {