mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 13:39:52 +00:00
Some checks failed
Test / test (push) Failing after 17s
Release to Docker / docker (push) Failing after 3m42s
* Key telemetry stats work using combination of roomID, participantID With forwarded participant, the same participantID can existing in two rooms. NOTE: This does not yet allow a participant session to report its events/track stats into multiple rooms. That would require regitering multiple listeners (from rooms a participant is forwarded to). * missed file * data channel stats * PR comments + pass in room name so that telemetry events have proper room name also
1574 lines
42 KiB
Go
1574 lines
42 KiB
Go
/*
|
|
* Copyright 2023 LiveKit, Inc
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package rtc
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/webrtc/v4/pkg/rtcerr"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/exp/maps"
|
|
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/sfu"
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils"
|
|
)
|
|
|
|
// using var instead of const to override in tests
|
|
var (
|
|
reconcileInterval = 3 * time.Second
|
|
// amount of time to give up if a track or publisher isn't found
|
|
// ensuring this is longer than iceFailedTimeout so we are certain the participant won't return
|
|
notFoundTimeout = time.Minute
|
|
// amount of time to try otherwise before flagging subscription as failed
|
|
subscriptionTimeout = iceFailedTimeoutTotal
|
|
maxUnsubscribeWait = time.Second
|
|
)
|
|
|
|
const (
|
|
trackIDForReconcileSubscriptions = livekit.TrackID("subscriptions_reconcile")
|
|
)
|
|
|
|
type SubscriptionManagerParams struct {
|
|
Logger logger.Logger
|
|
Participant types.LocalParticipant
|
|
TrackResolver types.MediaTrackResolver
|
|
OnTrackSubscribed func(subTrack types.SubscribedTrack)
|
|
OnTrackUnsubscribed func(subTrack types.SubscribedTrack)
|
|
OnSubscriptionError func(trackID livekit.TrackID, fatal bool, err error)
|
|
TelemetryListener types.ParticipantTelemetryListener
|
|
|
|
SubscriptionLimitVideo, SubscriptionLimitAudio int32
|
|
|
|
DataTrackResolver types.DataTrackResolver
|
|
|
|
UseOneShotSignallingMode bool
|
|
}
|
|
|
|
// SubscriptionManager manages a participant's subscriptions
|
|
type SubscriptionManager struct {
|
|
params SubscriptionManagerParams
|
|
lock sync.RWMutex
|
|
subscriptions map[livekit.TrackID]*mediaTrackSubscription
|
|
pendingUnsubscribes atomic.Int32
|
|
|
|
subscribedVideoCount, subscribedAudioCount atomic.Int32
|
|
|
|
subscribedTo map[livekit.ParticipantID]map[livekit.TrackID]struct{}
|
|
reconcileCh chan livekit.TrackID
|
|
reconcileDataTrackCh chan livekit.TrackID
|
|
closeCh chan struct{}
|
|
doneCh chan struct{}
|
|
|
|
onSubscribeStatusChanged func(publisherID livekit.ParticipantID, subscribed bool)
|
|
|
|
dataTrackSubscriptions map[livekit.TrackID]*dataTrackSubscription
|
|
}
|
|
|
|
func NewSubscriptionManager(params SubscriptionManagerParams) *SubscriptionManager {
|
|
m := &SubscriptionManager{
|
|
params: params,
|
|
subscriptions: make(map[livekit.TrackID]*mediaTrackSubscription),
|
|
subscribedTo: make(map[livekit.ParticipantID]map[livekit.TrackID]struct{}),
|
|
reconcileCh: make(chan livekit.TrackID, 50),
|
|
reconcileDataTrackCh: make(chan livekit.TrackID, 5),
|
|
closeCh: make(chan struct{}),
|
|
doneCh: make(chan struct{}),
|
|
dataTrackSubscriptions: make(map[livekit.TrackID]*dataTrackSubscription),
|
|
}
|
|
|
|
go m.reconcileWorker()
|
|
return m
|
|
}
|
|
|
|
func (m *SubscriptionManager) Close(isExpectedToResume bool) {
|
|
m.lock.Lock()
|
|
if m.isClosed() {
|
|
m.lock.Unlock()
|
|
return
|
|
}
|
|
close(m.closeCh)
|
|
m.lock.Unlock()
|
|
|
|
<-m.doneCh
|
|
|
|
prometheus.RecordTrackSubscribeCancels(int32(m.getNumCancellations()))
|
|
|
|
subTracks := m.GetSubscribedTracks()
|
|
downTracksToClose := make([]*sfu.DownTrack, 0, len(subTracks))
|
|
for _, st := range subTracks {
|
|
m.setDesired(st.ID(), false)
|
|
dt := st.DownTrack()
|
|
// nil check exists primarily for tests
|
|
if dt != nil {
|
|
downTracksToClose = append(downTracksToClose, st.DownTrack())
|
|
}
|
|
}
|
|
|
|
if isExpectedToResume {
|
|
for _, dt := range downTracksToClose {
|
|
dt.CloseWithFlush(false, true)
|
|
}
|
|
} else {
|
|
// flush blocks, so execute in parallel
|
|
for _, dt := range downTracksToClose {
|
|
go dt.CloseWithFlush(true, true)
|
|
}
|
|
}
|
|
|
|
m.lock.Lock()
|
|
for _, sub := range m.dataTrackSubscriptions {
|
|
dataDownTrack := sub.getDataDownTrack()
|
|
if dataDownTrack == nil {
|
|
// already unsubscribed
|
|
continue
|
|
}
|
|
|
|
dataTrack := dataDownTrack.PublishDataTrack()
|
|
if dataTrack == nil {
|
|
continue
|
|
}
|
|
|
|
dataTrack.RemoveSubscriber(sub.subscriberID)
|
|
}
|
|
m.lock.Unlock()
|
|
m.notifyDataTrackSubscriberHandles()
|
|
}
|
|
|
|
func (m *SubscriptionManager) isClosed() bool {
|
|
select {
|
|
case <-m.closeCh:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID, isSync bool) {
|
|
if m.params.UseOneShotSignallingMode || isSync {
|
|
m.subscribeSynchronous(trackID)
|
|
return
|
|
}
|
|
|
|
sub, desireChanged := m.setDesired(trackID, true)
|
|
if sub == nil {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newMediaTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
|
|
m.lock.Lock()
|
|
m.subscriptions[trackID] = sub
|
|
m.lock.Unlock()
|
|
|
|
sub, desireChanged = m.setDesired(trackID, true)
|
|
}
|
|
if desireChanged {
|
|
sub.logger.Debugw("subscribing to track")
|
|
}
|
|
|
|
// always reconcile, since SubscribeToTrack could be called when the track is ready
|
|
m.queueReconcile(trackID)
|
|
}
|
|
|
|
func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) {
|
|
if m.params.UseOneShotSignallingMode {
|
|
m.unsubscribeSynchronous(trackID)
|
|
return
|
|
}
|
|
|
|
sub, desireChanged := m.setDesired(trackID, false)
|
|
if sub == nil || !desireChanged {
|
|
return
|
|
}
|
|
|
|
if sub.isCanceled() {
|
|
prometheus.RecordTrackSubscribeCancels(1)
|
|
}
|
|
|
|
sub.logger.Debugw("unsubscribing from track")
|
|
m.queueReconcile(trackID)
|
|
}
|
|
|
|
func (m *SubscriptionManager) SubscribeToDataTrack(trackID livekit.TrackID) {
|
|
sub, desireChanged := m.setDataTrackDesired(trackID, true)
|
|
if sub == nil {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newDataTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
|
|
m.lock.Lock()
|
|
m.dataTrackSubscriptions[trackID] = sub
|
|
m.lock.Unlock()
|
|
|
|
sub, desireChanged = m.setDataTrackDesired(trackID, true)
|
|
}
|
|
if desireChanged {
|
|
sub.logger.Debugw("subscribing to data track")
|
|
}
|
|
|
|
m.queueReconcileDataTrack(trackID)
|
|
}
|
|
|
|
func (m *SubscriptionManager) UnsubscribeFromDataTrack(trackID livekit.TrackID) {
|
|
sub, desireChanged := m.setDataTrackDesired(trackID, false)
|
|
if sub == nil || !desireChanged {
|
|
return
|
|
}
|
|
|
|
sub.logger.Debugw("unsubscribing from data track")
|
|
m.queueReconcileDataTrack(trackID)
|
|
}
|
|
|
|
func (m *SubscriptionManager) ClearAllSubscriptions() {
|
|
m.params.Logger.Debugw("clearing all subscriptions")
|
|
|
|
if m.params.UseOneShotSignallingMode {
|
|
for _, track := range m.GetSubscribedTracks() {
|
|
m.unsubscribeSynchronous(track.ID())
|
|
}
|
|
|
|
// no synchronous data tracks
|
|
}
|
|
|
|
numCancellations := 0
|
|
m.lock.RLock()
|
|
for _, sub := range m.subscriptions {
|
|
if sub.isCanceled() {
|
|
numCancellations++
|
|
}
|
|
sub.setDesired(false)
|
|
}
|
|
|
|
for _, sub := range m.dataTrackSubscriptions {
|
|
sub.setDesired(false)
|
|
}
|
|
m.lock.RUnlock()
|
|
prometheus.RecordTrackSubscribeCancels(int32(numCancellations))
|
|
|
|
m.ReconcileAll()
|
|
}
|
|
|
|
func (m *SubscriptionManager) GetSubscribedTracks() []types.SubscribedTrack {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
tracks := make([]types.SubscribedTrack, 0, len(m.subscriptions))
|
|
for _, t := range m.subscriptions {
|
|
st := t.getSubscribedTrack()
|
|
if st != nil {
|
|
tracks = append(tracks, st)
|
|
}
|
|
}
|
|
return tracks
|
|
}
|
|
|
|
func (m *SubscriptionManager) IsTrackNameSubscribed(publisherIdentity livekit.ParticipantIdentity, trackName string) bool {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
for _, s := range m.subscriptions {
|
|
st := s.getSubscribedTrack()
|
|
if st != nil && st.PublisherIdentity() == publisherIdentity && st.MediaTrack() != nil && st.MediaTrack().Name() == trackName {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *SubscriptionManager) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
states := make(map[livekit.TrackID]*livekit.RTPForwarderState, len(m.subscriptions))
|
|
for trackID, t := range m.subscriptions {
|
|
st := t.getSubscribedTrack()
|
|
if st != nil {
|
|
dt := st.DownTrack()
|
|
if dt != nil {
|
|
state := dt.StopWriteAndGetState()
|
|
if state.ForwarderState != nil {
|
|
states[trackID] = state.ForwarderState
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return states
|
|
}
|
|
|
|
func (m *SubscriptionManager) HasSubscriptions() bool {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
for _, s := range m.subscriptions {
|
|
if s.isDesired() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *SubscriptionManager) GetSubscribedParticipants() []livekit.ParticipantID {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
return maps.Keys(m.subscribedTo)
|
|
}
|
|
|
|
func (m *SubscriptionManager) IsSubscribedTo(participantID livekit.ParticipantID) bool {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
_, ok := m.subscribedTo[participantID]
|
|
return ok
|
|
}
|
|
|
|
func (m *SubscriptionManager) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) {
|
|
m.lock.Lock()
|
|
sub, ok := m.subscriptions[trackID]
|
|
if !ok {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newMediaTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
m.subscriptions[trackID] = sub
|
|
}
|
|
m.lock.Unlock()
|
|
|
|
sub.setSettings(settings)
|
|
}
|
|
|
|
func (m *SubscriptionManager) UpdateDataTrackSubscriptionOptions(trackID livekit.TrackID, subscriptionOptions *livekit.DataTrackSubscriptionOptions) {
|
|
m.lock.Lock()
|
|
sub, ok := m.dataTrackSubscriptions[trackID]
|
|
if !ok {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newDataTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
m.dataTrackSubscriptions[trackID] = sub
|
|
}
|
|
m.lock.Unlock()
|
|
|
|
sub.setSubscriptionOptions(subscriptionOptions)
|
|
}
|
|
|
|
// OnSubscribeStatusChanged callback will be notified when a participant subscribes or unsubscribes to another participant
|
|
// it will only fire once per publisher. If current participant is subscribed to multiple tracks from another, this
|
|
// callback will only fire once.
|
|
func (m *SubscriptionManager) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) {
|
|
m.lock.Lock()
|
|
m.onSubscribeStatusChanged = fn
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
func (m *SubscriptionManager) WaitUntilSubscribed(timeout time.Duration) error {
|
|
expiresAt := time.Now().Add(timeout)
|
|
for expiresAt.After(time.Now()) {
|
|
allSubscribed := true
|
|
m.lock.RLock()
|
|
for _, sub := range m.subscriptions {
|
|
if sub.needsSubscribe() {
|
|
allSubscribed = false
|
|
break
|
|
}
|
|
}
|
|
m.lock.RUnlock()
|
|
if allSubscribed {
|
|
return nil
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
|
|
return context.DeadlineExceeded
|
|
}
|
|
|
|
func (m *SubscriptionManager) ReconcileAll() {
|
|
m.queueReconcile(trackIDForReconcileSubscriptions)
|
|
m.queueReconcileDataTrack(trackIDForReconcileSubscriptions)
|
|
}
|
|
|
|
func (m *SubscriptionManager) setDesired(trackID livekit.TrackID, desired bool) (*mediaTrackSubscription, bool) {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
sub, ok := m.subscriptions[trackID]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
return sub, sub.setDesired(desired)
|
|
}
|
|
|
|
func (m *SubscriptionManager) setDataTrackDesired(trackID livekit.TrackID, desired bool) (*dataTrackSubscription, bool) {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
sub, ok := m.dataTrackSubscriptions[trackID]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
return sub, sub.setDesired(desired)
|
|
}
|
|
|
|
func (m *SubscriptionManager) canReconcile() bool {
|
|
p := m.params.Participant
|
|
if m.isClosed() || p.IsClosed() || p.IsDisconnected() {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileSubscriptions() {
|
|
var needsToReconcile []*mediaTrackSubscription
|
|
m.lock.RLock()
|
|
for _, sub := range m.subscriptions {
|
|
if sub.needsSubscribe() || sub.needsUnsubscribe() || sub.needsBind() || sub.needsCleanup() {
|
|
needsToReconcile = append(needsToReconcile, sub)
|
|
}
|
|
}
|
|
m.lock.RUnlock()
|
|
|
|
for _, s := range needsToReconcile {
|
|
m.reconcileSubscription(s)
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileSubscription(s *mediaTrackSubscription) {
|
|
if !m.canReconcile() {
|
|
return
|
|
}
|
|
if s.needsSubscribe() {
|
|
if m.pendingUnsubscribes.Load() != 0 && s.durationSinceStart() < maxUnsubscribeWait {
|
|
// enqueue this in a bit, after pending unsubscribes are complete
|
|
go func() {
|
|
time.Sleep(time.Duration(sfu.RTPBlankFramesCloseSeconds * float32(time.Second)))
|
|
m.queueReconcile(s.trackID)
|
|
}()
|
|
return
|
|
}
|
|
|
|
numAttempts := s.getNumAttempts()
|
|
if numAttempts == 0 {
|
|
m.params.TelemetryListener.OnTrackSubscribeRequested(
|
|
s.subscriberID,
|
|
&livekit.TrackInfo{
|
|
Sid: string(s.trackID),
|
|
},
|
|
)
|
|
}
|
|
if err := m.subscribe(s); err != nil {
|
|
s.recordAttempt(false)
|
|
|
|
switch err {
|
|
case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrSubscriptionLimitExceeded:
|
|
// these are errors that are outside of our control, so we'll keep trying
|
|
// - ErrNoTrackPermission: publisher did not grant subscriber permission, may change any moment
|
|
// - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment
|
|
// - ErrNoReceiver: Track is in the process of closing (another local track published to the same instance)
|
|
// - ErrNotOpen: Track is closing or already closed
|
|
// - ErrSubscriptionLimitExceeded: the participant have reached the limit of subscriptions, wait for the other subscription to be unsubscribed
|
|
// We'll still log an event to reflect this in telemetry since it's been too long
|
|
if s.durationSinceStart() > subscriptionTimeout {
|
|
s.maybeRecordError(m.params.TelemetryListener, err, true)
|
|
}
|
|
case ErrTrackNotFound:
|
|
// source track was never published or closed
|
|
// if after timeout we'd unsubscribe from it.
|
|
// this is the *only* case we'd change desired state
|
|
if s.durationSinceStart() > notFoundTimeout {
|
|
s.maybeRecordError(m.params.TelemetryListener, err, true)
|
|
s.logger.Infow("unsubscribing from track after notFoundTimeout", "error", err)
|
|
s.setDesired(false)
|
|
m.queueReconcile(s.trackID)
|
|
m.params.OnSubscriptionError(s.trackID, false, err)
|
|
}
|
|
default:
|
|
// all other errors
|
|
if s.durationSinceStart() > subscriptionTimeout {
|
|
s.logger.Warnw(
|
|
"failed to subscribe, triggering error handler", err,
|
|
"attempt", s.getNumAttempts(),
|
|
)
|
|
s.maybeRecordError(m.params.TelemetryListener, err, false)
|
|
m.params.OnSubscriptionError(s.trackID, true, err)
|
|
} else {
|
|
s.logger.Debugw(
|
|
"failed to subscribe, retrying",
|
|
"error", err,
|
|
"attempt", s.getNumAttempts(),
|
|
)
|
|
}
|
|
}
|
|
} else {
|
|
s.recordAttempt(true)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
if s.needsUnsubscribe() {
|
|
if err := m.unsubscribe(s); err != nil {
|
|
s.logger.Warnw("failed to unsubscribe", err)
|
|
}
|
|
// do not remove subscription from map. Wait for subscribed track to close
|
|
// and the callback (handleSubscribedTrackClose) to set the subscribedTrack to nil
|
|
// and the clean up path to handle removing subscription from the subscription map.
|
|
// It is possible that the track is re-published before subscribed track is closed.
|
|
// That could create a new subscription and a duplicate entry in SDP.
|
|
// Waiting for subscribed track close would ensure that the track is removed from
|
|
// the peer connection before re-published track is re-subscribed and added back to the SDP.
|
|
return
|
|
}
|
|
|
|
if s.needsBind() {
|
|
// check bound status, notify error callback if it's not bound
|
|
// if a publisher leaves or closes the source track, SubscribedTrack will be closed as well and it will go
|
|
// back to needsSubscribe state
|
|
if activeAt := m.params.Participant.ActiveAt(); !activeAt.IsZero() {
|
|
wait := min(time.Since(activeAt), s.durationSinceStart())
|
|
if wait > subscriptionTimeout {
|
|
s.logger.Warnw("track not bound after timeout", nil)
|
|
s.maybeRecordError(m.params.TelemetryListener, ErrTrackNotBound, false)
|
|
m.params.OnSubscriptionError(s.trackID, true, ErrTrackNotBound)
|
|
}
|
|
}
|
|
}
|
|
|
|
m.lock.Lock()
|
|
if s.needsCleanup() {
|
|
s.logger.Debugw("cleanup removing subscription")
|
|
delete(m.subscriptions, s.trackID)
|
|
}
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileDataTrackSubscriptions() {
|
|
var needsToReconcile []*dataTrackSubscription
|
|
m.lock.RLock()
|
|
for _, sub := range m.dataTrackSubscriptions {
|
|
if sub.needsSubscribe() || sub.needsUnsubscribe() {
|
|
needsToReconcile = append(needsToReconcile, sub)
|
|
}
|
|
}
|
|
m.lock.RUnlock()
|
|
|
|
for _, s := range needsToReconcile {
|
|
m.reconcileDataTrackSubscription(s)
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileDataTrackSubscription(s *dataTrackSubscription) {
|
|
if !m.canReconcile() {
|
|
return
|
|
}
|
|
if s.needsSubscribe() {
|
|
if err := m.subscribeDataTrack(s); err != nil {
|
|
s.recordAttempt(false)
|
|
|
|
switch err {
|
|
case ErrNoSubscribePermission:
|
|
// these are errors that are outside of our control, so we'll keep trying
|
|
// - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment
|
|
case ErrTrackNotFound:
|
|
// source track was never published or closed
|
|
// if after timeout we'd unsubscribe from it.
|
|
// this is the *only* case we'd change desired state
|
|
if s.durationSinceStart() > notFoundTimeout {
|
|
s.logger.Infow("unsubscribing from data track after notFoundTimeout", "error", err)
|
|
s.setDesired(false)
|
|
m.queueReconcile(s.trackID)
|
|
}
|
|
default:
|
|
// all other errors
|
|
if s.durationSinceStart() > subscriptionTimeout {
|
|
s.logger.Warnw(
|
|
"failed to subscribe, triggering error handler", err,
|
|
"attempt", s.getNumAttempts(),
|
|
)
|
|
} else {
|
|
s.logger.Debugw(
|
|
"failed to subscribe, retrying",
|
|
"error", err,
|
|
"attempt", s.getNumAttempts(),
|
|
)
|
|
}
|
|
}
|
|
} else {
|
|
s.recordAttempt(true)
|
|
m.notifyDataTrackSubscriberHandles()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
if s.needsUnsubscribe() {
|
|
if err := m.unsubscribeDataTrack(s); err != nil {
|
|
s.logger.Warnw("failed to unsubscribe", err)
|
|
}
|
|
|
|
m.lock.Lock()
|
|
delete(m.dataTrackSubscriptions, s.trackID)
|
|
m.lock.Unlock()
|
|
m.notifyDataTrackSubscriberHandles()
|
|
return
|
|
}
|
|
|
|
m.lock.Lock()
|
|
if s.needsCleanup() {
|
|
s.logger.Debugw("cleanup removing data track subscription")
|
|
delete(m.dataTrackSubscriptions, s.trackID)
|
|
m.notifyDataTrackSubscriberHandles()
|
|
}
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
// trigger an immediate reconciliation, when trackID is empty, will reconcile all subscriptions
|
|
func (m *SubscriptionManager) queueReconcile(trackID livekit.TrackID) {
|
|
select {
|
|
case m.reconcileCh <- trackID:
|
|
default:
|
|
// queue is full, will reconcile based on timer
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) queueReconcileDataTrack(trackID livekit.TrackID) {
|
|
select {
|
|
case m.reconcileDataTrackCh <- trackID:
|
|
default:
|
|
// queue is full, will reconcile based on timer
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) reconcileWorker() {
|
|
defer close(m.doneCh)
|
|
|
|
reconcileTicker := time.NewTicker(reconcileInterval)
|
|
defer reconcileTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-m.closeCh:
|
|
return
|
|
|
|
case <-reconcileTicker.C:
|
|
m.reconcileSubscriptions()
|
|
m.reconcileDataTrackSubscriptions()
|
|
|
|
case trackID := <-m.reconcileCh:
|
|
m.lock.Lock()
|
|
s := m.subscriptions[trackID]
|
|
m.lock.Unlock()
|
|
if s != nil {
|
|
m.reconcileSubscription(s)
|
|
} else {
|
|
m.reconcileSubscriptions()
|
|
}
|
|
|
|
case trackID := <-m.reconcileDataTrackCh:
|
|
m.lock.Lock()
|
|
s := m.dataTrackSubscriptions[trackID]
|
|
m.lock.Unlock()
|
|
if s != nil {
|
|
m.reconcileDataTrackSubscription(s)
|
|
} else {
|
|
m.reconcileDataTrackSubscriptions()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) hasCapacityForSubscription(kind livekit.TrackType) bool {
|
|
switch kind {
|
|
case livekit.TrackType_VIDEO:
|
|
if m.params.SubscriptionLimitVideo > 0 && m.subscribedVideoCount.Load() >= m.params.SubscriptionLimitVideo {
|
|
return false
|
|
}
|
|
|
|
case livekit.TrackType_AUDIO:
|
|
if m.params.SubscriptionLimitAudio > 0 && m.subscribedAudioCount.Load() >= m.params.SubscriptionLimitAudio {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (m *SubscriptionManager) subscribe(sub *mediaTrackSubscription) error {
|
|
sub.logger.Debugw("executing subscribe")
|
|
|
|
if !m.params.Participant.CanSubscribe() {
|
|
return ErrNoSubscribePermission
|
|
}
|
|
|
|
if kind, ok := sub.getKind(); ok && !m.hasCapacityForSubscription(kind) {
|
|
return ErrSubscriptionLimitExceeded
|
|
}
|
|
|
|
trackID := sub.trackID
|
|
res := m.params.TrackResolver(m.params.Participant, trackID)
|
|
sub.logger.Debugw("resolved track", "result", res)
|
|
|
|
if res.TrackChangedNotifier != nil && sub.setChangedNotifier(res.TrackChangedNotifier) {
|
|
// set callback only when we haven't done it before
|
|
// we set the observer before checking for existence of track, so that we may get notified
|
|
// when the track becomes available
|
|
res.TrackChangedNotifier.AddObserver(string(sub.subscriberID), func() {
|
|
m.queueReconcile(trackID)
|
|
})
|
|
}
|
|
if res.TrackRemovedNotifier != nil && sub.setRemovedNotifier(res.TrackRemovedNotifier) {
|
|
res.TrackRemovedNotifier.AddObserver(string(sub.subscriberID), func() {
|
|
// re-resolve the track in case the same track had been re-published
|
|
res := m.params.TrackResolver(m.params.Participant, trackID)
|
|
if res.Track != nil {
|
|
// do not unsubscribe, track is still available
|
|
return
|
|
}
|
|
m.handleSourceTrackRemoved(trackID)
|
|
})
|
|
}
|
|
|
|
track := res.Track
|
|
if track == nil {
|
|
return ErrTrackNotFound
|
|
}
|
|
sub.trySetKind(track.Kind())
|
|
if !m.hasCapacityForSubscription(track.Kind()) {
|
|
return ErrSubscriptionLimitExceeded
|
|
}
|
|
|
|
sub.setPublisher(res.PublisherIdentity, res.PublisherID)
|
|
|
|
permChanged := sub.setHasPermission(res.HasPermission)
|
|
if permChanged {
|
|
m.params.Participant.SendSubscriptionPermissionUpdate(sub.getPublisherID(), trackID, res.HasPermission)
|
|
}
|
|
if !res.HasPermission {
|
|
return ErrNoTrackPermission
|
|
}
|
|
|
|
if err := m.addSubscriber(sub, track, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
m.markSubscribedTo(sub.getPublisherID(), trackID)
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) error {
|
|
m.params.Logger.Debugw("executing subscribe synchronous", "trackID", trackID)
|
|
|
|
if !m.params.Participant.CanSubscribe() {
|
|
return ErrNoSubscribePermission
|
|
}
|
|
|
|
res := m.params.TrackResolver(m.params.Participant, trackID)
|
|
m.params.Logger.Debugw("resolved track", "trackID", trackID, " result", res)
|
|
|
|
track := res.Track
|
|
if track == nil {
|
|
return ErrTrackNotFound
|
|
}
|
|
|
|
m.lock.Lock()
|
|
sub, ok := m.subscriptions[trackID]
|
|
if !ok {
|
|
sLogger := m.params.Logger.WithValues(
|
|
"trackID", trackID,
|
|
)
|
|
sub = newMediaTrackSubscription(m.params.Participant.ID(), trackID, sLogger)
|
|
m.subscriptions[trackID] = sub
|
|
}
|
|
m.lock.Unlock()
|
|
sub.setDesired(true)
|
|
|
|
return m.addSubscriber(sub, track, true)
|
|
}
|
|
|
|
func (m *SubscriptionManager) addSubscriber(sub *mediaTrackSubscription, track types.MediaTrack, isSync bool) error {
|
|
trackID := track.ID()
|
|
subTrack, err := track.AddSubscriber(m.params.Participant)
|
|
if err != nil && !errors.Is(err, errAlreadySubscribed) {
|
|
// ignore error(s): already subscribed
|
|
if !utils.ErrorIsOneOf(err, ErrNoReceiver) {
|
|
// as track resolution could take some time, not logging errors due to waiting for track resolution
|
|
m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID)
|
|
}
|
|
return err
|
|
}
|
|
if err == errAlreadySubscribed {
|
|
sub.logger.Debugw(
|
|
"already subscribed to track",
|
|
"subscribedAudioCount", m.subscribedAudioCount.Load(),
|
|
"subscribedVideoCount", m.subscribedVideoCount.Load(),
|
|
)
|
|
}
|
|
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
|
|
subTrack.OnClose(func(isExpectedToResume bool) {
|
|
m.handleSubscribedTrackClose(sub, isExpectedToResume)
|
|
|
|
if isSync {
|
|
m.lock.Lock()
|
|
delete(m.subscriptions, trackID)
|
|
m.lock.Unlock()
|
|
}
|
|
})
|
|
subTrack.AddOnBind(func(err error) {
|
|
if err != nil {
|
|
sub.logger.Infow("failed to bind track", "err", err)
|
|
sub.maybeRecordError(m.params.TelemetryListener, err, true)
|
|
m.UnsubscribeFromTrack(trackID)
|
|
m.params.OnSubscriptionError(trackID, false, err)
|
|
return
|
|
}
|
|
sub.setBound()
|
|
sub.maybeRecordSuccess(m.params.TelemetryListener)
|
|
})
|
|
sub.setSubscribedTrack(subTrack)
|
|
|
|
switch track.Kind() {
|
|
case livekit.TrackType_VIDEO:
|
|
m.subscribedVideoCount.Inc()
|
|
case livekit.TrackType_AUDIO:
|
|
m.subscribedAudioCount.Inc()
|
|
}
|
|
|
|
if !isSync && subTrack.NeedsNegotiation() {
|
|
m.params.Participant.Negotiate(false)
|
|
}
|
|
|
|
go m.params.OnTrackSubscribed(subTrack)
|
|
|
|
sub.logger.Debugw(
|
|
"subscribed to track",
|
|
"subscribedAudioCount", m.subscribedAudioCount.Load(),
|
|
"subscribedVideoCount", m.subscribedVideoCount.Load(),
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) unsubscribe(s *mediaTrackSubscription) error {
|
|
s.logger.Debugw("executing unsubscribe")
|
|
|
|
subTrack := s.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
// already unsubscribed
|
|
return nil
|
|
}
|
|
|
|
track := subTrack.MediaTrack()
|
|
pID := s.subscriberID
|
|
m.pendingUnsubscribes.Inc()
|
|
go func() {
|
|
defer m.pendingUnsubscribes.Dec()
|
|
track.RemoveSubscriber(pID, false)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) unsubscribeSynchronous(trackID livekit.TrackID) error {
|
|
m.lock.Lock()
|
|
sub := m.subscriptions[trackID]
|
|
delete(m.subscriptions, trackID)
|
|
m.lock.Unlock()
|
|
if sub == nil {
|
|
// already unsubscribed or not subscribed
|
|
return nil
|
|
}
|
|
|
|
sub.logger.Debugw("executing unsubscribe synchronous")
|
|
|
|
if sub.isCanceled() {
|
|
prometheus.RecordTrackSubscribeCancels(1)
|
|
}
|
|
|
|
subTrack := sub.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
// already unsubscribed
|
|
return nil
|
|
}
|
|
|
|
track := subTrack.MediaTrack()
|
|
track.RemoveSubscriber(sub.subscriberID, false)
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) handleSourceTrackRemoved(trackID livekit.TrackID) {
|
|
m.lock.Lock()
|
|
sub := m.subscriptions[trackID]
|
|
m.lock.Unlock()
|
|
|
|
if sub != nil {
|
|
if sub.isCanceled() {
|
|
prometheus.RecordTrackSubscribeCancels(1)
|
|
}
|
|
sub.handleSourceTrackRemoved()
|
|
}
|
|
}
|
|
|
|
// DownTrack closing is how the publisher signifies that the subscription is no longer fulfilled
|
|
// this could be due to a few reasons:
|
|
// - subscriber-initiated unsubscribe
|
|
// - UpTrack was closed
|
|
// - publisher revoked permissions for the participant
|
|
func (m *SubscriptionManager) handleSubscribedTrackClose(s *mediaTrackSubscription, isExpectedToResume bool) {
|
|
s.logger.Debugw(
|
|
"subscribed track closed",
|
|
"isExpectedToResume", isExpectedToResume,
|
|
)
|
|
wasBound := s.isBound()
|
|
subTrack := s.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
return
|
|
}
|
|
s.setSubscribedTrack(nil)
|
|
|
|
var relieveFromLimits bool
|
|
switch subTrack.MediaTrack().Kind() {
|
|
case livekit.TrackType_VIDEO:
|
|
videoCount := m.subscribedVideoCount.Dec()
|
|
relieveFromLimits = m.params.SubscriptionLimitVideo > 0 && videoCount == m.params.SubscriptionLimitVideo-1
|
|
case livekit.TrackType_AUDIO:
|
|
audioCount := m.subscribedAudioCount.Dec()
|
|
relieveFromLimits = m.params.SubscriptionLimitAudio > 0 && audioCount == m.params.SubscriptionLimitAudio-1
|
|
}
|
|
|
|
m.unmarkSubscribedTo(s.getPublisherID(), s.trackID)
|
|
|
|
go m.params.OnTrackUnsubscribed(subTrack)
|
|
|
|
// trigger to decrement unsubscribed counter as long as track has been bound
|
|
// Only log an analytics event when
|
|
// * the participant isn't closing
|
|
// * it's not a migration
|
|
if wasBound {
|
|
m.params.TelemetryListener.OnTrackUnsubscribed(
|
|
s.subscriberID,
|
|
&livekit.TrackInfo{Sid: string(s.trackID), Type: subTrack.MediaTrack().Kind()},
|
|
!isExpectedToResume,
|
|
)
|
|
|
|
dt := subTrack.DownTrack()
|
|
if dt != nil {
|
|
stats := dt.GetTrackStats()
|
|
if stats != nil {
|
|
m.params.TelemetryListener.OnTrackSubscribeRTPStats(
|
|
s.subscriberID,
|
|
s.trackID,
|
|
dt.Mime(),
|
|
stats,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
if !isExpectedToResume {
|
|
sender := subTrack.RTPSender()
|
|
if sender != nil {
|
|
s.logger.Debugw("removing PeerConnection track",
|
|
"kind", subTrack.MediaTrack().Kind(),
|
|
)
|
|
|
|
if err := m.params.Participant.RemoveTrackLocal(sender); err != nil {
|
|
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
|
|
// most of these are safe to ignore, since the track state might have already
|
|
// been set to Inactive
|
|
m.params.Logger.Debugw("could not remove remoteTrack from forwarder",
|
|
"error", err,
|
|
"publisher", subTrack.PublisherIdentity(),
|
|
"publisherID", subTrack.PublisherID(),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
m.params.Participant.Negotiate(false)
|
|
} else {
|
|
t := time.Now()
|
|
s.subscribeAt.Store(&t)
|
|
}
|
|
if !m.params.UseOneShotSignallingMode {
|
|
if relieveFromLimits {
|
|
m.queueReconcile(trackIDForReconcileSubscriptions)
|
|
} else {
|
|
m.queueReconcile(s.trackID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) subscribeDataTrack(sub *dataTrackSubscription) error {
|
|
sub.logger.Debugw("executing subscribe")
|
|
|
|
if !m.params.Participant.CanSubscribe() {
|
|
return ErrNoSubscribePermission
|
|
}
|
|
|
|
trackID := sub.trackID
|
|
res := m.params.DataTrackResolver(m.params.Participant, trackID)
|
|
sub.logger.Debugw("resolved data track", "result", res)
|
|
|
|
if res.TrackChangedNotifier != nil && sub.setChangedNotifier(res.TrackChangedNotifier) {
|
|
// set callback only when we haven't done it before
|
|
// we set the observer before checking for existence of track, so that we may get notified
|
|
// when the track becomes available
|
|
res.TrackChangedNotifier.AddObserver(string(sub.subscriberID), func() {
|
|
m.queueReconcileDataTrack(trackID)
|
|
})
|
|
}
|
|
if res.TrackRemovedNotifier != nil && sub.setRemovedNotifier(res.TrackRemovedNotifier) {
|
|
res.TrackRemovedNotifier.AddObserver(string(sub.subscriberID), func() {
|
|
// re-resolve the track in case the same track had been re-published
|
|
res := m.params.DataTrackResolver(m.params.Participant, trackID)
|
|
if res.DataTrack != nil {
|
|
// do not unsubscribe, track is still available
|
|
return
|
|
}
|
|
m.handleSourceDataTrackRemoved(trackID)
|
|
})
|
|
}
|
|
|
|
dataTrack := res.DataTrack
|
|
if dataTrack == nil {
|
|
return ErrTrackNotFound
|
|
}
|
|
|
|
sub.setPublisher(res.PublisherIdentity, res.PublisherID)
|
|
|
|
dataDownTrack, err := dataTrack.AddSubscriber(m.params.Participant)
|
|
if err != nil && !errors.Is(err, errAlreadySubscribed) {
|
|
return err
|
|
}
|
|
if err == errAlreadySubscribed {
|
|
sub.logger.Debugw("already subscribed to data track")
|
|
}
|
|
if err == nil && dataDownTrack != nil { // subTrack could be nil if already subscribed
|
|
sub.setDataDownTrack(dataDownTrack)
|
|
sub.logger.Debugw("subscribed to data track")
|
|
}
|
|
|
|
m.markSubscribedTo(sub.getPublisherID(), trackID)
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) unsubscribeDataTrack(s *dataTrackSubscription) error {
|
|
s.logger.Debugw("executing unsubscribe")
|
|
|
|
dataDownTrack := s.getDataDownTrack()
|
|
if dataDownTrack == nil {
|
|
// already unsubscribed
|
|
return nil
|
|
}
|
|
|
|
dataTrack := dataDownTrack.PublishDataTrack()
|
|
dataTrack.RemoveSubscriber(s.subscriberID)
|
|
|
|
m.unmarkSubscribedTo(s.getPublisherID(), s.trackID)
|
|
return nil
|
|
}
|
|
|
|
func (m *SubscriptionManager) notifyDataTrackSubscriberHandles() {
|
|
m.lock.Lock()
|
|
handles := make(map[uint32]*livekit.DataTrackSubscriberHandles_PublishedDataTrack, len(m.dataTrackSubscriptions))
|
|
for _, sub := range m.dataTrackSubscriptions {
|
|
if !sub.isDesired() {
|
|
continue
|
|
}
|
|
dataDownTrack := sub.getDataDownTrack()
|
|
if dataDownTrack == nil {
|
|
continue
|
|
}
|
|
handles[uint32(dataDownTrack.Handle())] = &livekit.DataTrackSubscriberHandles_PublishedDataTrack{
|
|
PublisherIdentity: string(sub.publisherIdentity),
|
|
PublisherSid: string(sub.publisherID),
|
|
TrackSid: string(sub.trackID),
|
|
}
|
|
}
|
|
m.lock.Unlock()
|
|
|
|
m.params.Participant.SendDataTrackSubscriberHandles(handles)
|
|
}
|
|
|
|
func (m *SubscriptionManager) handleSourceDataTrackRemoved(trackID livekit.TrackID) {
|
|
m.lock.Lock()
|
|
sub := m.dataTrackSubscriptions[trackID]
|
|
m.lock.Unlock()
|
|
|
|
if sub != nil {
|
|
sub.handleSourceTrackRemoved()
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) markSubscribedTo(publisherID livekit.ParticipantID, trackID livekit.TrackID) {
|
|
// add mark the participant as someone we've subscribed to
|
|
firstSubscribe := false
|
|
m.lock.Lock()
|
|
pTracks := m.subscribedTo[publisherID]
|
|
changedCB := m.onSubscribeStatusChanged
|
|
if pTracks == nil {
|
|
pTracks = make(map[livekit.TrackID]struct{})
|
|
m.subscribedTo[publisherID] = pTracks
|
|
firstSubscribe = true
|
|
}
|
|
pTracks[trackID] = struct{}{}
|
|
m.lock.Unlock()
|
|
|
|
if changedCB != nil && firstSubscribe {
|
|
changedCB(publisherID, true)
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) unmarkSubscribedTo(publisherID livekit.ParticipantID, trackID livekit.TrackID) {
|
|
// remove from subscribedTo
|
|
lastSubscription := false
|
|
m.lock.Lock()
|
|
changedCB := m.onSubscribeStatusChanged
|
|
pTracks := m.subscribedTo[publisherID]
|
|
if pTracks != nil {
|
|
delete(pTracks, trackID)
|
|
if len(pTracks) == 0 {
|
|
delete(m.subscribedTo, publisherID)
|
|
lastSubscription = true
|
|
}
|
|
}
|
|
m.lock.Unlock()
|
|
if changedCB != nil && lastSubscription {
|
|
go changedCB(publisherID, false)
|
|
}
|
|
}
|
|
|
|
func (m *SubscriptionManager) getNumCancellations() int {
|
|
m.lock.RLock()
|
|
defer m.lock.RUnlock()
|
|
|
|
numCancellations := 0
|
|
for _, sub := range m.subscriptions {
|
|
if sub.isCanceled() {
|
|
numCancellations++
|
|
}
|
|
}
|
|
return numCancellations
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
|
|
|
|
type trackSubscription struct {
|
|
subscriberID livekit.ParticipantID
|
|
trackID livekit.TrackID
|
|
logger logger.Logger
|
|
|
|
lock sync.RWMutex
|
|
desired bool
|
|
publisherID livekit.ParticipantID
|
|
publisherIdentity livekit.ParticipantIdentity
|
|
changedNotifier types.ChangeNotifier
|
|
removedNotifier types.ChangeNotifier
|
|
|
|
numAttempts atomic.Int32
|
|
|
|
// the later of when subscription was requested OR when the first failure was encountered OR when permission is granted
|
|
// this timestamp determines when failures are reported
|
|
subStartedAt atomic.Pointer[time.Time]
|
|
|
|
// the timestamp when the subscription was started, will be reset when downtrack is closed with expected resume
|
|
subscribeAt atomic.Pointer[time.Time]
|
|
}
|
|
|
|
func (s *trackSubscription) setPublisher(publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.publisherID = publisherID
|
|
s.publisherIdentity = publisherIdentity
|
|
}
|
|
|
|
func (s *trackSubscription) getPublisherID() livekit.ParticipantID {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.publisherID
|
|
}
|
|
|
|
func (s *trackSubscription) setDesired(desired bool) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
if desired {
|
|
// as long as user explicitly set it to desired
|
|
// we'll reset the timer so it has sufficient time to reconcile
|
|
t := time.Now()
|
|
s.subStartedAt.Store(&t)
|
|
s.subscribeAt.Store(&t)
|
|
}
|
|
|
|
if s.desired == desired {
|
|
return false
|
|
}
|
|
s.desired = desired
|
|
|
|
// when no longer desired, we no longer care about change notifications
|
|
if desired {
|
|
// reset attempts
|
|
s.numAttempts.Store(0)
|
|
} else {
|
|
s.setChangedNotifierLocked(nil)
|
|
s.setRemovedNotifierLocked(nil)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *trackSubscription) isDesired() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired
|
|
}
|
|
|
|
func (s *trackSubscription) recordAttempt(success bool) {
|
|
if !success {
|
|
if s.numAttempts.Load() == 0 {
|
|
// on first failure, we'd want to start the timer
|
|
t := time.Now()
|
|
s.subStartedAt.Store(&t)
|
|
}
|
|
s.numAttempts.Add(1)
|
|
} else {
|
|
s.numAttempts.Store(0)
|
|
}
|
|
}
|
|
|
|
func (s *trackSubscription) getNumAttempts() int32 {
|
|
return s.numAttempts.Load()
|
|
}
|
|
|
|
func (s *trackSubscription) durationSinceStart() time.Duration {
|
|
t := s.subStartedAt.Load()
|
|
if t == nil {
|
|
return 0
|
|
}
|
|
return time.Since(*t)
|
|
}
|
|
|
|
func (s *trackSubscription) setChangedNotifier(notifier types.ChangeNotifier) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.setChangedNotifierLocked(notifier)
|
|
}
|
|
|
|
func (s *trackSubscription) setChangedNotifierLocked(notifier types.ChangeNotifier) bool {
|
|
if s.changedNotifier == notifier {
|
|
return false
|
|
}
|
|
|
|
existing := s.changedNotifier
|
|
s.changedNotifier = notifier
|
|
|
|
if existing != nil {
|
|
go existing.RemoveObserver(string(s.subscriberID))
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *trackSubscription) setRemovedNotifier(notifier types.ChangeNotifier) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.setRemovedNotifierLocked(notifier)
|
|
}
|
|
|
|
func (s *trackSubscription) setRemovedNotifierLocked(notifier types.ChangeNotifier) bool {
|
|
if s.removedNotifier == notifier {
|
|
return false
|
|
}
|
|
|
|
existing := s.removedNotifier
|
|
s.removedNotifier = notifier
|
|
|
|
if existing != nil {
|
|
go existing.RemoveObserver(string(s.subscriberID))
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *trackSubscription) handleSourceTrackRemoved() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
// source track removed, we would unsubscribe
|
|
s.logger.Debugw("unsubscribing from track since source track was removed")
|
|
s.desired = false
|
|
|
|
s.setChangedNotifierLocked(nil)
|
|
s.setRemovedNotifierLocked(nil)
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
|
|
|
|
type mediaTrackSubscription struct {
|
|
trackSubscription
|
|
|
|
settings *livekit.UpdateTrackSettings
|
|
hasPermissionInitialized bool
|
|
hasPermission bool
|
|
subscribedTrack types.SubscribedTrack
|
|
eventSent atomic.Bool
|
|
bound bool
|
|
kind atomic.Pointer[livekit.TrackType]
|
|
|
|
succRecordCounter atomic.Int32
|
|
}
|
|
|
|
func newMediaTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *mediaTrackSubscription {
|
|
s := &mediaTrackSubscription{
|
|
trackSubscription: trackSubscription{
|
|
subscriberID: subscriberID,
|
|
trackID: trackID,
|
|
logger: l,
|
|
},
|
|
}
|
|
t := time.Now()
|
|
s.subscribeAt.Store(&t)
|
|
return s
|
|
}
|
|
|
|
// set permission and return true if it has changed
|
|
func (s *mediaTrackSubscription) setHasPermission(perm bool) bool {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
if s.hasPermissionInitialized && s.hasPermission == perm {
|
|
return false
|
|
}
|
|
|
|
s.hasPermissionInitialized = true
|
|
s.hasPermission = perm
|
|
if s.hasPermission {
|
|
// when permission is granted, reset the timer so it has sufficient time to reconcile
|
|
t := time.Now()
|
|
s.subStartedAt.Store(&t)
|
|
s.subscribeAt.Store(&t)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) getHasPermission() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.hasPermission
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) setSubscribedTrack(track types.SubscribedTrack) {
|
|
s.lock.Lock()
|
|
oldTrack := s.subscribedTrack
|
|
s.subscribedTrack = track
|
|
s.bound = false
|
|
settings := s.settings
|
|
s.lock.Unlock()
|
|
|
|
if settings != nil && track != nil {
|
|
s.logger.Debugw("restoring subscriber settings", "settings", logger.Proto(settings))
|
|
track.UpdateSubscriberSettings(settings, true)
|
|
}
|
|
if oldTrack != nil {
|
|
oldTrack.OnClose(nil)
|
|
}
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) getSubscribedTrack() types.SubscribedTrack {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.subscribedTrack
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) trySetKind(kind livekit.TrackType) {
|
|
s.kind.CompareAndSwap(nil, &kind)
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) getKind() (livekit.TrackType, bool) {
|
|
kind := s.kind.Load()
|
|
if kind == nil {
|
|
return livekit.TrackType_AUDIO, false
|
|
}
|
|
return *kind, true
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) setSettings(settings *livekit.UpdateTrackSettings) {
|
|
s.lock.Lock()
|
|
s.settings = settings
|
|
subTrack := s.subscribedTrack
|
|
s.lock.Unlock()
|
|
if subTrack != nil {
|
|
subTrack.UpdateSubscriberSettings(settings, false)
|
|
}
|
|
}
|
|
|
|
// mark the subscription as bound - when we've received the client's answer
|
|
func (s *mediaTrackSubscription) setBound() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
s.bound = true
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) isBound() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.bound
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) maybeRecordError(tl types.ParticipantTelemetryListener, err error, isUserError bool) {
|
|
if s.eventSent.Swap(true) {
|
|
return
|
|
}
|
|
|
|
tl.OnTrackSubscribeFailed(s.subscriberID, s.trackID, err, isUserError)
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) maybeRecordSuccess(tl types.ParticipantTelemetryListener) {
|
|
subTrack := s.getSubscribedTrack()
|
|
if subTrack == nil {
|
|
return
|
|
}
|
|
mediaTrack := subTrack.MediaTrack()
|
|
if mediaTrack == nil {
|
|
return
|
|
}
|
|
|
|
d := time.Since(*s.subscribeAt.Load())
|
|
s.logger.Debugw("track subscribed", "cost", d.Milliseconds())
|
|
subscriber := subTrack.Subscriber()
|
|
prometheus.RecordSubscribeTime(
|
|
subscriber.GetCountry(),
|
|
mediaTrack.Source(),
|
|
mediaTrack.Kind(),
|
|
d,
|
|
subscriber.GetClientInfo().GetSdk(),
|
|
subscriber.Kind(),
|
|
int(s.succRecordCounter.Inc()),
|
|
)
|
|
|
|
eventSent := s.eventSent.Swap(true)
|
|
|
|
pi := &livekit.ParticipantInfo{
|
|
Identity: string(subTrack.PublisherIdentity()),
|
|
Sid: string(subTrack.PublisherID()),
|
|
}
|
|
tl.OnTrackSubscribed(s.subscriberID, mediaTrack.ToProto(), pi, !eventSent)
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) isCanceled() bool {
|
|
return !s.eventSent.Load() && s.isDesired()
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) needsSubscribe() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired && s.subscribedTrack == nil
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) needsUnsubscribe() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return !s.desired && s.subscribedTrack != nil
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) needsBind() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired && s.subscribedTrack != nil && !s.bound
|
|
}
|
|
|
|
func (s *mediaTrackSubscription) needsCleanup() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return !s.desired && s.subscribedTrack == nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------
|
|
|
|
type dataTrackSubscription struct {
|
|
trackSubscription
|
|
|
|
subscriptionOptions *livekit.DataTrackSubscriptionOptions
|
|
|
|
dataDownTrack types.DataDownTrack
|
|
}
|
|
|
|
func newDataTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *dataTrackSubscription {
|
|
s := &dataTrackSubscription{
|
|
trackSubscription: trackSubscription{
|
|
subscriberID: subscriberID,
|
|
trackID: trackID,
|
|
logger: l,
|
|
},
|
|
}
|
|
t := time.Now()
|
|
s.subscribeAt.Store(&t)
|
|
return s
|
|
}
|
|
|
|
func (s *dataTrackSubscription) needsSubscribe() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.desired && s.dataDownTrack == nil
|
|
}
|
|
|
|
func (s *dataTrackSubscription) needsUnsubscribe() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return !s.desired && s.dataDownTrack != nil
|
|
}
|
|
|
|
func (s *dataTrackSubscription) needsCleanup() bool {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return !s.desired && s.dataDownTrack == nil
|
|
}
|
|
|
|
func (s *dataTrackSubscription) setDataDownTrack(dataDownTrack types.DataDownTrack) {
|
|
s.lock.Lock()
|
|
s.dataDownTrack = dataDownTrack
|
|
subscriptionOptions := s.subscriptionOptions
|
|
s.lock.Unlock()
|
|
|
|
if dataDownTrack != nil {
|
|
s.logger.Debugw("restoring data track subscription options", "subscriptionOptions", logger.Proto(subscriptionOptions))
|
|
dataDownTrack.UpdateSubscriptionOptions(subscriptionOptions)
|
|
}
|
|
|
|
// DT-TODO - DataTrack close callback on previous if not nil?, see setSubscribedTrack for example
|
|
}
|
|
|
|
func (s *dataTrackSubscription) getDataDownTrack() types.DataDownTrack {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.dataDownTrack
|
|
}
|
|
|
|
func (s *dataTrackSubscription) setSubscriptionOptions(subscriptionOptions *livekit.DataTrackSubscriptionOptions) {
|
|
s.lock.Lock()
|
|
s.subscriptionOptions = subscriptionOptions
|
|
dataDownTrack := s.dataDownTrack
|
|
s.lock.Unlock()
|
|
if dataDownTrack != nil {
|
|
dataDownTrack.UpdateSubscriptionOptions(subscriptionOptions)
|
|
}
|
|
}
|