Files
livekit/pkg/rtc/subscriptionmanager.go
Raja Subramanian b81bac0ec3
Some checks failed
Test / test (push) Failing after 17s
Release to Docker / docker (push) Failing after 3m42s
Key telemetry stats worker using combination of roomID, participantID (#4323)
* Key telemetry stats work using combination of roomID, participantID

With forwarded participant, the same participantID can existing in two
rooms.

NOTE: This does not yet allow a participant session to report its
events/track stats into multiple rooms. That would require regitering
multiple listeners (from rooms a participant is forwarded to).

* missed file

* data channel stats

* PR comments + pass in room name so that telemetry events have proper room name also
2026-02-16 13:56:13 +05:30

1574 lines
42 KiB
Go

/*
* Copyright 2023 LiveKit, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rtc
import (
"context"
"errors"
"sync"
"time"
"github.com/pion/webrtc/v4/pkg/rtcerr"
"go.uber.org/atomic"
"golang.org/x/exp/maps"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
// using var instead of const to override in tests
var (
reconcileInterval = 3 * time.Second
// amount of time to give up if a track or publisher isn't found
// ensuring this is longer than iceFailedTimeout so we are certain the participant won't return
notFoundTimeout = time.Minute
// amount of time to try otherwise before flagging subscription as failed
subscriptionTimeout = iceFailedTimeoutTotal
maxUnsubscribeWait = time.Second
)
const (
trackIDForReconcileSubscriptions = livekit.TrackID("subscriptions_reconcile")
)
type SubscriptionManagerParams struct {
Logger logger.Logger
Participant types.LocalParticipant
TrackResolver types.MediaTrackResolver
OnTrackSubscribed func(subTrack types.SubscribedTrack)
OnTrackUnsubscribed func(subTrack types.SubscribedTrack)
OnSubscriptionError func(trackID livekit.TrackID, fatal bool, err error)
TelemetryListener types.ParticipantTelemetryListener
SubscriptionLimitVideo, SubscriptionLimitAudio int32
DataTrackResolver types.DataTrackResolver
UseOneShotSignallingMode bool
}
// SubscriptionManager manages a participant's subscriptions
type SubscriptionManager struct {
params SubscriptionManagerParams
lock sync.RWMutex
subscriptions map[livekit.TrackID]*mediaTrackSubscription
pendingUnsubscribes atomic.Int32
subscribedVideoCount, subscribedAudioCount atomic.Int32
subscribedTo map[livekit.ParticipantID]map[livekit.TrackID]struct{}
reconcileCh chan livekit.TrackID
reconcileDataTrackCh chan livekit.TrackID
closeCh chan struct{}
doneCh chan struct{}
onSubscribeStatusChanged func(publisherID livekit.ParticipantID, subscribed bool)
dataTrackSubscriptions map[livekit.TrackID]*dataTrackSubscription
}
func NewSubscriptionManager(params SubscriptionManagerParams) *SubscriptionManager {
m := &SubscriptionManager{
params: params,
subscriptions: make(map[livekit.TrackID]*mediaTrackSubscription),
subscribedTo: make(map[livekit.ParticipantID]map[livekit.TrackID]struct{}),
reconcileCh: make(chan livekit.TrackID, 50),
reconcileDataTrackCh: make(chan livekit.TrackID, 5),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
dataTrackSubscriptions: make(map[livekit.TrackID]*dataTrackSubscription),
}
go m.reconcileWorker()
return m
}
func (m *SubscriptionManager) Close(isExpectedToResume bool) {
m.lock.Lock()
if m.isClosed() {
m.lock.Unlock()
return
}
close(m.closeCh)
m.lock.Unlock()
<-m.doneCh
prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations()))
subTracks := m.GetSubscribedTracks()
downTracksToClose := make([]*sfu.DownTrack, 0, len(subTracks))
for _, st := range subTracks {
m.setDesired(st.ID(), false)
dt := st.DownTrack()
// nil check exists primarily for tests
if dt != nil {
downTracksToClose = append(downTracksToClose, st.DownTrack())
}
}
if isExpectedToResume {
for _, dt := range downTracksToClose {
dt.CloseWithFlush(false, true)
}
} else {
// flush blocks, so execute in parallel
for _, dt := range downTracksToClose {
go dt.CloseWithFlush(true, true)
}
}
m.lock.Lock()
for _, sub := range m.dataTrackSubscriptions {
dataDownTrack := sub.getDataDownTrack()
if dataDownTrack == nil {
// already unsubscribed
continue
}
dataTrack := dataDownTrack.PublishDataTrack()
if dataTrack == nil {
continue
}
dataTrack.RemoveSubscriber(sub.subscriberID)
}
m.lock.Unlock()
m.notifyDataTrackSubscriberHandles()
}
func (m *SubscriptionManager) isClosed() bool {
select {
case <-m.closeCh:
return true
default:
return false
}
}
func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID, isSync bool) {
if m.params.UseOneShotSignallingMode || isSync {
m.subscribeSynchronous(trackID)
return
}
sub, desireChanged := m.setDesired(trackID, true)
if sub == nil {
sLogger := m.params.Logger.WithValues(
"trackID", trackID,
)
sub = newMediaTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
m.lock.Lock()
m.subscriptions[trackID] = sub
m.lock.Unlock()
sub, desireChanged = m.setDesired(trackID, true)
}
if desireChanged {
sub.logger.Debugw("subscribing to track")
}
// always reconcile, since SubscribeToTrack could be called when the track is ready
m.queueReconcile(trackID)
}
func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) {
if m.params.UseOneShotSignallingMode {
m.unsubscribeSynchronous(trackID)
return
}
sub, desireChanged := m.setDesired(trackID, false)
if sub == nil || !desireChanged {
return
}
if sub.isCanceled() {
prometheus.RecordTrackSubscribeCancels(1)
}
sub.logger.Debugw("unsubscribing from track")
m.queueReconcile(trackID)
}
func (m *SubscriptionManager) SubscribeToDataTrack(trackID livekit.TrackID) {
sub, desireChanged := m.setDataTrackDesired(trackID, true)
if sub == nil {
sLogger := m.params.Logger.WithValues(
"trackID", trackID,
)
sub = newDataTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
m.lock.Lock()
m.dataTrackSubscriptions[trackID] = sub
m.lock.Unlock()
sub, desireChanged = m.setDataTrackDesired(trackID, true)
}
if desireChanged {
sub.logger.Debugw("subscribing to data track")
}
m.queueReconcileDataTrack(trackID)
}
func (m *SubscriptionManager) UnsubscribeFromDataTrack(trackID livekit.TrackID) {
sub, desireChanged := m.setDataTrackDesired(trackID, false)
if sub == nil || !desireChanged {
return
}
sub.logger.Debugw("unsubscribing from data track")
m.queueReconcileDataTrack(trackID)
}
func (m *SubscriptionManager) ClearAllSubscriptions() {
m.params.Logger.Debugw("clearing all subscriptions")
if m.params.UseOneShotSignallingMode {
for _, track := range m.GetSubscribedTracks() {
m.unsubscribeSynchronous(track.ID())
}
// no synchronous data tracks
}
numCancellations := 0
m.lock.RLock()
for _, sub := range m.subscriptions {
if sub.isCanceled() {
numCancellations++
}
sub.setDesired(false)
}
for _, sub := range m.dataTrackSubscriptions {
sub.setDesired(false)
}
m.lock.RUnlock()
prometheus.RecordTrackSubscribeCancels(int32(numCancellations))
m.ReconcileAll()
}
func (m *SubscriptionManager) GetSubscribedTracks() []types.SubscribedTrack {
m.lock.RLock()
defer m.lock.RUnlock()
tracks := make([]types.SubscribedTrack, 0, len(m.subscriptions))
for _, t := range m.subscriptions {
st := t.getSubscribedTrack()
if st != nil {
tracks = append(tracks, st)
}
}
return tracks
}
func (m *SubscriptionManager) IsTrackNameSubscribed(publisherIdentity livekit.ParticipantIdentity, trackName string) bool {
m.lock.RLock()
defer m.lock.RUnlock()
for _, s := range m.subscriptions {
st := s.getSubscribedTrack()
if st != nil && st.PublisherIdentity() == publisherIdentity && st.MediaTrack() != nil && st.MediaTrack().Name() == trackName {
return true
}
}
return false
}
func (m *SubscriptionManager) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
m.lock.RLock()
defer m.lock.RUnlock()
states := make(map[livekit.TrackID]*livekit.RTPForwarderState, len(m.subscriptions))
for trackID, t := range m.subscriptions {
st := t.getSubscribedTrack()
if st != nil {
dt := st.DownTrack()
if dt != nil {
state := dt.StopWriteAndGetState()
if state.ForwarderState != nil {
states[trackID] = state.ForwarderState
}
}
}
}
return states
}
func (m *SubscriptionManager) HasSubscriptions() bool {
m.lock.RLock()
defer m.lock.RUnlock()
for _, s := range m.subscriptions {
if s.isDesired() {
return true
}
}
return false
}
func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantID {
m.lock.RLock()
defer m.lock.RUnlock()
return maps.Keys(m.subscribedTo)
}
func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool {
m.lock.RLock()
defer m.lock.RUnlock()
_, ok := m.subscribedTo[participantID]
return ok
}
func (m *SubscriptionManager) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) {
m.lock.Lock()
sub, ok := m.subscriptions[trackID]
if !ok {
sLogger := m.params.Logger.WithValues(
"trackID", trackID,
)
sub = newMediaTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
m.subscriptions[trackID] = sub
}
m.lock.Unlock()
sub.setSettings(settings)
}
func (m *SubscriptionManager) UpdateDataTrackSubscriptionOptions(trackID livekit.TrackID, subscriptionOptions *livekit.DataTrackSubscriptionOptions) {
m.lock.Lock()
sub, ok := m.dataTrackSubscriptions[trackID]
if !ok {
sLogger := m.params.Logger.WithValues(
"trackID", trackID,
)
sub = newDataTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
m.dataTrackSubscriptions[trackID] = sub
}
m.lock.Unlock()
sub.setSubscriptionOptions(subscriptionOptions)
}
// OnSubscribeStatusChanged callback will be notified when a participant subscribes or unsubscribes to another participant
// it will only fire once per publisher. If current participant is subscribed to multiple tracks from another, this
// callback will only fire once.
func (m *SubscriptionManager) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) {
m.lock.Lock()
m.onSubscribeStatusChanged = fn
m.lock.Unlock()
}
func (m *SubscriptionManager) WaitUntilSubscribed(timeout time.Duration) error {
expiresAt := time.Now().Add(timeout)
for expiresAt.After(time.Now()) {
allSubscribed := true
m.lock.RLock()
for _, sub := range m.subscriptions {
if sub.needsSubscribe() {
allSubscribed = false
break
}
}
m.lock.RUnlock()
if allSubscribed {
return nil
}
time.Sleep(50 * time.Millisecond)
}
return context.DeadlineExceeded
}
func (m *SubscriptionManager) ReconcileAll() {
m.queueReconcile(trackIDForReconcileSubscriptions)
m.queueReconcileDataTrack(trackIDForReconcileSubscriptions)
}
func (m *SubscriptionManager) setDesired(trackID livekit.TrackID, desired bool) (*mediaTrackSubscription, bool) {
m.lock.RLock()
defer m.lock.RUnlock()
sub, ok := m.subscriptions[trackID]
if !ok {
return nil, false
}
return sub, sub.setDesired(desired)
}
func (m *SubscriptionManager) setDataTrackDesired(trackID livekit.TrackID, desired bool) (*dataTrackSubscription, bool) {
m.lock.RLock()
defer m.lock.RUnlock()
sub, ok := m.dataTrackSubscriptions[trackID]
if !ok {
return nil, false
}
return sub, sub.setDesired(desired)
}
func (m *SubscriptionManager) canReconcile() bool {
p := m.params.Participant
if m.isClosed() || p.IsClosed() || p.IsDisconnected() {
return false
}
return true
}
func (m *SubscriptionManager) reconcileSubscriptions() {
var needsToReconcile []*mediaTrackSubscription
m.lock.RLock()
for _, sub := range m.subscriptions {
if sub.needsSubscribe() || sub.needsUnsubscribe() || sub.needsBind() || sub.needsCleanup() {
needsToReconcile = append(needsToReconcile, sub)
}
}
m.lock.RUnlock()
for _, s := range needsToReconcile {
m.reconcileSubscription(s)
}
}
func (m *SubscriptionManager) reconcileSubscription(s *mediaTrackSubscription) {
if !m.canReconcile() {
return
}
if s.needsSubscribe() {
if m.pendingUnsubscribes.Load() != 0 && s.durationSinceStart() < maxUnsubscribeWait {
// enqueue this in a bit, after pending unsubscribes are complete
go func() {
time.Sleep(time.Duration(sfu.RTPBlankFramesCloseSeconds * float32(time.Second)))
m.queueReconcile(s.trackID)
}()
return
}
numAttempts := s.getNumAttempts()
if numAttempts == 0 {
m.params.TelemetryListener.OnTrackSubscribeRequested(
s.subscriberID,
&livekit.TrackInfo{
Sid: string(s.trackID),
},
)
}
if err := m.subscribe(s); err != nil {
s.recordAttempt(false)
switch err {
case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrSubscriptionLimitExceeded:
// these are errors that are outside of our control, so we'll keep trying
// - ErrNoTrackPermission: publisher did not grant subscriber permission, may change any moment
// - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment
// - ErrNoReceiver: Track is in the process of closing (another local track published to the same instance)
// - ErrNotOpen: Track is closing or already closed
// - ErrSubscriptionLimitExceeded: the participant have reached the limit of subscriptions, wait for the other subscription to be unsubscribed
// We'll still log an event to reflect this in telemetry since it's been too long
if s.durationSinceStart() > subscriptionTimeout {
s.maybeRecordError(m.params.TelemetryListener, err, true)
}
case ErrTrackNotFound:
// source track was never published or closed
// if after timeout we'd unsubscribe from it.
// this is the *only* case we'd change desired state
if s.durationSinceStart() > notFoundTimeout {
s.maybeRecordError(m.params.TelemetryListener, err, true)
s.logger.Infow("unsubscribing from track after notFoundTimeout", "error", err)
s.setDesired(false)
m.queueReconcile(s.trackID)
m.params.OnSubscriptionError(s.trackID, false, err)
}
default:
// all other errors
if s.durationSinceStart() > subscriptionTimeout {
s.logger.Warnw(
"failed to subscribe, triggering error handler", err,
"attempt", s.getNumAttempts(),
)
s.maybeRecordError(m.params.TelemetryListener, err, false)
m.params.OnSubscriptionError(s.trackID, true, err)
} else {
s.logger.Debugw(
"failed to subscribe, retrying",
"error", err,
"attempt", s.getNumAttempts(),
)
}
}
} else {
s.recordAttempt(true)
}
return
}
if s.needsUnsubscribe() {
if err := m.unsubscribe(s); err != nil {
s.logger.Warnw("failed to unsubscribe", err)
}
// do not remove subscription from map. Wait for subscribed track to close
// and the callback (handleSubscribedTrackClose) to set the subscribedTrack to nil
// and the clean up path to handle removing subscription from the subscription map.
// It is possible that the track is re-published before subscribed track is closed.
// That could create a new subscription and a duplicate entry in SDP.
// Waiting for subscribed track close would ensure that the track is removed from
// the peer connection before re-published track is re-subscribed and added back to the SDP.
return
}
if s.needsBind() {
// check bound status, notify error callback if it's not bound
// if a publisher leaves or closes the source track, SubscribedTrack will be closed as well and it will go
// back to needsSubscribe state
if activeAt := m.params.Participant.ActiveAt(); !activeAt.IsZero() {
wait := min(time.Since(activeAt), s.durationSinceStart())
if wait > subscriptionTimeout {
s.logger.Warnw("track not bound after timeout", nil)
s.maybeRecordError(m.params.TelemetryListener, ErrTrackNotBound, false)
m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound)
}
}
}
m.lock.Lock()
if s.needsCleanup() {
s.logger.Debugw("cleanup removing subscription")
delete(m.subscriptions, s.trackID)
}
m.lock.Unlock()
}
func (m *SubscriptionManager) reconcileDataTrackSubscriptions() {
var needsToReconcile []*dataTrackSubscription
m.lock.RLock()
for _, sub := range m.dataTrackSubscriptions {
if sub.needsSubscribe() || sub.needsUnsubscribe() {
needsToReconcile = append(needsToReconcile, sub)
}
}
m.lock.RUnlock()
for _, s := range needsToReconcile {
m.reconcileDataTrackSubscription(s)
}
}
func (m *SubscriptionManager) reconcileDataTrackSubscription(s *dataTrackSubscription) {
if !m.canReconcile() {
return
}
if s.needsSubscribe() {
if err := m.subscribeDataTrack(s); err != nil {
s.recordAttempt(false)
switch err {
case ErrNoSubscribePermission:
// these are errors that are outside of our control, so we'll keep trying
// - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment
case ErrTrackNotFound:
// source track was never published or closed
// if after timeout we'd unsubscribe from it.
// this is the *only* case we'd change desired state
if s.durationSinceStart() > notFoundTimeout {
s.logger.Infow("unsubscribing from data track after notFoundTimeout", "error", err)
s.setDesired(false)
m.queueReconcile(s.trackID)
}
default:
// all other errors
if s.durationSinceStart() > subscriptionTimeout {
s.logger.Warnw(
"failed to subscribe, triggering error handler", err,
"attempt", s.getNumAttempts(),
)
} else {
s.logger.Debugw(
"failed to subscribe, retrying",
"error", err,
"attempt", s.getNumAttempts(),
)
}
}
} else {
s.recordAttempt(true)
m.notifyDataTrackSubscriberHandles()
}
return
}
if s.needsUnsubscribe() {
if err := m.unsubscribeDataTrack(s); err != nil {
s.logger.Warnw("failed to unsubscribe", err)
}
m.lock.Lock()
delete(m.dataTrackSubscriptions, s.trackID)
m.lock.Unlock()
m.notifyDataTrackSubscriberHandles()
return
}
m.lock.Lock()
if s.needsCleanup() {
s.logger.Debugw("cleanup removing data track subscription")
delete(m.dataTrackSubscriptions, s.trackID)
m.notifyDataTrackSubscriberHandles()
}
m.lock.Unlock()
}
// trigger an immediate reconciliation, when trackID is empty, will reconcile all subscriptions
func (m *SubscriptionManager) queueReconcile(trackID livekit.TrackID) {
select {
case m.reconcileCh <- trackID:
default:
// queue is full, will reconcile based on timer
}
}
func (m *SubscriptionManager) queueReconcileDataTrack(trackID livekit.TrackID) {
select {
case m.reconcileDataTrackCh <- trackID:
default:
// queue is full, will reconcile based on timer
}
}
func (m *SubscriptionManager) reconcileWorker() {
defer close(m.doneCh)
reconcileTicker := time.NewTicker(reconcileInterval)
defer reconcileTicker.Stop()
for {
select {
case <-m.closeCh:
return
case <-reconcileTicker.C:
m.reconcileSubscriptions()
m.reconcileDataTrackSubscriptions()
case trackID := <-m.reconcileCh:
m.lock.Lock()
s := m.subscriptions[trackID]
m.lock.Unlock()
if s != nil {
m.reconcileSubscription(s)
} else {
m.reconcileSubscriptions()
}
case trackID := <-m.reconcileDataTrackCh:
m.lock.Lock()
s := m.dataTrackSubscriptions[trackID]
m.lock.Unlock()
if s != nil {
m.reconcileDataTrackSubscription(s)
} else {
m.reconcileDataTrackSubscriptions()
}
}
}
}
func (m *SubscriptionManager) hasCapacityForSubscription(kind livekit.TrackType) bool {
switch kind {
case livekit.TrackType_VIDEO:
if m.params.SubscriptionLimitVideo > 0 && m.subscribedVideoCount.Load() >= m.params.SubscriptionLimitVideo {
return false
}
case livekit.TrackType_AUDIO:
if m.params.SubscriptionLimitAudio > 0 && m.subscribedAudioCount.Load() >= m.params.SubscriptionLimitAudio {
return false
}
}
return true
}
func (m *SubscriptionManager) subscribe(sub *mediaTrackSubscription) error {
sub.logger.Debugw("executing subscribe")
if !m.params.Participant.CanSubscribe() {
return ErrNoSubscribePermission
}
if kind, ok := sub.getKind(); ok && !m.hasCapacityForSubscription(kind) {
return ErrSubscriptionLimitExceeded
}
trackID := sub.trackID
res := m.params.TrackResolver(m.params.Participant, trackID)
sub.logger.Debugw("resolved track", "result", res)
if res.TrackChangedNotifier != nil && sub.setChangedNotifier(res.TrackChangedNotifier) {
// set callback only when we haven't done it before
// we set the observer before checking for existence of track, so that we may get notified
// when the track becomes available
res.TrackChangedNotifier.AddObserver(string(sub.subscriberID), func() {
m.queueReconcile(trackID)
})
}
if res.TrackRemovedNotifier != nil && sub.setRemovedNotifier(res.TrackRemovedNotifier) {
res.TrackRemovedNotifier.AddObserver(string(sub.subscriberID), func() {
// re-resolve the track in case the same track had been re-published
res := m.params.TrackResolver(m.params.Participant, trackID)
if res.Track != nil {
// do not unsubscribe, track is still available
return
}
m.handleSourceTrackRemoved(trackID)
})
}
track := res.Track
if track == nil {
return ErrTrackNotFound
}
sub.trySetKind(track.Kind())
if !m.hasCapacityForSubscription(track.Kind()) {
return ErrSubscriptionLimitExceeded
}
sub.setPublisher(res.PublisherIdentity, res.PublisherID)
permChanged := sub.setHasPermission(res.HasPermission)
if permChanged {
m.params.Participant.SendSubscriptionPermissionUpdate(sub.getPublisherID(), trackID, res.HasPermission)
}
if !res.HasPermission {
return ErrNoTrackPermission
}
if err := m.addSubscriber(sub, track, false); err != nil {
return err
}
m.markSubscribedTo(sub.getPublisherID(), trackID)
return nil
}
func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) error {
m.params.Logger.Debugw("executing subscribe synchronous", "trackID", trackID)
if !m.params.Participant.CanSubscribe() {
return ErrNoSubscribePermission
}
res := m.params.TrackResolver(m.params.Participant, trackID)
m.params.Logger.Debugw("resolved track", "trackID", trackID, " result", res)
track := res.Track
if track == nil {
return ErrTrackNotFound
}
m.lock.Lock()
sub, ok := m.subscriptions[trackID]
if !ok {
sLogger := m.params.Logger.WithValues(
"trackID", trackID,
)
sub = newMediaTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
m.subscriptions[trackID] = sub
}
m.lock.Unlock()
sub.setDesired(true)
return m.addSubscriber(sub, track, true)
}
func (m *SubscriptionManager) addSubscriber(sub *mediaTrackSubscription, track types.MediaTrack, isSync bool) error {
trackID := track.ID()
subTrack, err := track.AddSubscriber(m.params.Participant)
if err != nil && !errors.Is(err, errAlreadySubscribed) {
// ignore error(s): already subscribed
if !utils.ErrorIsOneOf(err, ErrNoReceiver) {
// as track resolution could take some time, not logging errors due to waiting for track resolution
m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID)
}
return err
}
if err == errAlreadySubscribed {
sub.logger.Debugw(
"already subscribed to track",
"subscribedAudioCount", m.subscribedAudioCount.Load(),
"subscribedVideoCount", m.subscribedVideoCount.Load(),
)
}
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
subTrack.OnClose(func(isExpectedToResume bool) {
m.handleSubscribedTrackClose(sub, isExpectedToResume)
if isSync {
m.lock.Lock()
delete(m.subscriptions, trackID)
m.lock.Unlock()
}
})
subTrack.AddOnBind(func(err error) {
if err != nil {
sub.logger.Infow("failed to bind track", "err", err)
sub.maybeRecordError(m.params.TelemetryListener, err, true)
m.UnsubscribeFromTrack(trackID)
m.params.OnSubscriptionError(trackID, false, err)
return
}
sub.setBound()
sub.maybeRecordSuccess(m.params.TelemetryListener)
})
sub.setSubscribedTrack(subTrack)
switch track.Kind() {
case livekit.TrackType_VIDEO:
m.subscribedVideoCount.Inc()
case livekit.TrackType_AUDIO:
m.subscribedAudioCount.Inc()
}
if !isSync && subTrack.NeedsNegotiation() {
m.params.Participant.Negotiate(false)
}
go m.params.OnTrackSubscribed(subTrack)
sub.logger.Debugw(
"subscribed to track",
"subscribedAudioCount", m.subscribedAudioCount.Load(),
"subscribedVideoCount", m.subscribedVideoCount.Load(),
)
}
return nil
}
func (m *SubscriptionManager) unsubscribe(s *mediaTrackSubscription) error {
s.logger.Debugw("executing unsubscribe")
subTrack := s.getSubscribedTrack()
if subTrack == nil {
// already unsubscribed
return nil
}
track := subTrack.MediaTrack()
pID := s.subscriberID
m.pendingUnsubscribes.Inc()
go func() {
defer m.pendingUnsubscribes.Dec()
track.RemoveSubscriber(pID, false)
}()
return nil
}
func (m *SubscriptionManager) unsubscribeSynchronous(trackID livekit.TrackID) error {
m.lock.Lock()
sub := m.subscriptions[trackID]
delete(m.subscriptions, trackID)
m.lock.Unlock()
if sub == nil {
// already unsubscribed or not subscribed
return nil
}
sub.logger.Debugw("executing unsubscribe synchronous")
if sub.isCanceled() {
prometheus.RecordTrackSubscribeCancels(1)
}
subTrack := sub.getSubscribedTrack()
if subTrack == nil {
// already unsubscribed
return nil
}
track := subTrack.MediaTrack()
track.RemoveSubscriber(sub.subscriberID, false)
return nil
}
func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) {
m.lock.Lock()
sub := m.subscriptions[trackID]
m.lock.Unlock()
if sub != nil {
if sub.isCanceled() {
prometheus.RecordTrackSubscribeCancels(1)
}
sub.handleSourceTrackRemoved()
}
}
// DownTrack closing is how the publisher signifies that the subscription is no longer fulfilled
// this could be due to a few reasons:
// - subscriber-initiated unsubscribe
// - UpTrack was closed
// - publisher revoked permissions for the participant
func (m *SubscriptionManager) handleSubscribedTrackClose(s *mediaTrackSubscription, isExpectedToResume bool) {
s.logger.Debugw(
"subscribed track closed",
"isExpectedToResume", isExpectedToResume,
)
wasBound := s.isBound()
subTrack := s.getSubscribedTrack()
if subTrack == nil {
return
}
s.setSubscribedTrack(nil)
var relieveFromLimits bool
switch subTrack.MediaTrack().Kind() {
case livekit.TrackType_VIDEO:
videoCount := m.subscribedVideoCount.Dec()
relieveFromLimits = m.params.SubscriptionLimitVideo > 0 && videoCount == m.params.SubscriptionLimitVideo-1
case livekit.TrackType_AUDIO:
audioCount := m.subscribedAudioCount.Dec()
relieveFromLimits = m.params.SubscriptionLimitAudio > 0 && audioCount == m.params.SubscriptionLimitAudio-1
}
m.unmarkSubscribedTo(s.getPublisherID(), s.trackID)
go m.params.OnTrackUnsubscribed(subTrack)
// trigger to decrement unsubscribed counter as long as track has been bound
// Only log an analytics event when
// * the participant isn't closing
// * it's not a migration
if wasBound {
m.params.TelemetryListener.OnTrackUnsubscribed(
s.subscriberID,
&livekit.TrackInfo{Sid: string(s.trackID), Type: subTrack.MediaTrack().Kind()},
!isExpectedToResume,
)
dt := subTrack.DownTrack()
if dt != nil {
stats := dt.GetTrackStats()
if stats != nil {
m.params.TelemetryListener.OnTrackSubscribeRTPStats(
s.subscriberID,
s.trackID,
dt.Mime(),
stats,
)
}
}
}
if !isExpectedToResume {
sender := subTrack.RTPSender()
if sender != nil {
s.logger.Debugw("removing PeerConnection track",
"kind", subTrack.MediaTrack().Kind(),
)
if err := m.params.Participant.RemoveTrackLocal(sender); err != nil {
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
// most of these are safe to ignore, since the track state might have already
// been set to Inactive
m.params.Logger.Debugw("could not remove remoteTrack from forwarder",
"error", err,
"publisher", subTrack.PublisherIdentity(),
"publisherID", subTrack.PublisherID(),
)
}
}
}
m.params.Participant.Negotiate(false)
} else {
t := time.Now()
s.subscribeAt.Store(&t)
}
if !m.params.UseOneShotSignallingMode {
if relieveFromLimits {
m.queueReconcile(trackIDForReconcileSubscriptions)
} else {
m.queueReconcile(s.trackID)
}
}
}
func (m *SubscriptionManager) subscribeDataTrack(sub *dataTrackSubscription) error {
sub.logger.Debugw("executing subscribe")
if !m.params.Participant.CanSubscribe() {
return ErrNoSubscribePermission
}
trackID := sub.trackID
res := m.params.DataTrackResolver(m.params.Participant, trackID)
sub.logger.Debugw("resolved data track", "result", res)
if res.TrackChangedNotifier != nil && sub.setChangedNotifier(res.TrackChangedNotifier) {
// set callback only when we haven't done it before
// we set the observer before checking for existence of track, so that we may get notified
// when the track becomes available
res.TrackChangedNotifier.AddObserver(string(sub.subscriberID), func() {
m.queueReconcileDataTrack(trackID)
})
}
if res.TrackRemovedNotifier != nil && sub.setRemovedNotifier(res.TrackRemovedNotifier) {
res.TrackRemovedNotifier.AddObserver(string(sub.subscriberID), func() {
// re-resolve the track in case the same track had been re-published
res := m.params.DataTrackResolver(m.params.Participant, trackID)
if res.DataTrack != nil {
// do not unsubscribe, track is still available
return
}
m.handleSourceDataTrackRemoved(trackID)
})
}
dataTrack := res.DataTrack
if dataTrack == nil {
return ErrTrackNotFound
}
sub.setPublisher(res.PublisherIdentity, res.PublisherID)
dataDownTrack, err := dataTrack.AddSubscriber(m.params.Participant)
if err != nil && !errors.Is(err, errAlreadySubscribed) {
return err
}
if err == errAlreadySubscribed {
sub.logger.Debugw("already subscribed to data track")
}
if err == nil && dataDownTrack != nil { // subTrack could be nil if already subscribed
sub.setDataDownTrack(dataDownTrack)
sub.logger.Debugw("subscribed to data track")
}
m.markSubscribedTo(sub.getPublisherID(), trackID)
return nil
}
func (m *SubscriptionManager) unsubscribeDataTrack(s *dataTrackSubscription) error {
s.logger.Debugw("executing unsubscribe")
dataDownTrack := s.getDataDownTrack()
if dataDownTrack == nil {
// already unsubscribed
return nil
}
dataTrack := dataDownTrack.PublishDataTrack()
dataTrack.RemoveSubscriber(s.subscriberID)
m.unmarkSubscribedTo(s.getPublisherID(), s.trackID)
return nil
}
func (m *SubscriptionManager) notifyDataTrackSubscriberHandles() {
m.lock.Lock()
handles := make(map[uint32]*livekit.DataTrackSubscriberHandles_PublishedDataTrack, len(m.dataTrackSubscriptions))
for _, sub := range m.dataTrackSubscriptions {
if !sub.isDesired() {
continue
}
dataDownTrack := sub.getDataDownTrack()
if dataDownTrack == nil {
continue
}
handles[uint32(dataDownTrack.Handle())] = &livekit.DataTrackSubscriberHandles_PublishedDataTrack{
PublisherIdentity: string(sub.publisherIdentity),
PublisherSid: string(sub.publisherID),
TrackSid: string(sub.trackID),
}
}
m.lock.Unlock()
m.params.Participant.SendDataTrackSubscriberHandles(handles)
}
func (m *SubscriptionManager) handleSourceDataTrackRemoved(trackID livekit.TrackID) {
m.lock.Lock()
sub := m.dataTrackSubscriptions[trackID]
m.lock.Unlock()
if sub != nil {
sub.handleSourceTrackRemoved()
}
}
func (m *SubscriptionManager) markSubscribedTo(publisherID livekit.ParticipantID, trackID livekit.TrackID) {
// add mark the participant as someone we've subscribed to
firstSubscribe := false
m.lock.Lock()
pTracks := m.subscribedTo[publisherID]
changedCB := m.onSubscribeStatusChanged
if pTracks == nil {
pTracks = make(map[livekit.TrackID]struct{})
m.subscribedTo[publisherID] = pTracks
firstSubscribe = true
}
pTracks[trackID] = struct{}{}
m.lock.Unlock()
if changedCB != nil && firstSubscribe {
changedCB(publisherID, true)
}
}
func (m *SubscriptionManager) unmarkSubscribedTo(publisherID livekit.ParticipantID, trackID livekit.TrackID) {
// remove from subscribedTo
lastSubscription := false
m.lock.Lock()
changedCB := m.onSubscribeStatusChanged
pTracks := m.subscribedTo[publisherID]
if pTracks != nil {
delete(pTracks, trackID)
if len(pTracks) == 0 {
delete(m.subscribedTo, publisherID)
lastSubscription = true
}
}
m.lock.Unlock()
if changedCB != nil && lastSubscription {
go changedCB(publisherID, false)
}
}
func (m *SubscriptionManager) getNumCancellations() int {
m.lock.RLock()
defer m.lock.RUnlock()
numCancellations := 0
for _, sub := range m.subscriptions {
if sub.isCanceled() {
numCancellations++
}
}
return numCancellations
}
// --------------------------------------------------------------------------------------
type trackSubscription struct {
subscriberID livekit.ParticipantID
trackID livekit.TrackID
logger logger.Logger
lock sync.RWMutex
desired bool
publisherID livekit.ParticipantID
publisherIdentity livekit.ParticipantIdentity
changedNotifier types.ChangeNotifier
removedNotifier types.ChangeNotifier
numAttempts atomic.Int32
// the later of when subscription was requested OR when the first failure was encountered OR when permission is granted
// this timestamp determines when failures are reported
subStartedAt atomic.Pointer[time.Time]
// the timestamp when the subscription was started, will be reset when downtrack is closed with expected resume
subscribeAt atomic.Pointer[time.Time]
}
func (s *trackSubscription) setPublisher(publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
s.lock.Lock()
defer s.lock.Unlock()
s.publisherID = publisherID
s.publisherIdentity = publisherIdentity
}
func (s *trackSubscription) getPublisherID() livekit.ParticipantID {
s.lock.RLock()
defer s.lock.RUnlock()
return s.publisherID
}
func (s *trackSubscription) setDesired(desired bool) bool {
s.lock.Lock()
defer s.lock.Unlock()
if desired {
// as long as user explicitly set it to desired
// we'll reset the timer so it has sufficient time to reconcile
t := time.Now()
s.subStartedAt.Store(&t)
s.subscribeAt.Store(&t)
}
if s.desired == desired {
return false
}
s.desired = desired
// when no longer desired, we no longer care about change notifications
if desired {
// reset attempts
s.numAttempts.Store(0)
} else {
s.setChangedNotifierLocked(nil)
s.setRemovedNotifierLocked(nil)
}
return true
}
func (s *trackSubscription) isDesired() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.desired
}
func (s *trackSubscription) recordAttempt(success bool) {
if !success {
if s.numAttempts.Load() == 0 {
// on first failure, we'd want to start the timer
t := time.Now()
s.subStartedAt.Store(&t)
}
s.numAttempts.Add(1)
} else {
s.numAttempts.Store(0)
}
}
func (s *trackSubscription) getNumAttempts() int32 {
return s.numAttempts.Load()
}
func (s *trackSubscription) durationSinceStart() time.Duration {
t := s.subStartedAt.Load()
if t == nil {
return 0
}
return time.Since(*t)
}
func (s *trackSubscription) setChangedNotifier(notifier types.ChangeNotifier) bool {
s.lock.Lock()
defer s.lock.Unlock()
return s.setChangedNotifierLocked(notifier)
}
func (s *trackSubscription) setChangedNotifierLocked(notifier types.ChangeNotifier) bool {
if s.changedNotifier == notifier {
return false
}
existing := s.changedNotifier
s.changedNotifier = notifier
if existing != nil {
go existing.RemoveObserver(string(s.subscriberID))
}
return true
}
func (s *trackSubscription) setRemovedNotifier(notifier types.ChangeNotifier) bool {
s.lock.Lock()
defer s.lock.Unlock()
return s.setRemovedNotifierLocked(notifier)
}
func (s *trackSubscription) setRemovedNotifierLocked(notifier types.ChangeNotifier) bool {
if s.removedNotifier == notifier {
return false
}
existing := s.removedNotifier
s.removedNotifier = notifier
if existing != nil {
go existing.RemoveObserver(string(s.subscriberID))
}
return true
}
func (s *trackSubscription) handleSourceTrackRemoved() {
s.lock.Lock()
defer s.lock.Unlock()
// source track removed, we would unsubscribe
s.logger.Debugw("unsubscribing from track since source track was removed")
s.desired = false
s.setChangedNotifierLocked(nil)
s.setRemovedNotifierLocked(nil)
}
// --------------------------------------------------------------------------------------
type mediaTrackSubscription struct {
trackSubscription
settings *livekit.UpdateTrackSettings
hasPermissionInitialized bool
hasPermission bool
subscribedTrack types.SubscribedTrack
eventSent atomic.Bool
bound bool
kind atomic.Pointer[livekit.TrackType]
succRecordCounter atomic.Int32
}
func newMediaTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *mediaTrackSubscription {
s := &mediaTrackSubscription{
trackSubscription: trackSubscription{
subscriberID: subscriberID,
trackID: trackID,
logger: l,
},
}
t := time.Now()
s.subscribeAt.Store(&t)
return s
}
// set permission and return true if it has changed
func (s *mediaTrackSubscription) setHasPermission(perm bool) bool {
s.lock.Lock()
defer s.lock.Unlock()
if s.hasPermissionInitialized && s.hasPermission == perm {
return false
}
s.hasPermissionInitialized = true
s.hasPermission = perm
if s.hasPermission {
// when permission is granted, reset the timer so it has sufficient time to reconcile
t := time.Now()
s.subStartedAt.Store(&t)
s.subscribeAt.Store(&t)
}
return true
}
func (s *mediaTrackSubscription) getHasPermission() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.hasPermission
}
func (s *mediaTrackSubscription) setSubscribedTrack(track types.SubscribedTrack) {
s.lock.Lock()
oldTrack := s.subscribedTrack
s.subscribedTrack = track
s.bound = false
settings := s.settings
s.lock.Unlock()
if settings != nil && track != nil {
s.logger.Debugw("restoring subscriber settings", "settings", logger.Proto(settings))
track.UpdateSubscriberSettings(settings, true)
}
if oldTrack != nil {
oldTrack.OnClose(nil)
}
}
func (s *mediaTrackSubscription) getSubscribedTrack() types.SubscribedTrack {
s.lock.RLock()
defer s.lock.RUnlock()
return s.subscribedTrack
}
func (s *mediaTrackSubscription) trySetKind(kind livekit.TrackType) {
s.kind.CompareAndSwap(nil, &kind)
}
func (s *mediaTrackSubscription) getKind() (livekit.TrackType, bool) {
kind := s.kind.Load()
if kind == nil {
return livekit.TrackType_AUDIO, false
}
return *kind, true
}
func (s *mediaTrackSubscription) setSettings(settings *livekit.UpdateTrackSettings) {
s.lock.Lock()
s.settings = settings
subTrack := s.subscribedTrack
s.lock.Unlock()
if subTrack != nil {
subTrack.UpdateSubscriberSettings(settings, false)
}
}
// mark the subscription as bound - when we've received the client's answer
func (s *mediaTrackSubscription) setBound() {
s.lock.Lock()
defer s.lock.Unlock()
s.bound = true
}
func (s *mediaTrackSubscription) isBound() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.bound
}
func (s *mediaTrackSubscription) maybeRecordError(tl types.ParticipantTelemetryListener, err error, isUserError bool) {
if s.eventSent.Swap(true) {
return
}
tl.OnTrackSubscribeFailed(s.subscriberID, s.trackID, err, isUserError)
}
func (s *mediaTrackSubscription) maybeRecordSuccess(tl types.ParticipantTelemetryListener) {
subTrack := s.getSubscribedTrack()
if subTrack == nil {
return
}
mediaTrack := subTrack.MediaTrack()
if mediaTrack == nil {
return
}
d := time.Since(*s.subscribeAt.Load())
s.logger.Debugw("track subscribed", "cost", d.Milliseconds())
subscriber := subTrack.Subscriber()
prometheus.RecordSubscribeTime(
subscriber.GetCountry(),
mediaTrack.Source(),
mediaTrack.Kind(),
d,
subscriber.GetClientInfo().GetSdk(),
subscriber.Kind(),
int(s.succRecordCounter.Inc()),
)
eventSent := s.eventSent.Swap(true)
pi := &livekit.ParticipantInfo{
Identity: string(subTrack.PublisherIdentity()),
Sid: string(subTrack.PublisherID()),
}
tl.OnTrackSubscribed(s.subscriberID, mediaTrack.ToProto(), pi, !eventSent)
}
func (s *mediaTrackSubscription) isCanceled() bool {
return !s.eventSent.Load() && s.isDesired()
}
func (s *mediaTrackSubscription) needsSubscribe() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.desired && s.subscribedTrack == nil
}
func (s *mediaTrackSubscription) needsUnsubscribe() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return !s.desired && s.subscribedTrack != nil
}
func (s *mediaTrackSubscription) needsBind() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.desired && s.subscribedTrack != nil && !s.bound
}
func (s *mediaTrackSubscription) needsCleanup() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return !s.desired && s.subscribedTrack == nil
}
// -----------------------------------------------------------------
type dataTrackSubscription struct {
trackSubscription
subscriptionOptions *livekit.DataTrackSubscriptionOptions
dataDownTrack types.DataDownTrack
}
func newDataTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *dataTrackSubscription {
s := &dataTrackSubscription{
trackSubscription: trackSubscription{
subscriberID: subscriberID,
trackID: trackID,
logger: l,
},
}
t := time.Now()
s.subscribeAt.Store(&t)
return s
}
func (s *dataTrackSubscription) needsSubscribe() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.desired && s.dataDownTrack == nil
}
func (s *dataTrackSubscription) needsUnsubscribe() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return !s.desired && s.dataDownTrack != nil
}
func (s *dataTrackSubscription) needsCleanup() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return !s.desired && s.dataDownTrack == nil
}
func (s *dataTrackSubscription) setDataDownTrack(dataDownTrack types.DataDownTrack) {
s.lock.Lock()
s.dataDownTrack = dataDownTrack
subscriptionOptions := s.subscriptionOptions
s.lock.Unlock()
if dataDownTrack != nil {
s.logger.Debugw("restoring data track subscription options", "subscriptionOptions", logger.Proto(subscriptionOptions))
dataDownTrack.UpdateSubscriptionOptions(subscriptionOptions)
}
// DT-TODO - DataTrack close callback on previous if not nil?, see setSubscribedTrack for example
}
func (s *dataTrackSubscription) getDataDownTrack() types.DataDownTrack {
s.lock.RLock()
defer s.lock.RUnlock()
return s.dataDownTrack
}
func (s *dataTrackSubscription) setSubscriptionOptions(subscriptionOptions *livekit.DataTrackSubscriptionOptions) {
s.lock.Lock()
s.subscriptionOptions = subscriptionOptions
dataDownTrack := s.dataDownTrack
s.lock.Unlock()
if dataDownTrack != nil {
dataDownTrack.UpdateSubscriptionOptions(subscriptionOptions)
}
}