Files
livekit/pkg/rtc/subscribedtrack.go
CloudWebRTC cfedcc71d0 feat: acquire requested video layer directly at HIGH quality by default (#4595)
* feat: acquire requested video layer directly at HIGH quality by default

Two changes that together remove the visible low->high quality ramp for a new
subscriber (both publisher-first and subscriber-first join orders):

1. Default a subscriber's initial video quality to HIGH on bind instead of LOW
   for adaptive stream, so the subscribed max layer is the top layer. Adaptive
   stream clients can still scale down afterwards based on viewport.

2. On initial layer acquisition the forwarder/selector latch directly onto the
   allocator's target (the requested top layer) instead of opportunistically
   latching onto the first lower key frame that arrives. A short
   initial-acquisition grace aims the target at the requested layer; if it does
   not show up in time, the target falls back to the highest layer seen so
   acquisition never stalls.

Always on - no configuration flag.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat: gate start-at-desired-quality behind EnableStartAtDesiredQuality flag

Put the "acquire requested video layer directly at HIGH quality" behavior
behind a per-subscriber EnableStartAtDesiredQuality flag (default off, so
the original low->high ramp-up is restored unless enabled).

Plumbed from config.RTC.EnableStartAtDesiredQuality through ParticipantParams
-> SubscribedTrack/DownTrack -> Forwarder -> simulcast selector, gating all
three behavior changes: the HIGH default on bind, the forwarder's
initial-acquisition grace, and the selector's direct-latch-onto-target.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* remove config.

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-19 20:12:54 +08:00

498 lines
16 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"sync"
"time"
"github.com/bep/debounce"
"github.com/pion/webrtc/v4"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/telemetry"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/observability/roomobs"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
const (
subscriptionDebounceInterval = 100 * time.Millisecond
)
var _ types.SubscribedTrack = (*SubscribedTrack)(nil)
type SubscribedTrackParams struct {
ReceiverConfig ReceiverConfig
SubscriberConfig DirectionConfig
Subscriber types.LocalParticipant
MediaTrack types.MediaTrack
AdaptiveStream bool
EnableStartAtDesiredQuality bool
TelemetryListener types.ParticipantTelemetryListener
WrappedReceiver *WrappedReceiver
IsRelayed bool
OnDownTrackCreated func(downTrack *sfu.DownTrack)
OnDownTrackClosed func(subscriberID livekit.ParticipantID)
OnSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, layer int32)
OnSubscriberAudioCodecChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool)
}
type SubscribedTrack struct {
params SubscribedTrackParams
logger logger.Logger
downTrack *sfu.DownTrack
sender atomic.Pointer[webrtc.RTPSender]
needsNegotiation atomic.Bool
versionGenerator utils.TimedVersionGenerator
settingsLock sync.Mutex
settings *livekit.UpdateTrackSettings
settingsVersion utils.TimedVersion
bindLock sync.Mutex
bound bool
onBindCallbacks []func(error)
onClose atomic.Value // func(bool)
debouncer func(func())
statsKey telemetry.StatsKey
reporter roomobs.TrackReporter
}
func NewSubscribedTrack(params SubscribedTrackParams) (*SubscribedTrack, error) {
s := &SubscribedTrack{
params: params,
logger: params.Subscriber.GetLogger().WithComponent(sutils.ComponentSub).WithValues(
"trackID", params.MediaTrack.ID(),
"publisherID", params.MediaTrack.PublisherID(),
"publisher", params.MediaTrack.PublisherIdentity(),
),
versionGenerator: utils.NewDefaultTimedVersionGenerator(),
debouncer: debounce.New(subscriptionDebounceInterval),
statsKey: telemetry.StatsKeyForTrack(
params.Subscriber.GetCountry(),
livekit.StreamType_DOWNSTREAM,
params.Subscriber.ID(),
params.MediaTrack.ID(),
params.MediaTrack.Source(),
params.MediaTrack.Kind(),
),
reporter: params.Subscriber.GetReporter().WithTrack(params.MediaTrack.ID().String()),
}
var rtcpFeedback []webrtc.RTCPFeedback
var maxTrack int
switch params.MediaTrack.Kind() {
case livekit.TrackType_AUDIO:
rtcpFeedback = params.SubscriberConfig.RTCPFeedback.Audio
maxTrack = params.ReceiverConfig.PacketBufferSizeAudio
case livekit.TrackType_VIDEO:
rtcpFeedback = params.SubscriberConfig.RTCPFeedback.Video
maxTrack = params.ReceiverConfig.PacketBufferSizeVideo
default:
s.logger.Warnw("unexpected track type", nil, "kind", params.MediaTrack.Kind())
}
codecs := params.WrappedReceiver.Codecs()
for _, c := range codecs {
c.RTCPFeedback = rtcpFeedback
}
streamID := params.WrappedReceiver.StreamID()
if params.Subscriber.SupportsSyncStreamID() && params.MediaTrack.Stream() != "" {
streamID = PackSyncStreamID(params.MediaTrack.PublisherID(), params.MediaTrack.Stream())
}
isEncrypted := params.MediaTrack.IsEncrypted()
var trailer []byte
if isEncrypted {
trailer = params.Subscriber.GetTrailer()
}
subClientInfo := ClientInfo{ClientInfo: params.Subscriber.GetClientInfo()}
subSupportsPacketTrailer := subClientInfo.SupportsPacketTrailer()
// Strip packet trailer if track has packet trailer but subscriber does not have cap
stripPacketTrailer := params.MediaTrack.HasPacketTrailer() && !subSupportsPacketTrailer
downTrack, err := sfu.NewDownTrack(sfu.DownTrackParams{
Codecs: codecs,
IsEncrypted: isEncrypted,
Source: params.MediaTrack.Source(),
Receiver: params.WrappedReceiver,
BufferFactory: params.Subscriber.GetBufferFactory(),
SubID: params.Subscriber.ID(),
StreamID: streamID,
MaxTrack: maxTrack,
PlayoutDelayLimit: params.Subscriber.GetPlayoutDelayConfig(),
Pacer: params.Subscriber.GetPacer(),
Trailer: trailer,
StripPacketTrailer: stripPacketTrailer,
Logger: LoggerWithTrack(
params.Subscriber.GetLogger().WithComponent(sutils.ComponentSub),
params.MediaTrack.ID(),
params.IsRelayed,
),
RTCPWriter: params.Subscriber.WriteSubscriberRTCP,
DisableSenderReportPassThrough: params.Subscriber.GetDisableSenderReportPassThrough(),
SupportsCodecChange: params.Subscriber.SupportsCodecChange(),
EnableStartAtDesiredQuality: params.EnableStartAtDesiredQuality,
Listener: s,
})
if err != nil {
return nil, err
}
if params.OnDownTrackCreated != nil {
params.OnDownTrackCreated(downTrack)
}
downTrack.AddReceiverReportListener(params.Subscriber.HandleReceiverReport)
s.downTrack = downTrack
return s, nil
}
func (t *SubscribedTrack) AddOnBind(f func(error)) {
t.bindLock.Lock()
bound := t.bound
if !bound {
t.onBindCallbacks = append(t.onBindCallbacks, f)
}
t.bindLock.Unlock()
if bound {
// fire immediately, do not need to persist since bind is a one time event
go f(nil)
}
}
// for DownTrack callback to notify us that it's bound
func (t *SubscribedTrack) Bound(err error) {
t.bindLock.Lock()
if err == nil {
t.bound = true
}
callbacks := t.onBindCallbacks
t.onBindCallbacks = nil
t.bindLock.Unlock()
if err == nil && t.MediaTrack().Kind() == livekit.TrackType_VIDEO {
// When AdaptiveStream is enabled, default the subscriber to LOW quality stream
// we would want LOW instead of OFF for a couple of reasons
// 1. when a subscriber unsubscribes from a track, we would forget their previously defined settings
// depending on client implementation, subscription on/off is kept separately from adaptive stream
// So when there are no changes to desired resolution, but the user re-subscribes, we may leave stream at OFF
// 2. when interacting with dynacast *and* adaptive stream. If the publisher was not publishing at the
// time of subscription, we might not be able to trigger adaptive stream updates on the client side
// (since there isn't any video frames coming through). this will leave the stream "stuck" on off, without
// a trigger to re-enable it
t.settingsLock.Lock()
if t.settings != nil {
if t.params.AdaptiveStream {
// remove `disabled` flag to force a visibility update
t.settings.Disabled = false
t.logger.Debugw("enabling subscriber track settings on bind", "settings", logger.Proto(t.settings))
}
} else {
if t.params.EnableStartAtDesiredQuality {
// default to HIGH quality so the subscriber acquires the top layer directly instead of
// ramping up from a lower layer. adaptive stream clients can still scale down afterwards
// based on viewport.
t.settings = &livekit.UpdateTrackSettings{Quality: livekit.VideoQuality_HIGH}
} else if t.params.AdaptiveStream {
t.settings = &livekit.UpdateTrackSettings{Quality: livekit.VideoQuality_LOW}
} else {
t.settings = &livekit.UpdateTrackSettings{Quality: livekit.VideoQuality_HIGH}
}
t.logger.Debugw("initializing subscriber track settings on bind", "settings", logger.Proto(t.settings))
}
t.settingsLock.Unlock()
t.applySettings()
}
for _, cb := range callbacks {
go cb(err)
}
}
// for DownTrack callback to notify us that it's closed
func (t *SubscribedTrack) Close(isExpectedToResume bool) {
if onClose := t.onClose.Load(); onClose != nil {
onClose.(func(bool))(isExpectedToResume)
}
}
func (t *SubscribedTrack) OnClose(f func(bool)) {
t.onClose.Store(f)
}
func (t *SubscribedTrack) IsBound() bool {
t.bindLock.Lock()
defer t.bindLock.Unlock()
return t.bound
}
func (t *SubscribedTrack) ID() livekit.TrackID {
return livekit.TrackID(t.downTrack.ID())
}
func (t *SubscribedTrack) PublisherID() livekit.ParticipantID {
return t.params.MediaTrack.PublisherID()
}
func (t *SubscribedTrack) PublisherIdentity() livekit.ParticipantIdentity {
return t.params.MediaTrack.PublisherIdentity()
}
func (t *SubscribedTrack) PublisherVersion() uint32 {
return t.params.MediaTrack.PublisherVersion()
}
func (t *SubscribedTrack) SubscriberID() livekit.ParticipantID {
return t.params.Subscriber.ID()
}
func (t *SubscribedTrack) SubscriberIdentity() livekit.ParticipantIdentity {
return t.params.Subscriber.Identity()
}
func (t *SubscribedTrack) Subscriber() types.LocalParticipant {
return t.params.Subscriber
}
func (t *SubscribedTrack) DownTrack() *sfu.DownTrack {
return t.downTrack
}
func (t *SubscribedTrack) MediaTrack() types.MediaTrack {
return t.params.MediaTrack
}
// has subscriber indicated it wants to mute this track
func (t *SubscribedTrack) IsMuted() bool {
t.settingsLock.Lock()
defer t.settingsLock.Unlock()
return t.isMutedLocked()
}
func (t *SubscribedTrack) isMutedLocked() bool {
if t.settings == nil {
return false
}
return t.settings.Disabled
}
func (t *SubscribedTrack) SetPublisherMuted(muted bool) {
t.downTrack.PubMute(muted)
}
func (t *SubscribedTrack) UpdateSubscriberSettings(settings *livekit.UpdateTrackSettings, isImmediate bool) {
t.settingsLock.Lock()
if proto.Equal(t.settings, settings) {
t.logger.Debugw("skipping subscriber track settings", "settings", logger.Proto(t.settings))
t.settingsLock.Unlock()
return
}
isImmediate = isImmediate || (!settings.Disabled && settings.Disabled != t.isMutedLocked())
t.settings = utils.CloneProto(settings)
t.logger.Debugw("saving subscriber track settings", "settings", logger.Proto(t.settings))
t.settingsLock.Unlock()
if isImmediate {
t.applySettings()
} else {
// avoid frequent changes to mute & video layers, unless it became visible
t.debouncer(t.applySettings)
}
}
func (t *SubscribedTrack) UpdateVideoLayer() {
t.applySettings()
}
func (t *SubscribedTrack) applySettings() {
t.settingsLock.Lock()
if t.settings == nil {
t.settingsLock.Unlock()
return
}
t.settingsVersion = t.versionGenerator.Next()
settingsVersion := t.settingsVersion
t.settingsLock.Unlock()
dt := t.DownTrack()
spatial := buffer.InvalidLayerSpatial
temporal := buffer.InvalidLayerTemporal
if dt.Kind() == webrtc.RTPCodecTypeVideo {
mt := t.MediaTrack()
quality := t.settings.Quality
mimeType := dt.Mime()
if t.settings.Width > 0 {
quality = mt.GetQualityForDimension(mimeType, t.settings.Width, t.settings.Height)
}
spatial = buffer.GetSpatialLayerForVideoQuality(mimeType, quality, mt.ToProto())
if t.settings.Fps > 0 {
temporal = mt.GetTemporalLayerForSpatialFps(mimeType, spatial, t.settings.Fps)
}
}
t.settingsLock.Lock()
if settingsVersion != t.settingsVersion {
// a newer settings has superseded this one
t.settingsLock.Unlock()
return
}
t.logger.Debugw("applying subscriber track settings", "settings", logger.Proto(t.settings))
if t.settings.Disabled {
dt.Mute(true)
t.settingsLock.Unlock()
return
} else {
dt.Mute(false)
}
if dt.Kind() == webrtc.RTPCodecTypeVideo {
dt.SetMaxSpatialLayer(spatial)
if temporal != buffer.InvalidLayerTemporal {
dt.SetMaxTemporalLayer(temporal)
}
}
t.settingsLock.Unlock()
}
func (t *SubscribedTrack) NeedsNegotiation() bool {
return t.needsNegotiation.Load()
}
func (t *SubscribedTrack) SetNeedsNegotiation(needs bool) {
t.needsNegotiation.Store(needs)
}
func (t *SubscribedTrack) RTPSender() *webrtc.RTPSender {
return t.sender.Load()
}
func (t *SubscribedTrack) SetRTPSender(sender *webrtc.RTPSender) {
t.sender.Store(sender)
}
// DownTrackListener implementation
var _ sfu.DownTrackListener = (*SubscribedTrack)(nil)
func (t *SubscribedTrack) OnBindAndConnected() {
if t.params.Subscriber.Hidden() {
return
}
t.params.MediaTrack.OnTrackSubscribed()
}
func (t *SubscribedTrack) OnStatsUpdate(stat *livekit.AnalyticsStat) {
t.params.TelemetryListener.OnTrackStats(t.statsKey, stat)
if cs, ok := telemetry.CondenseStat(stat); ok {
ti := t.params.WrappedReceiver.TrackInfo()
t.reporter.Tx(func(tx roomobs.TrackTx) {
tx.ParticipantSession().ReportKindCode(roomobs.ParticipantKindCode(t.params.Subscriber.Kind()))
tx.ParticipantSession().ReportKindDetailsCodes(roomobs.ParticipantKindDetailsCodes(t.params.Subscriber.KindDetails()))
tx.ReportName(ti.Name)
tx.ReportKind(roomobs.TrackKindSub)
tx.ReportType(roomobs.TrackTypeFromProto(ti.Type))
tx.ReportSource(roomobs.TrackSourceFromProto(ti.Source))
tx.ReportMime(mime.NormalizeMimeType(ti.MimeType).ReporterType())
tx.ReportLayer(roomobs.PackTrackLayer(ti.Height, ti.Width))
tx.ReportDuration(uint16(cs.EndTime.Sub(cs.StartTime).Milliseconds()))
tx.ReportFrames(uint16(cs.Frames))
tx.ReportSendBytes(uint32(cs.Bytes))
tx.ReportSendPackets(cs.Packets)
tx.ReportPacketsLost(cs.PacketsLost)
tx.ReportScore(stat.Score)
})
}
}
func (t *SubscribedTrack) OnMaxSubscribedLayerChanged(layer int32) {
if t.params.OnSubscriberMaxQualityChange != nil {
t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), t.downTrack.Mime(), layer)
}
}
func (t *SubscribedTrack) OnRttUpdate(rtt uint32) {
go t.params.Subscriber.UpdateMediaRTT(rtt)
}
func (t *SubscribedTrack) OnCodecNegotiated(codec webrtc.RTPCodecCapability) {
if isAvailable, needsPublish := t.params.WrappedReceiver.DetermineReceiver(codec); !isAvailable || !needsPublish {
return
}
if t.params.OnSubscriberMaxQualityChange != nil || t.params.OnSubscriberAudioCodecChange != nil {
go func() {
mimeType := mime.NormalizeMimeType(codec.MimeType)
switch t.params.MediaTrack.Kind() {
case livekit.TrackType_VIDEO:
spatial := buffer.GetSpatialLayerForVideoQuality(
mimeType,
livekit.VideoQuality_HIGH,
t.params.MediaTrack.ToProto(),
)
if t.params.OnSubscriberMaxQualityChange != nil {
t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial)
}
case livekit.TrackType_AUDIO:
if t.params.OnSubscriberAudioCodecChange != nil {
t.params.OnSubscriberAudioCodecChange(t.downTrack.SubscriberID(), mimeType, true)
}
}
}()
}
}
func (t *SubscribedTrack) OnDownTrackClose(isExpectedToResume bool) {
// Cache transceiver for potential re-use on resume.
// To ensure subscription manager does not re-subscribe before caching,
// delete the subscribed track only after caching.
if isExpectedToResume {
if tr := t.downTrack.GetTransceiver(); tr != nil {
t.params.Subscriber.CacheDownTrack(t.ID(), tr, t.downTrack.GetState())
}
}
if t.params.OnDownTrackClosed != nil {
t.params.OnDownTrackClosed(t.params.Subscriber.ID())
}
t.Close(isExpectedToResume)
}
func (t *SubscribedTrack) OnStreamStarted() {
t.params.TelemetryListener.OnTrackSubscribeStreamStarted(t.params.Subscriber.ID(), t.params.MediaTrack.ToProto())
}