diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d7ef6062d..ed610fe1a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1388,12 +1388,21 @@ func (p *ParticipantImpl) onTrackSubscribed(subTrack types.SubscribedTrack) { if err != nil { return } - if p.TransportManager.HasSubscriberEverConnected() { - dt := subTrack.DownTrack() - dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) - dt.SetConnected() + if p.params.UseOneShotSignallingMode { + if p.TransportManager.HasPublisherEverConnected() { + dt := subTrack.DownTrack() + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) + dt.SetConnected() + } + // ONE-SHOT-SIGNALLING-MODE-TODO: video support should add to publisher PC for congestion control + } else { + if p.TransportManager.HasSubscriberEverConnected() { + dt := subTrack.DownTrack() + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) + dt.SetConnected() + } + p.TransportManager.AddSubscribedTrack(subTrack) } - p.TransportManager.AddSubscribedTrack(subTrack) }) } @@ -1882,6 +1891,10 @@ func (p *ParticipantImpl) onPublisherInitialConnected() { p.supervisor.SetPublisherPeerConnectionConnected(true) } + if p.params.UseOneShotSignallingMode { + p.setDownTracksConnected() + } + p.pubRTCPQueue.Start() } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 2f0f25038..5d2c939e2 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -168,11 +168,7 @@ func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) { func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) { if m.params.UseOneShotSignallingMode { - // ONE-SHOT-SIGNALLING-MODE-TODO - // 1. Remove from peer connection - // 2. Analytics events for `TrackUnsubscribed` and `RTPStats`. - // Note that these are sent only if track was bound. - // So, maybe one shot mode also should maintain subscribed tracks? + m.unsubscribeSynchronous(trackID) return } @@ -626,6 +622,17 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro return ErrTrackNotFound } + m.lock.Lock() + sub, ok := m.subscriptions[trackID] + if !ok { + sLogger := m.params.Logger.WithValues( + "trackID", trackID, + ) + sub = newTrackSubscription(m.params.Participant.ID(), trackID, sLogger) + m.subscriptions[trackID] = sub + } + m.lock.Unlock() + subTrack, err := track.AddSubscriber(m.params.Participant) if err != nil && !errors.Is(err, errAlreadySubscribed) { return err @@ -639,6 +646,35 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro ) } if err == nil && subTrack != nil { // subTrack could be nil if already subscribed + subTrack.OnClose(func(isExpectedToResume bool) { + m.handleSubscribedTrackClose(sub, isExpectedToResume) + + m.lock.Lock() + delete(m.subscriptions, trackID) + m.lock.Unlock() + }) + subTrack.AddOnBind(func(err error) { + if err != nil { + sub.logger.Infow("failed to bind track", "err", err) + sub.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true) + m.UnsubscribeFromTrack(trackID) + m.params.OnSubscriptionError(trackID, false, err) + return + } + sub.setBound() + sub.maybeRecordSuccess(m.params.Telemetry, m.params.Participant.ID()) + }) + sub.setSubscribedTrack(subTrack) + + switch track.Kind() { + case livekit.TrackType_VIDEO: + m.subscribedVideoCount.Inc() + case livekit.TrackType_AUDIO: + m.subscribedAudioCount.Inc() + } + + go m.params.OnTrackSubscribed(subTrack) + m.params.Logger.Debugw( "subscribed to track", "trackID", trackID, @@ -646,8 +682,6 @@ func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) erro "subscribedVideoCount", m.subscribedVideoCount.Load(), ) } - // ONE-SHOT-SIGNALLING-MODE-TODO - // 1. Analytics events for `TrackSubscribed` return nil } @@ -671,6 +705,29 @@ func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error { return nil } +func (m *SubscriptionManager) unsubscribeSynchronous(trackID livekit.TrackID) error { + m.lock.Lock() + sub := m.subscriptions[trackID] + delete(m.subscriptions, trackID) + m.lock.Unlock() + if sub == nil { + // already unsubscribed or not subscribed + return nil + } + + sub.logger.Debugw("executing unsubscribe synchronous") + + subTrack := sub.getSubscribedTrack() + if subTrack == nil { + // already unsubscribed + return nil + } + + track := subTrack.MediaTrack() + track.RemoveSubscriber(m.params.Participant.ID(), false) + return nil +} + func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) { m.lock.Lock() sub := m.subscriptions[trackID] @@ -780,10 +837,12 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i t := time.Now() s.subscribeAt.Store(&t) } - if relieveFromLimits { - m.queueReconcile(trackIDForReconcileSubscriptions) - } else { - m.queueReconcile(s.trackID) + if !m.params.UseOneShotSignallingMode { + if relieveFromLimits { + m.queueReconcile(trackIDForReconcileSubscriptions) + } else { + m.queueReconcile(s.trackID) + } } } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 97d13a1a7..d421753ee 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1025,7 +1025,28 @@ func (t *PCTransport) clearConnTimer() { func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) error { if t.params.UseOneShotSignallingMode { - err := t.pc.SetRemoteDescription(sd) + // add remote candidates to ICE connection details + parsed, err := sd.Unmarshal() + if err == nil { + addRemoteICECandidates := func(attrs []sdp.Attribute) { + for _, a := range attrs { + if a.IsICECandidate() { + c, err := ice.UnmarshalCandidate(a.Value) + if err != nil { + continue + } + t.connectionDetails.AddRemoteICECandidate(c, false, false, false) + } + } + } + + addRemoteICECandidates(parsed.Attributes) + for _, m := range parsed.MediaDescriptions { + addRemoteICECandidates(m.Attributes) + } + } + + err = t.pc.SetRemoteDescription(sd) if err != nil { t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err) } @@ -1061,7 +1082,30 @@ func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, error) { // wait for gathering to complete to include all candidates in the answer <-webrtc.GatheringCompletePromise(t.pc) - return *t.pc.CurrentLocalDescription(), nil + cld := t.pc.CurrentLocalDescription() + + // add local candidates to ICE connection details + parsed, err := cld.Unmarshal() + if err == nil { + addLocalICECandidates := func(attrs []sdp.Attribute) { + for _, a := range attrs { + if a.IsICECandidate() { + c, err := ice.UnmarshalCandidate(a.Value) + if err != nil { + continue + } + t.connectionDetails.AddLocalICECandidate(c, false, false) + } + } + } + + addLocalICECandidates(parsed.Attributes) + for _, m := range parsed.MediaDescriptions { + addLocalICECandidates(m.Attributes) + } + } + + return *cld, nil } func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) {