Files
livekit/pkg/rtc/mediatracksubscriptions.go
2022-04-19 16:45:35 +05:30

591 lines
17 KiB
Go

package rtc
import (
"context"
"errors"
"sync"
"time"
"github.com/bep/debounce"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/rtcerr"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry"
)
const (
initialQualityUpdateWait = 10 * time.Second
)
// MediaTrackSubscriptions manages subscriptions of a media track
type MediaTrackSubscriptions struct {
params MediaTrackSubscriptionsParams
subscribedTracksMu sync.RWMutex
subscribedTracks map[livekit.ParticipantID]types.SubscribedTrack
pendingClose map[livekit.ParticipantID]types.SubscribedTrack
onNoSubscribers func()
// quality level enable/disable
maxQualityLock sync.RWMutex
maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality
maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality
maxSubscribedQuality livekit.VideoQuality
maxSubscribedQualityDebounce func(func())
onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedQuality, maxSubscribedQuality livekit.VideoQuality)
maxQualityTimer *time.Timer
}
type MediaTrackSubscriptionsParams struct {
MediaTrack types.MediaTrack
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
SubscriberConfig DirectionConfig
VideoConfig config.VideoConfig
Telemetry telemetry.TelemetryService
Logger logger.Logger
}
func NewMediaTrackSubscriptions(params MediaTrackSubscriptionsParams) *MediaTrackSubscriptions {
t := &MediaTrackSubscriptions{
params: params,
subscribedTracks: make(map[livekit.ParticipantID]types.SubscribedTrack),
pendingClose: make(map[livekit.ParticipantID]types.SubscribedTrack),
maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality),
maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality),
maxSubscribedQuality: livekit.VideoQuality_HIGH,
maxSubscribedQualityDebounce: debounce.New(params.VideoConfig.DynacastPauseDelay),
}
return t
}
func (t *MediaTrackSubscriptions) Start() {
t.startMaxQualityTimer(false)
}
func (t *MediaTrackSubscriptions) Restart() {
t.startMaxQualityTimer(true)
}
func (t *MediaTrackSubscriptions) Close() {
t.stopMaxQualityTimer()
}
func (t *MediaTrackSubscriptions) OnNoSubscribers(f func()) {
t.onNoSubscribers = f
}
func (t *MediaTrackSubscriptions) SetMuted(muted bool) {
// update mute of all subscribed tracks
for _, st := range t.getAllSubscribedTracks() {
st.SetPublisherMuted(muted)
}
}
func (t *MediaTrackSubscriptions) IsSubscriber(subID livekit.ParticipantID) bool {
t.subscribedTracksMu.RLock()
defer t.subscribedTracksMu.RUnlock()
_, ok := t.subscribedTracks[subID]
return ok
}
// AddSubscriber subscribes sub to current mediaTrack
func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, codec webrtc.RTPCodecCapability, wr WrappedReceiver) (*sfu.DownTrack, error) {
trackID := t.params.MediaTrack.ID()
subscriberID := sub.ID()
// don't subscribe to the same track multiple times
t.subscribedTracksMu.Lock()
if _, ok := t.subscribedTracks[subscriberID]; ok {
t.subscribedTracksMu.Unlock()
return nil, nil
}
t.subscribedTracksMu.Unlock()
var rtcpFeedback []webrtc.RTCPFeedback
switch t.params.MediaTrack.Kind() {
case livekit.TrackType_AUDIO:
rtcpFeedback = t.params.SubscriberConfig.RTCPFeedback.Audio
case livekit.TrackType_VIDEO:
rtcpFeedback = t.params.SubscriberConfig.RTCPFeedback.Video
}
downTrack, err := sfu.NewDownTrack(
webrtc.RTPCodecCapability{
MimeType: codec.MimeType,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback: rtcpFeedback,
},
wr,
t.params.BufferFactory,
subscriberID,
t.params.ReceiverConfig.PacketBufferSize,
LoggerWithTrack(sub.GetLogger(), trackID),
)
if err != nil {
return nil, err
}
subTrack := NewSubscribedTrack(SubscribedTrackParams{
PublisherID: t.params.MediaTrack.PublisherID(),
PublisherIdentity: t.params.MediaTrack.PublisherIdentity(),
SubscriberID: subscriberID,
MediaTrack: t.params.MediaTrack,
DownTrack: downTrack,
AdaptiveStream: sub.GetAdaptiveStream(),
})
var transceiver *webrtc.RTPTransceiver
var sender *webrtc.RTPSender
if sub.ProtocolVersion().SupportsTransceiverReuse() {
//
// AddTrack will create a new transceiver or re-use an unused one
// if the attributes match. This prevents SDP from bloating
// because of dormant transceivers building up.
//
sender, err = sub.SubscriberPC().AddTrack(downTrack)
if err != nil {
return nil, err
}
// as there is no way to get transceiver from sender, search
for _, tr := range sub.SubscriberPC().GetTransceivers() {
if tr.Sender() == sender {
transceiver = tr
break
}
}
if transceiver == nil {
// cannot add, no transceiver
return nil, errors.New("cannot subscribe without a transceiver in place")
}
} else {
transceiver, err = sub.SubscriberPC().AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
})
if err != nil {
return nil, err
}
sender = transceiver.Sender()
if sender == nil {
// cannot add, no sender
return nil, errors.New("cannot subscribe without a sender in place")
}
}
sendParameters := sender.GetParameters()
downTrack.SetRTPHeaderExtensions(sendParameters.HeaderExtensions)
downTrack.SetTransceiver(transceiver)
// when out track is bound, start loop to send reports
downTrack.OnBind(func() {
go subTrack.Bound()
go t.sendDownTrackBindingReports(sub)
})
downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
t.params.Telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, stat)
})
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {
t.notifySubscriberMaxQuality(subscriberID, QualityForSpatialLayer(layer))
})
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {
go sub.UpdateRTT(rtt)
})
downTrack.OnCloseHandler(func() {
t.subscribedTracksMu.Lock()
delete(t.subscribedTracks, subscriberID)
delete(t.pendingClose, subscriberID)
t.subscribedTracksMu.Unlock()
t.maybeNotifyNoSubscribers()
t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto())
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
return
}
// if the source has been terminated, we'll need to terminate all the subscribed tracks
// however, if the dest sub has disconnected, then we can skip
if sender == nil {
return
}
t.params.Logger.Debugw("removing peerconnection track",
"subscriber", sub.Identity(),
"subscriberID", subscriberID,
"kind", t.params.MediaTrack.Kind(),
)
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
// sub closing, can skip removing subscribedtracks
return
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
// most of these are safe to ignore, since the track state might have already
// been set to Inactive
t.params.Logger.Debugw("could not remove remoteTrack from forwarder",
"error", err,
"subscriber", sub.Identity(),
"subscriberID", subscriberID,
)
}
}
sub.RemoveSubscribedTrack(subTrack)
sub.Negotiate()
})
t.subscribedTracksMu.Lock()
t.subscribedTracks[subscriberID] = subTrack
t.subscribedTracksMu.Unlock()
subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted())
// since sub will lock, run it in a goroutine to avoid deadlocks
go func() {
sub.AddSubscribedTrack(subTrack)
sub.Negotiate()
}()
// initialize to default layer
t.notifySubscriberMaxQuality(subscriberID, livekit.VideoQuality_HIGH)
t.params.Telemetry.TrackSubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto(),
&livekit.ParticipantInfo{Sid: string(t.params.MediaTrack.PublisherID()), Identity: string(t.params.MediaTrack.PublisherIdentity())})
return downTrack, nil
}
// RemoveSubscriber removes participant from subscription
// stop all forwarders to the client
func (t *MediaTrackSubscriptions) RemoveSubscriber(participantID livekit.ParticipantID, resume bool) {
subTrack := t.getSubscribedTrack(participantID)
t.subscribedTracksMu.Lock()
delete(t.subscribedTracks, participantID)
if subTrack != nil {
t.pendingClose[participantID] = subTrack
}
t.subscribedTracksMu.Unlock()
if subTrack != nil {
subTrack.DownTrack().CloseWithFlush(!resume)
}
}
func (t *MediaTrackSubscriptions) RemoveAllSubscribers() {
t.params.Logger.Debugw("removing all subscribers")
t.subscribedTracksMu.Lock()
subscribedTracks := t.getAllSubscribedTracksLocked()
t.subscribedTracks = make(map[livekit.ParticipantID]types.SubscribedTrack)
for _, subTrack := range subscribedTracks {
t.pendingClose[subTrack.SubscriberID()] = subTrack
}
t.subscribedTracksMu.Unlock()
for _, subTrack := range subscribedTracks {
subTrack.DownTrack().Close()
}
}
func (t *MediaTrackSubscriptions) ResyncAllSubscribers() {
t.params.Logger.Debugw("resyncing all subscribers")
for _, subTrack := range t.getAllSubscribedTracks() {
subTrack.DownTrack().Resync()
}
}
func (t *MediaTrackSubscriptions) RevokeDisallowedSubscribers(allowedSubscriberIDs []livekit.ParticipantID) []livekit.ParticipantID {
var revokedSubscriberIDs []livekit.ParticipantID
// LK-TODO: large number of subscribers needs to be solved for this loop
for _, subTrack := range t.getAllSubscribedTracks() {
found := false
for _, allowedID := range allowedSubscriberIDs {
if subTrack.SubscriberID() == allowedID {
found = true
break
}
}
if !found {
go subTrack.DownTrack().Close()
revokedSubscriberIDs = append(revokedSubscriberIDs, subTrack.SubscriberID())
}
}
return revokedSubscriberIDs
}
func (t *MediaTrackSubscriptions) GetAllSubscribers() []livekit.ParticipantID {
t.subscribedTracksMu.RLock()
defer t.subscribedTracksMu.RUnlock()
subs := make([]livekit.ParticipantID, 0, len(t.subscribedTracks))
for id := range t.subscribedTracks {
subs = append(subs, id)
}
return subs
}
func (t *MediaTrackSubscriptions) UpdateVideoLayers() {
for _, st := range t.getAllSubscribedTracks() {
st.UpdateVideoLayer()
}
}
func (t *MediaTrackSubscriptions) getSubscribedTrack(subscriberID livekit.ParticipantID) types.SubscribedTrack {
t.subscribedTracksMu.RLock()
defer t.subscribedTracksMu.RUnlock()
return t.subscribedTracks[subscriberID]
}
func (t *MediaTrackSubscriptions) getAllSubscribedTracks() []types.SubscribedTrack {
t.subscribedTracksMu.RLock()
defer t.subscribedTracksMu.RUnlock()
return t.getAllSubscribedTracksLocked()
}
func (t *MediaTrackSubscriptions) getAllSubscribedTracksLocked() []types.SubscribedTrack {
subTracks := make([]types.SubscribedTrack, 0, len(t.subscribedTracks))
for _, subTrack := range t.subscribedTracks {
subTracks = append(subTracks, subTrack)
}
return subTracks
}
// TODO: send for all down tracks from the source participant
// https://tools.ietf.org/html/rfc7941
func (t *MediaTrackSubscriptions) sendDownTrackBindingReports(sub types.LocalParticipant) {
var sd []rtcp.SourceDescriptionChunk
subTrack := t.getSubscribedTrack(sub.ID())
if subTrack == nil {
return
}
chunks := subTrack.DownTrack().CreateSourceDescriptionChunks()
if chunks == nil {
return
}
sd = append(sd, chunks...)
pkts := []rtcp.Packet{
&rtcp.SourceDescription{Chunks: sd},
}
go func() {
defer RecoverSilent()
batch := pkts
i := 0
for {
if err := sub.SubscriberPC().WriteRTCP(batch); err != nil {
t.params.Logger.Errorw("could not write RTCP", err)
return
}
if i > 5 {
return
}
i++
time.Sleep(20 * time.Millisecond)
}
}()
}
func (t *MediaTrackSubscriptions) DebugInfo() []map[string]interface{} {
subscribedTrackInfo := make([]map[string]interface{}, 0)
for _, val := range t.getAllSubscribedTracks() {
if st, ok := val.(*SubscribedTrack); ok {
dt := st.DownTrack().DebugInfo()
dt["PubMuted"] = st.pubMuted.Load()
dt["SubMuted"] = st.subMuted.Load()
subscribedTrackInfo = append(subscribedTrackInfo, dt)
}
}
return subscribedTrackInfo
}
func (t *MediaTrackSubscriptions) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedQuality, maxSubscribedQuality livekit.VideoQuality)) {
t.onSubscribedMaxQualityChange = f
}
func (t *MediaTrackSubscriptions) notifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) {
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
t.maxQualityLock.Lock()
if quality == livekit.VideoQuality_OFF {
_, ok := t.maxSubscriberQuality[subscriberID]
if !ok {
t.maxQualityLock.Unlock()
return
}
delete(t.maxSubscriberQuality, subscriberID)
} else {
maxQuality, ok := t.maxSubscriberQuality[subscriberID]
if ok && maxQuality == quality {
t.maxQualityLock.Unlock()
return
}
t.maxSubscriberQuality[subscriberID] = quality
}
t.maxQualityLock.Unlock()
t.UpdateQualityChange(false)
}
func (t *MediaTrackSubscriptions) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) {
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
t.maxQualityLock.Lock()
if quality == livekit.VideoQuality_OFF {
_, ok := t.maxSubscriberNodeQuality[nodeID]
if !ok {
t.maxQualityLock.Unlock()
return
}
delete(t.maxSubscriberNodeQuality, nodeID)
} else {
maxQuality, ok := t.maxSubscriberNodeQuality[nodeID]
if ok && maxQuality == quality {
t.maxQualityLock.Unlock()
return
}
t.maxSubscriberNodeQuality[nodeID] = quality
}
t.maxQualityLock.Unlock()
t.UpdateQualityChange(false)
}
func (t *MediaTrackSubscriptions) UpdateQualityChange(force bool) {
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
t.maxQualityLock.Lock()
maxSubscribedQuality := livekit.VideoQuality_OFF
for _, subQuality := range t.maxSubscriberQuality {
if maxSubscribedQuality == livekit.VideoQuality_OFF || subQuality > maxSubscribedQuality {
maxSubscribedQuality = subQuality
}
}
for _, subQuality := range t.maxSubscriberNodeQuality {
if maxSubscribedQuality == livekit.VideoQuality_OFF || subQuality > maxSubscribedQuality {
maxSubscribedQuality = subQuality
}
}
if maxSubscribedQuality == t.maxSubscribedQuality && !force {
t.maxQualityLock.Unlock()
return
}
// if quality comes down(or become OFF), delay notify to publisher
if (t.maxSubscribedQuality != livekit.VideoQuality_OFF) &&
(t.maxSubscribedQuality > maxSubscribedQuality || maxSubscribedQuality == livekit.VideoQuality_OFF) &&
t.params.VideoConfig.DynacastPauseDelay > 0 && !force {
t.params.Logger.Debugw("throttle quality change", "from", t.maxSubscribedQuality, "to", maxSubscribedQuality)
t.maxQualityLock.Unlock()
t.maxSubscribedQualityDebounce(func() {
t.UpdateQualityChange(true)
})
return
}
t.maxSubscribedQuality = maxSubscribedQuality
var subscribedQualities []*livekit.SubscribedQuality
if t.maxSubscribedQuality == livekit.VideoQuality_OFF {
subscribedQualities = []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
{Quality: livekit.VideoQuality_HIGH, Enabled: false},
}
} else {
for q := livekit.VideoQuality_LOW; q <= livekit.VideoQuality_HIGH; q++ {
subscribedQualities = append(subscribedQualities, &livekit.SubscribedQuality{
Quality: q,
Enabled: q <= t.maxSubscribedQuality,
})
}
}
t.maxQualityLock.Unlock()
if t.onSubscribedMaxQualityChange != nil {
t.onSubscribedMaxQualityChange(subscribedQualities, maxSubscribedQuality)
}
}
func (t *MediaTrackSubscriptions) startMaxQualityTimer(force bool) {
t.maxQualityLock.Lock()
defer t.maxQualityLock.Unlock()
if t.params.MediaTrack.Kind() != livekit.TrackType_VIDEO {
return
}
t.maxQualityTimer = time.AfterFunc(initialQualityUpdateWait, func() {
t.stopMaxQualityTimer()
t.UpdateQualityChange(force)
})
}
func (t *MediaTrackSubscriptions) stopMaxQualityTimer() {
t.maxQualityLock.Lock()
defer t.maxQualityLock.Unlock()
if t.maxQualityTimer != nil {
t.maxQualityTimer.Stop()
t.maxQualityTimer = nil
}
}
func (t *MediaTrackSubscriptions) maybeNotifyNoSubscribers() {
if t.onNoSubscribers == nil {
return
}
t.subscribedTracksMu.RLock()
empty := len(t.subscribedTracks) == 0 && len(t.pendingClose) == 0
t.subscribedTracksMu.RUnlock()
if empty {
t.onNoSubscribers()
}
}