mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 13:25:42 +00:00
* Set down track connected flag in one-shot-signalling mode. Also added maintaining ICE candidates for info purposes, and analytics events (the subscription has to be maintained inside the subscription manager to get the list of subscribed tracks, so enough bits from the async path were added to the sync path to get the analytics bits as well) * comment typo * method to check if a track name is subscribed
1176 lines
33 KiB
Go
1176 lines
33 KiB
Go
/*
|
|
* Copyright 2023 LiveKit, Inc
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package rtc
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/webrtc/v3/pkg/rtcerr"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/exp/maps"
|
|
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/sfu"
|
|
"github.com/livekit/livekit-server/pkg/telemetry"
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils"
|
|
)
|
|
|
|
// using var instead of const to override in tests
|
|
var (
|
|
reconcileInterval = 3 * time.Second
|
|
// amount of time to give up if a track or publisher isn't found
|
|
// ensuring this is longer than iceFailedTimeout so we are certain the participant won't return
|
|
notFoundTimeout = time.Minute
|
|
// amount of time to try otherwise before flagging subscription as failed
|
|
subscriptionTimeout = iceFailedTimeoutTotal
|
|
trackRemoveGracePeriod = time.Second
|
|
maxUnsubscribeWait = time.Second
|
|
)
|
|
|
|
const (
|
|
trackIDForReconcileSubscriptions = livekit.TrackID("subscriptions_reconcile")
|
|
)
|
|
|
|
type SubscriptionManagerParams struct {
|
|
Logger logger.Logger
|
|
Participant types.LocalParticipant
|
|
TrackResolver types.MediaTrackResolver
|
|
OnTrackSubscribed func(subTrack types.SubscribedTrack)
|
|
OnTrackUnsubscribed func(subTrack types.SubscribedTrack)
|
|
OnSubscriptionError func(trackID livekit.TrackID, fatal bool, err error)
|
|
Telemetry telemetry.TelemetryService
|
|
|
|
SubscriptionLimitVideo, SubscriptionLimitAudio int32
|
|
|
|
UseOneShotSignallingMode bool
|
|
}
|
|
|
|
// SubscriptionManager manages a participant's subscriptions
|
|
type SubscriptionManager struct {
|
|
params SubscriptionManagerParams
|
|
lock sync.RWMutex
|
|
subscriptions map[livekit.TrackID]*trackSubscription
|
|
pendingUnsubscribes atomic.Int32
|
|
|
|
subscribedVideoCount, subscribedAudioCount atomic.Int32
|
|
|
|
subscribedTo map[livekit.ParticipantID]map[livekit.TrackID]struct{}
|
|
reconcileCh chan livekit.TrackID
|
|
closeCh chan struct{}
|
|
doneCh chan struct{}
|
|
|
|
onSubscribeStatusChanged func(publisherID livekit.ParticipantID, subscribed bool)
|
|
}
|
|
|
|
func NewSubscriptionManager(params SubscriptionManagerParams) *SubscriptionManager {
|
|
m := &SubscriptionManager{
|
|
params: params,
|
|
subscriptions: make(map[livekit.TrackID]*trackSubscription),
|
|
subscribedTo: make(map[livekit.ParticipantID]map[livekit.TrackID]struct{}),
|
|
reconcileCh: make(chan livekit.TrackID, 50),
|
|
closeCh: make(chan struct{}),
|
|
doneCh: make(chan struct{}),
|
|
}
|
|
|
|
go m.reconcileWorker()
|
|
return m
|
|
}
|
|
|
|
func (m *SubscriptionManager) Close(isExpectedToResume bool) {
|
|
m.lock.Lock()
|
|
if m.isClosed() {
|
|
m.lock.Unlock()
|
|
return
|
|
}
|
|
close(m.closeCh)
|
|
m.lock.Unlock()
|
|
|
|
<-m.doneCh
|
|
|
|
subTracks := m.GetSubscribedTracks()
|
|
downTracksToClose := make([]*sfu.DownTrack, 0, len(subTracks))
|
|
for _, st := range subTracks {
|
|
m.setDesired(st.ID(), false)
|
|
dt := st.DownTrack()
|
|
// nil check exists primarily for tests
|
|
if dt != nil {
|
|
downTracksToClose = append(downTracksToClose, st.DownTrack())
|
|
}
|
|
}
|
|
|
|
if isExpectedToResume {
|
|
for _, dt := range downTracksToClose {
|
|
dt.CloseWithFlush(false)
|
|
}
|
|
} else {
|
|
// flush blocks, so execute in parallel
|
|
for _, dt := range downTracksToClose {
|
|
go dt.CloseWithFlush(true)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) isClosed() bool {
|
|
select {
|
|
case <-m.closeCh:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) {
|
|
if m.params.UseOneShotSignallingMode {
|
|
m.subscribeSynchronous(trackID)
|
|
return
|
|
}
|
|
|
|
sub, desireChanged := m.setDesired(trackID, true)
|
|
if sub == nil {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
|
|
m.lock.Lock()
|
|
m.subscriptions[trackID] = sub
|
|
m.lock.Unlock()
|
|
|
|
sub, desireChanged = m.setDesired(trackID, true)
|
|
}
|
|
if desireChanged {
|
|
sub.logger.Debugw("subscribing to track")
|
|
}
|
|
|
|
// always reconcile, since SubscribeToTrack could be called when the track is ready
|
|
m.queueReconcile(trackID)
|
|
}
|
|
|
|
func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) {
|
|
if m.params.UseOneShotSignallingMode {
|
|
m.unsubscribeSynchronous(trackID)
|
|
return
|
|
}
|
|
|
|
sub, desireChanged := m.setDesired(trackID, false)
|
|
if sub == nil || !desireChanged {
|
|
return
|
|
}
|
|
|
|
sub.logger.Debugw("unsubscribing from track")
|
|
m.queueReconcile(trackID)
|
|
}
|
|
|
|
func (m *SubscriptionManager) GetSubscribedTracks() []types.SubscribedTrack {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
tracks := make([]types.SubscribedTrack, 0, len(m.subscriptions))
|
|
for _, t := range m.subscriptions {
|
|
st := t.getSubscribedTrack()
|
|
if st != nil {
|
|
tracks = append(tracks, st)
|
|
}
|
|
}
|
|
return tracks
|
|
}
|
|
|
|
func (m *SubscriptionManager) IsTrackNameSubscribed(publisherIdentity livekit.ParticipantIdentity, trackName string) bool {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
for _, s := range m.subscriptions {
|
|
st := s.getSubscribedTrack()
|
|
if st != nil && st.PublisherIdentity() == publisherIdentity && st.MediaTrack() != nil && st.MediaTrack().Name() == trackName {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *SubscriptionManager) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
states := make(map[livekit.TrackID]*livekit.RTPForwarderState, len(m.subscriptions))
|
|
for trackID, t := range m.subscriptions {
|
|
st := t.getSubscribedTrack()
|
|
if st != nil {
|
|
dt := st.DownTrack()
|
|
if dt != nil {
|
|
state := dt.StopWriteAndGetState()
|
|
if state.ForwarderState != nil {
|
|
states[trackID] = state.ForwarderState
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return states
|
|
}
|
|
|
|
func (m *SubscriptionManager) HasSubscriptions() bool {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
for _, s := range m.subscriptions {
|
|
if s.isDesired() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantID {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
return maps.Keys(m.subscribedTo)
|
|
}
|
|
|
|
func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
_, ok := m.subscribedTo[participantID]
|
|
return ok
|
|
}
|
|
|
|
func (m *SubscriptionManager) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) {
|
|
m.lock.Lock()
|
|
sub, ok := m.subscriptions[trackID]
|
|
if !ok {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
m.subscriptions[trackID] = sub
|
|
}
|
|
m.lock.Unlock()
|
|
|
|
sub.setSettings(settings)
|
|
}
|
|
|
|
// OnSubscribeStatusChanged callback will be notified when a participant subscribes or unsubscribes to another participant
|
|
// it will only fire once per publisher. If current participant is subscribed to multiple tracks from another, this
|
|
// callback will only fire once.
|
|
func (m *SubscriptionManager) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) {
|
|
m.lock.Lock()
|
|
m.onSubscribeStatusChanged = fn
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
func (m *SubscriptionManager) WaitUntilSubscribed(timeout time.Duration) error {
|
|
expiresAt := time.Now().Add(timeout)
|
|
for expiresAt.After(time.Now()) {
|
|
allSubscribed := true
|
|
m.lock.RLock()
|
|
for _, sub := range m.subscriptions {
|
|
if sub.needsSubscribe() {
|
|
allSubscribed = false
|
|
break
|
|
}
|
|
}
|
|
m.lock.RUnlock()
|
|
if allSubscribed {
|
|
return nil
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
|
|
return context.DeadlineExceeded
|
|
}
|
|
|
|
func (m *SubscriptionManager) ReconcileAll() {
|
|
m.queueReconcile(trackIDForReconcileSubscriptions)
|
|
}
|
|
|
|
func (m *SubscriptionManager) setDesired(trackID livekit.TrackID, desired bool) (*trackSubscription, bool) {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
sub, ok := m.subscriptions[trackID]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
return sub, sub.setDesired(desired)
|
|
}
|
|
|
|
func (m *SubscriptionManager) canReconcile() bool {
|
|
p := m.params.Participant
|
|
if m.isClosed() || p.IsClosed() || p.IsDisconnected() {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileSubscriptions() {
|
|
var needsToReconcile []*trackSubscription
|
|
m.lock.RLock()
|
|
for _, sub := range m.subscriptions {
|
|
if sub.needsSubscribe() || sub.needsUnsubscribe() || sub.needsBind() || sub.needsCleanup() {
|
|
needsToReconcile = append(needsToReconcile, sub)
|
|
}
|
|
}
|
|
m.lock.RUnlock()
|
|
|
|
for _, s := range needsToReconcile {
|
|
m.reconcileSubscription(s)
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) {
|
|
if !m.canReconcile() {
|
|
return
|
|
}
|
|
if s.needsSubscribe() {
|
|
if m.pendingUnsubscribes.Load() != 0 && s.durationSinceStart() < maxUnsubscribeWait {
|
|
// enqueue this in a bit, after pending unsubscribes are complete
|
|
go func() {
|
|
time.Sleep(time.Duration(sfu.RTPBlankFramesCloseSeconds * float32(time.Second)))
|
|
m.queueReconcile(s.trackID)
|
|
}()
|
|
return
|
|
}
|
|
|
|
numAttempts := s.getNumAttempts()
|
|
if numAttempts == 0 {
|
|
m.params.Telemetry.TrackSubscribeRequested(
|
|
context.Background(),
|
|
m.params.Participant.ID(),
|
|
&livekit.TrackInfo{
|
|
Sid: string(s.trackID),
|
|
},
|
|
)
|
|
}
|
|
if err := m.subscribe(s); err != nil {
|
|
s.recordAttempt(false)
|
|
|
|
switch err {
|
|
case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrSubscriptionLimitExceeded:
|
|
// these are errors that are outside of our control, so we'll keep trying
|
|
// - ErrNoTrackPermission: publisher did not grant subscriber permission, may change any moment
|
|
// - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment
|
|
// - ErrNoReceiver: Track is in the process of closing (another local track published to the same instance)
|
|
// - ErrNotOpen: Track is closing or already closed
|
|
// - ErrSubscriptionLimitExceeded: the participant have reached the limit of subscriptions, wait for the other subscription to be unsubscribed
|
|
// We'll still log an event to reflect this in telemetry since it's been too long
|
|
if s.durationSinceStart() > subscriptionTimeout {
|
|
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
|
|
}
|
|
case ErrTrackNotFound:
|
|
// source track was never published or closed
|
|
// if after timeout we'd unsubscribe from it.
|
|
// this is the *only* case we'd change desired state
|
|
if s.durationSinceStart() > notFoundTimeout {
|
|
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
|
|
s.logger.Infow("unsubscribing from track after notFoundTimeout", "error", err)
|
|
s.setDesired(false)
|
|
m.queueReconcile(s.trackID)
|
|
m.params.OnSubscriptionError(s.trackID, false, err)
|
|
}
|
|
default:
|
|
// all other errors
|
|
if s.durationSinceStart() > subscriptionTimeout {
|
|
s.logger.Warnw("failed to subscribe, triggering error handler", err,
|
|
"attempt", numAttempts,
|
|
)
|
|
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, false)
|
|
m.params.OnSubscriptionError(s.trackID, true, err)
|
|
} else {
|
|
s.logger.Debugw("failed to subscribe, retrying",
|
|
"error", err,
|
|
"attempt", numAttempts,
|
|
)
|
|
}
|
|
}
|
|
} else {
|
|
s.recordAttempt(true)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
if s.needsUnsubscribe() {
|
|
if err := m.unsubscribe(s); err != nil {
|
|
s.logger.Warnw("failed to unsubscribe", err)
|
|
}
|
|
// do not remove subscription from map. Wait for subscribed track to close
|
|
// and the callback (handleSubscribedTrackClose) to set the subscribedTrack to nil
|
|
// and the clean up path to handle removing subscription from the subscription map.
|
|
// It is possible that the track is re-published before subscribed track is closed.
|
|
// That could create a new subscription and a duplicate entry in SDP.
|
|
// Waiting for susbcribed track close would ensure that the track is removed from
|
|
// the peer connection before re-published track is re-subscribed and added back to the SDP.
|
|
return
|
|
}
|
|
|
|
if s.needsBind() {
|
|
// check bound status, notify error callback if it's not bound
|
|
// if a publisher leaves or closes the source track, SubscribedTrack will be closed as well and it will go
|
|
// back to needsSubscribe state
|
|
if s.durationSinceStart() > subscriptionTimeout {
|
|
s.logger.Warnw("track not bound after timeout", nil)
|
|
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), ErrTrackNotBound, false)
|
|
m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound)
|
|
}
|
|
}
|
|
|
|
m.lock.Lock()
|
|
if s.needsCleanup() {
|
|
s.logger.Debugw("cleanup removing subscription")
|
|
delete(m.subscriptions, s.trackID)
|
|
}
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
// trigger an immediate reconciliation, when trackID is empty, will reconcile all subscriptions
|
|
func (m *SubscriptionManager) queueReconcile(trackID livekit.TrackID) {
|
|
select {
|
|
case m.reconcileCh <- trackID:
|
|
default:
|
|
// queue is full, will reconcile based on timer
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileWorker() {
|
|
reconcileTicker := time.NewTicker(reconcileInterval)
|
|
defer reconcileTicker.Stop()
|
|
defer close(m.doneCh)
|
|
|
|
for {
|
|
select {
|
|
case <-m.closeCh:
|
|
return
|
|
case <-reconcileTicker.C:
|
|
m.reconcileSubscriptions()
|
|
case trackID := <-m.reconcileCh:
|
|
m.lock.Lock()
|
|
s := m.subscriptions[trackID]
|
|
m.lock.Unlock()
|
|
if s != nil {
|
|
m.reconcileSubscription(s)
|
|
} else {
|
|
m.reconcileSubscriptions()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) hasCapacityForSubscription(kind livekit.TrackType) bool {
|
|
switch kind {
|
|
case livekit.TrackType_VIDEO:
|
|
if m.params.SubscriptionLimitVideo > 0 && m.subscribedVideoCount.Load() >= m.params.SubscriptionLimitVideo {
|
|
return false
|
|
}
|
|
|
|
case livekit.TrackType_AUDIO:
|
|
if m.params.SubscriptionLimitAudio > 0 && m.subscribedAudioCount.Load() >= m.params.SubscriptionLimitAudio {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
|
|
s.logger.Debugw("executing subscribe")
|
|
|
|
if !m.params.Participant.CanSubscribe() {
|
|
return ErrNoSubscribePermission
|
|
}
|
|
|
|
if kind, ok := s.getKind(); ok && !m.hasCapacityForSubscription(kind) {
|
|
return ErrSubscriptionLimitExceeded
|
|
}
|
|
|
|
trackID := s.trackID
|
|
res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
|
|
s.logger.Debugw("resolved track", "result", res)
|
|
|
|
if res.TrackChangedNotifier != nil && s.setChangedNotifier(res.TrackChangedNotifier) {
|
|
// set callback only when we haven't done it before
|
|
// we set the observer before checking for existence of track, so that we may get notified
|
|
// when the track becomes available
|
|
res.TrackChangedNotifier.AddObserver(string(m.params.Participant.ID()), func() {
|
|
m.queueReconcile(trackID)
|
|
})
|
|
}
|
|
if res.TrackRemovedNotifier != nil && s.setRemovedNotifier(res.TrackRemovedNotifier) {
|
|
res.TrackRemovedNotifier.AddObserver(string(m.params.Participant.ID()), func() {
|
|
// re-resolve the track in case the same track had been re-published
|
|
res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
|
|
if res.Track != nil {
|
|
// do not unsubscribe, track is still available
|
|
return
|
|
}
|
|
m.handleSourceTrackRemoved(trackID)
|
|
})
|
|
}
|
|
|
|
track := res.Track
|
|
if track == nil {
|
|
return ErrTrackNotFound
|
|
}
|
|
s.trySetKind(track.Kind())
|
|
if !m.hasCapacityForSubscription(track.Kind()) {
|
|
return ErrSubscriptionLimitExceeded
|
|
}
|
|
|
|
s.setPublisher(res.PublisherIdentity, res.PublisherID)
|
|
|
|
permChanged := s.setHasPermission(res.HasPermission)
|
|
if permChanged {
|
|
m.params.Participant.SubscriptionPermissionUpdate(s.getPublisherID(), trackID, res.HasPermission)
|
|
}
|
|
if !res.HasPermission {
|
|
return ErrNoTrackPermission
|
|
}
|
|
|
|
subTrack, err := track.AddSubscriber(m.params.Participant)
|
|
if err != nil && !errors.Is(err, errAlreadySubscribed) {
|
|
// ignore error(s): already subscribed
|
|
if !utils.ErrorIsOneOf(err, ErrNoReceiver) {
|
|
// as track resolution could take some time, not logging errors due to waiting for track resolution
|
|
m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID)
|
|
}
|
|
return err
|
|
}
|
|
if err == errAlreadySubscribed {
|
|
m.params.Logger.Debugw(
|
|
"already subscribed to track",
|
|
"trackID", trackID,
|
|
"subscribedAudioCount", m.subscribedAudioCount.Load(),
|
|
"subscribedVideoCount", m.subscribedVideoCount.Load(),
|
|
)
|
|
}
|
|
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
|
|
subTrack.OnClose(func(isExpectedToResume bool) {
|
|
m.handleSubscribedTrackClose(s, isExpectedToResume)
|
|
})
|
|
subTrack.AddOnBind(func(err error) {
|
|
if err != nil {
|
|
s.logger.Infow("failed to bind track", "err", err)
|
|
s.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
|
|
m.UnsubscribeFromTrack(trackID)
|
|
m.params.OnSubscriptionError(trackID, false, err)
|
|
return
|
|
}
|
|
s.setBound()
|
|
s.maybeRecordSuccess(m.params.Telemetry, m.params.Participant.ID())
|
|
})
|
|
s.setSubscribedTrack(subTrack)
|
|
|
|
switch track.Kind() {
|
|
case livekit.TrackType_VIDEO:
|
|
m.subscribedVideoCount.Inc()
|
|
case livekit.TrackType_AUDIO:
|
|
m.subscribedAudioCount.Inc()
|
|
}
|
|
|
|
if subTrack.NeedsNegotiation() {
|
|
m.params.Participant.Negotiate(false)
|
|
}
|
|
|
|
go m.params.OnTrackSubscribed(subTrack)
|
|
|
|
m.params.Logger.Debugw(
|
|
"subscribed to track",
|
|
"trackID", trackID,
|
|
"subscribedAudioCount", m.subscribedAudioCount.Load(),
|
|
"subscribedVideoCount", m.subscribedVideoCount.Load(),
|
|
)
|
|
}
|
|
|
|
// add mark the participant as someone we've subscribed to
|
|
firstSubscribe := false
|
|
publisherID := s.getPublisherID()
|
|
m.lock.Lock()
|
|
pTracks := m.subscribedTo[publisherID]
|
|
changedCB := m.onSubscribeStatusChanged
|
|
if pTracks == nil {
|
|
pTracks = make(map[livekit.TrackID]struct{})
|
|
m.subscribedTo[publisherID] = pTracks
|
|
firstSubscribe = true
|
|
}
|
|
pTracks[trackID] = struct{}{}
|
|
m.lock.Unlock()
|
|
|
|
if changedCB != nil && firstSubscribe {
|
|
changedCB(publisherID, true)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) error {
|
|
m.params.Logger.Debugw("executing subscribe synchronous", "trackID", trackID)
|
|
|
|
if !m.params.Participant.CanSubscribe() {
|
|
return ErrNoSubscribePermission
|
|
}
|
|
|
|
res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
|
|
m.params.Logger.Debugw("resolved track", "trackID", trackID, " result", res)
|
|
|
|
track := res.Track
|
|
if track == nil {
|
|
return ErrTrackNotFound
|
|
}
|
|
|
|
m.lock.Lock()
|
|
sub, ok := m.subscriptions[trackID]
|
|
if !ok {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
m.subscriptions[trackID] = sub
|
|
}
|
|
m.lock.Unlock()
|
|
|
|
subTrack, err := track.AddSubscriber(m.params.Participant)
|
|
if err != nil && !errors.Is(err, errAlreadySubscribed) {
|
|
return err
|
|
}
|
|
if err == errAlreadySubscribed {
|
|
m.params.Logger.Debugw(
|
|
"already subscribed to track",
|
|
"trackID", trackID,
|
|
"subscribedAudioCount", m.subscribedAudioCount.Load(),
|
|
"subscribedVideoCount", m.subscribedVideoCount.Load(),
|
|
)
|
|
}
|
|
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
|
|
subTrack.OnClose(func(isExpectedToResume bool) {
|
|
m.handleSubscribedTrackClose(sub, isExpectedToResume)
|
|
|
|
m.lock.Lock()
|
|
delete(m.subscriptions, trackID)
|
|
m.lock.Unlock()
|
|
})
|
|
subTrack.AddOnBind(func(err error) {
|
|
if err != nil {
|
|
sub.logger.Infow("failed to bind track", "err", err)
|
|
sub.maybeRecordError(m.params.Telemetry, m.params.Participant.ID(), err, true)
|
|
m.UnsubscribeFromTrack(trackID)
|
|
m.params.OnSubscriptionError(trackID, false, err)
|
|
return
|
|
}
|
|
sub.setBound()
|
|
sub.maybeRecordSuccess(m.params.Telemetry, m.params.Participant.ID())
|
|
})
|
|
sub.setSubscribedTrack(subTrack)
|
|
|
|
switch track.Kind() {
|
|
case livekit.TrackType_VIDEO:
|
|
m.subscribedVideoCount.Inc()
|
|
case livekit.TrackType_AUDIO:
|
|
m.subscribedAudioCount.Inc()
|
|
}
|
|
|
|
go m.params.OnTrackSubscribed(subTrack)
|
|
|
|
m.params.Logger.Debugw(
|
|
"subscribed to track",
|
|
"trackID", trackID,
|
|
"subscribedAudioCount", m.subscribedAudioCount.Load(),
|
|
"subscribedVideoCount", m.subscribedVideoCount.Load(),
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error {
|
|
s.logger.Debugw("executing unsubscribe")
|
|
|
|
subTrack := s.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
// already unsubscribed
|
|
return nil
|
|
}
|
|
|
|
track := subTrack.MediaTrack()
|
|
pID := m.params.Participant.ID()
|
|
m.pendingUnsubscribes.Inc()
|
|
go func() {
|
|
defer m.pendingUnsubscribes.Dec()
|
|
track.RemoveSubscriber(pID, false)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) unsubscribeSynchronous(trackID livekit.TrackID) error {
|
|
m.lock.Lock()
|
|
sub := m.subscriptions[trackID]
|
|
delete(m.subscriptions, trackID)
|
|
m.lock.Unlock()
|
|
if sub == nil {
|
|
// already unsubscribed or not subscribed
|
|
return nil
|
|
}
|
|
|
|
sub.logger.Debugw("executing unsubscribe synchronous")
|
|
|
|
subTrack := sub.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
// already unsubscribed
|
|
return nil
|
|
}
|
|
|
|
track := subTrack.MediaTrack()
|
|
track.RemoveSubscriber(m.params.Participant.ID(), false)
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) {
|
|
m.lock.Lock()
|
|
sub := m.subscriptions[trackID]
|
|
m.lock.Unlock()
|
|
|
|
if sub != nil {
|
|
sub.handleSourceTrackRemoved()
|
|
}
|
|
}
|
|
|
|
// DownTrack closing is how the publisher signifies that the subscription is no longer fulfilled
|
|
// this could be due to a few reasons:
|
|
// - subscriber-initiated unsubscribe
|
|
// - UpTrack was closed
|
|
// - publisher revoked permissions for the participant
|
|
func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, isExpectedToResume bool) {
|
|
s.logger.Debugw(
|
|
"subscribed track closed",
|
|
"isExpectedToResume", isExpectedToResume,
|
|
)
|
|
wasBound := s.isBound()
|
|
subTrack := s.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
return
|
|
}
|
|
s.setSubscribedTrack(nil)
|
|
|
|
var relieveFromLimits bool
|
|
switch subTrack.MediaTrack().Kind() {
|
|
case livekit.TrackType_VIDEO:
|
|
videoCount := m.subscribedVideoCount.Dec()
|
|
relieveFromLimits = m.params.SubscriptionLimitVideo > 0 && videoCount == m.params.SubscriptionLimitVideo-1
|
|
case livekit.TrackType_AUDIO:
|
|
audioCount := m.subscribedAudioCount.Dec()
|
|
relieveFromLimits = m.params.SubscriptionLimitAudio > 0 && audioCount == m.params.SubscriptionLimitAudio-1
|
|
}
|
|
|
|
// remove from subscribedTo
|
|
publisherID := s.getPublisherID()
|
|
lastSubscription := false
|
|
m.lock.Lock()
|
|
changedCB := m.onSubscribeStatusChanged
|
|
pTracks := m.subscribedTo[publisherID]
|
|
if pTracks != nil {
|
|
delete(pTracks, s.trackID)
|
|
if len(pTracks) == 0 {
|
|
delete(m.subscribedTo, publisherID)
|
|
lastSubscription = true
|
|
}
|
|
}
|
|
m.lock.Unlock()
|
|
if changedCB != nil && lastSubscription {
|
|
go changedCB(publisherID, false)
|
|
}
|
|
|
|
go m.params.OnTrackUnsubscribed(subTrack)
|
|
|
|
// trigger to decrement unsubscribed counter as long as track has been bound
|
|
// Only log an analytics event when
|
|
// * the participant isn't closing
|
|
// * it's not a migration
|
|
if wasBound {
|
|
m.params.Telemetry.TrackUnsubscribed(
|
|
context.Background(),
|
|
m.params.Participant.ID(),
|
|
&livekit.TrackInfo{Sid: string(s.trackID), Type: subTrack.MediaTrack().Kind()},
|
|
!isExpectedToResume,
|
|
)
|
|
|
|
dt := subTrack.DownTrack()
|
|
if dt != nil {
|
|
stats := dt.GetTrackStats()
|
|
if stats != nil {
|
|
m.params.Telemetry.TrackSubscribeRTPStats(
|
|
context.Background(),
|
|
m.params.Participant.ID(),
|
|
s.trackID,
|
|
dt.Codec().MimeType,
|
|
stats,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
if !isExpectedToResume {
|
|
sender := subTrack.RTPSender()
|
|
if sender != nil {
|
|
s.logger.Debugw("removing PeerConnection track",
|
|
"kind", subTrack.MediaTrack().Kind(),
|
|
)
|
|
|
|
if err := m.params.Participant.RemoveTrackLocal(sender); err != nil {
|
|
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
|
|
// most of these are safe to ignore, since the track state might have already
|
|
// been set to Inactive
|
|
m.params.Logger.Debugw("could not remove remoteTrack from forwarder",
|
|
"error", err,
|
|
"publisher", subTrack.PublisherIdentity(),
|
|
"publisherID", subTrack.PublisherID(),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
m.params.Participant.Negotiate(false)
|
|
} else {
|
|
t := time.Now()
|
|
s.subscribeAt.Store(&t)
|
|
}
|
|
if !m.params.UseOneShotSignallingMode {
|
|
if relieveFromLimits {
|
|
m.queueReconcile(trackIDForReconcileSubscriptions)
|
|
} else {
|
|
m.queueReconcile(s.trackID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
|
|
|
|
type trackSubscription struct {
|
|
subscriberID livekit.ParticipantID
|
|
trackID livekit.TrackID
|
|
logger logger.Logger
|
|
|
|
lock sync.RWMutex
|
|
desired bool
|
|
publisherID livekit.ParticipantID
|
|
publisherIdentity livekit.ParticipantIdentity
|
|
settings *livekit.UpdateTrackSettings
|
|
changedNotifier types.ChangeNotifier
|
|
removedNotifier types.ChangeNotifier
|
|
hasPermissionInitialized bool
|
|
hasPermission bool
|
|
subscribedTrack types.SubscribedTrack
|
|
eventSent atomic.Bool
|
|
numAttempts atomic.Int32
|
|
bound bool
|
|
kind atomic.Pointer[livekit.TrackType]
|
|
|
|
// the later of when subscription was requested OR when the first failure was encountered OR when permission is granted
|
|
// this timestamp determines when failures are reported
|
|
subStartedAt atomic.Pointer[time.Time]
|
|
|
|
// the timestamp when the subscription was started, will be reset when downtrack is closed with expected resume
|
|
subscribeAt atomic.Pointer[time.Time]
|
|
succRecordCounter atomic.Int32
|
|
}
|
|
|
|
func newTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *trackSubscription {
|
|
s := &trackSubscription{
|
|
subscriberID: subscriberID,
|
|
trackID: trackID,
|
|
logger: l,
|
|
}
|
|
t := time.Now()
|
|
s.subscribeAt.Store(&t)
|
|
return s
|
|
}
|
|
|
|
func (s *trackSubscription) setPublisher(publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.publisherID = publisherID
|
|
s.publisherIdentity = publisherIdentity
|
|
}
|
|
|
|
func (s *trackSubscription) getPublisherID() livekit.ParticipantID {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.publisherID
|
|
}
|
|
|
|
func (s *trackSubscription) setDesired(desired bool) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
if desired {
|
|
// as long as user explicitly set it to desired
|
|
// we'll reset the timer so it has sufficient time to reconcile
|
|
t := time.Now()
|
|
s.subStartedAt.Store(&t)
|
|
s.subscribeAt.Store(&t)
|
|
}
|
|
|
|
if s.desired == desired {
|
|
return false
|
|
}
|
|
s.desired = desired
|
|
|
|
// when no longer desired, we no longer care about change notifications
|
|
if desired {
|
|
// reset attempts
|
|
s.numAttempts.Store(0)
|
|
} else {
|
|
s.setChangedNotifierLocked(nil)
|
|
s.setRemovedNotifierLocked(nil)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// set permission and return true if it has changed
|
|
func (s *trackSubscription) setHasPermission(perm bool) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
if s.hasPermissionInitialized && s.hasPermission == perm {
|
|
return false
|
|
}
|
|
|
|
s.hasPermissionInitialized = true
|
|
s.hasPermission = perm
|
|
if s.hasPermission {
|
|
// when permission is granted, reset the timer so it has sufficient time to reconcile
|
|
t := time.Now()
|
|
s.subStartedAt.Store(&t)
|
|
s.subscribeAt.Store(&t)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *trackSubscription) getHasPermission() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.hasPermission
|
|
}
|
|
|
|
func (s *trackSubscription) isDesired() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired
|
|
}
|
|
|
|
func (s *trackSubscription) setSubscribedTrack(track types.SubscribedTrack) {
|
|
s.lock.Lock()
|
|
oldTrack := s.subscribedTrack
|
|
s.subscribedTrack = track
|
|
s.bound = false
|
|
settings := s.settings
|
|
s.lock.Unlock()
|
|
|
|
if settings != nil && track != nil {
|
|
s.logger.Debugw("restoring subscriber settings", "settings", logger.Proto(settings))
|
|
track.UpdateSubscriberSettings(settings, true)
|
|
}
|
|
if oldTrack != nil {
|
|
oldTrack.OnClose(nil)
|
|
}
|
|
}
|
|
|
|
func (s *trackSubscription) trySetKind(kind livekit.TrackType) {
|
|
s.kind.CompareAndSwap(nil, &kind)
|
|
}
|
|
|
|
func (s *trackSubscription) getKind() (livekit.TrackType, bool) {
|
|
kind := s.kind.Load()
|
|
if kind == nil {
|
|
return livekit.TrackType_AUDIO, false
|
|
}
|
|
return *kind, true
|
|
}
|
|
|
|
func (s *trackSubscription) getSubscribedTrack() types.SubscribedTrack {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.subscribedTrack
|
|
}
|
|
|
|
func (s *trackSubscription) setChangedNotifier(notifier types.ChangeNotifier) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.setChangedNotifierLocked(notifier)
|
|
}
|
|
|
|
func (s *trackSubscription) setChangedNotifierLocked(notifier types.ChangeNotifier) bool {
|
|
if s.changedNotifier == notifier {
|
|
return false
|
|
}
|
|
|
|
existing := s.changedNotifier
|
|
s.changedNotifier = notifier
|
|
|
|
if existing != nil {
|
|
go existing.RemoveObserver(string(s.subscriberID))
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *trackSubscription) setRemovedNotifier(notifier types.ChangeNotifier) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.setRemovedNotifierLocked(notifier)
|
|
}
|
|
|
|
func (s *trackSubscription) setRemovedNotifierLocked(notifier types.ChangeNotifier) bool {
|
|
if s.removedNotifier == notifier {
|
|
return false
|
|
}
|
|
|
|
existing := s.removedNotifier
|
|
s.removedNotifier = notifier
|
|
|
|
if existing != nil {
|
|
go existing.RemoveObserver(string(s.subscriberID))
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *trackSubscription) setSettings(settings *livekit.UpdateTrackSettings) {
|
|
s.lock.Lock()
|
|
s.settings = settings
|
|
subTrack := s.subscribedTrack
|
|
s.lock.Unlock()
|
|
if subTrack != nil {
|
|
subTrack.UpdateSubscriberSettings(settings, false)
|
|
}
|
|
}
|
|
|
|
// mark the subscription as bound - when we've received the client's answer
|
|
func (s *trackSubscription) setBound() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
s.bound = true
|
|
}
|
|
|
|
func (s *trackSubscription) isBound() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.bound
|
|
}
|
|
|
|
func (s *trackSubscription) recordAttempt(success bool) {
|
|
if !success {
|
|
if s.numAttempts.Load() == 0 {
|
|
// on first failure, we'd want to start the timer
|
|
t := time.Now()
|
|
s.subStartedAt.Store(&t)
|
|
}
|
|
s.numAttempts.Add(1)
|
|
} else {
|
|
s.numAttempts.Store(0)
|
|
}
|
|
}
|
|
|
|
func (s *trackSubscription) getNumAttempts() int32 {
|
|
return s.numAttempts.Load()
|
|
}
|
|
|
|
func (s *trackSubscription) handleSourceTrackRemoved() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
startedAt := s.subStartedAt.Load()
|
|
if startedAt == nil || time.Since(*startedAt) < trackRemoveGracePeriod {
|
|
// to prevent race conditions, if we've recently been asked to subscribe to a track
|
|
// ignore when source was removed. reconciler will take care of it eventually
|
|
// this would address the case when a track was unpublished and republished immediately
|
|
// it's possible for another caller to call setDesired(true) for the republished track before
|
|
// handleSourceTrackRemoved is called on the previously unpublished track
|
|
return
|
|
}
|
|
|
|
// source track removed, we would unsubscribe
|
|
s.logger.Debugw("unsubscribing from track since source track was removed")
|
|
s.desired = false
|
|
|
|
s.setChangedNotifierLocked(nil)
|
|
s.setRemovedNotifierLocked(nil)
|
|
}
|
|
|
|
func (s *trackSubscription) maybeRecordError(ts telemetry.TelemetryService, pID livekit.ParticipantID, err error, isUserError bool) {
|
|
if s.eventSent.Swap(true) {
|
|
return
|
|
}
|
|
|
|
ts.TrackSubscribeFailed(context.Background(), pID, s.trackID, err, isUserError)
|
|
}
|
|
|
|
func (s *trackSubscription) maybeRecordSuccess(ts telemetry.TelemetryService, pID livekit.ParticipantID) {
|
|
subTrack := s.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
return
|
|
}
|
|
mediaTrack := subTrack.MediaTrack()
|
|
if mediaTrack == nil {
|
|
return
|
|
}
|
|
|
|
d := time.Since(*s.subscribeAt.Load())
|
|
s.logger.Debugw("track subscribed", "cost", d.Milliseconds())
|
|
subscriber := subTrack.Subscriber()
|
|
prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind(), int(s.succRecordCounter.Inc()))
|
|
|
|
eventSent := s.eventSent.Swap(true)
|
|
|
|
pi := &livekit.ParticipantInfo{
|
|
Identity: string(subTrack.PublisherIdentity()),
|
|
Sid: string(subTrack.PublisherID()),
|
|
}
|
|
ts.TrackSubscribed(context.Background(), pID, mediaTrack.ToProto(), pi, !eventSent)
|
|
}
|
|
|
|
func (s *trackSubscription) durationSinceStart() time.Duration {
|
|
t := s.subStartedAt.Load()
|
|
if t == nil {
|
|
return 0
|
|
}
|
|
return time.Since(*t)
|
|
}
|
|
|
|
func (s *trackSubscription) needsSubscribe() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired && s.subscribedTrack == nil
|
|
}
|
|
|
|
func (s *trackSubscription) needsUnsubscribe() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return !s.desired && s.subscribedTrack != nil
|
|
}
|
|
|
|
func (s *trackSubscription) needsBind() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired && s.subscribedTrack != nil && !s.bound
|
|
}
|
|
|
|
func (s *trackSubscription) needsCleanup() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return !s.desired && s.subscribedTrack == nil
|
|
}
|