Files
livekit/pkg/sfu/playoutdelay.go
Raja Subramanian 7fef374b19 Split down stream snapshot into sender view and receiver view. (#3422)
Receiver view is used for connection quality.

Sender view is used for analytics. One thing that this introduces is
that sender view uses the packet loss information from receiver view as
true loss is available only in the RTCP Receiver Reports received from
the remote side. So, the time alignment is off, i.e. the receiver report
happens periodically and it includes information till the time at which
it was sent from remote side, but sender could have sent more packets
after that time.

The split should ensure that analytics does not rely on the remote side
sending proper receiver reports, albeit at a slight misalignment of the loss
statistic for remotes that send RTCP RR (which should be the majority of
cases).
2025-02-11 16:05:00 +05:30

203 lines
5.3 KiB
Go

// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sfu
import (
"sync"
"time"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/protocol/logger"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"
)
const (
// jitterMultiToDelay is the factor SetJitter multiplies the reported jitter
// by to obtain the base target playout delay.
jitterMultiToDelay = 10
// targetDelayLogThreshold is the target delay above which SetJitter emits a
// sampled "high playout delay" log line.
// NOTE(review): unit is presumably the same as the delay values handed to
// pd.PlayoutDelayFromValue (likely milliseconds) — confirm.
targetDelayLogThreshold = 500
// maxDelayChangePerSec limits how far the delay may move per second of
// elapsed time, to make changes smoother for a/v sync.
maxDelayChangePerSec = 80
)
// ----------------------------------------------------
// PlayoutDelayState tracks how far the controller has got in delivering the
// current playout delay extension payload.
type PlayoutDelayState int32

const (
	// PlayoutDelayStateChanged means a new payload has been created and has
	// not been handed out yet. It is also the zero value.
	PlayoutDelayStateChanged PlayoutDelayState = iota
	// PlayoutDelaySending means the payload is being attached to outgoing
	// packets and no acknowledgement has been seen yet.
	PlayoutDelaySending
	// PlayoutDelayAcked means a packet carrying the payload was acknowledged;
	// no payload is handed out in this state.
	PlayoutDelayAcked
)

// String returns a short human-readable name for the state, or "Unknown" for
// a value outside the declared set.
func (s PlayoutDelayState) String() string {
	name := "Unknown"
	switch s {
	case PlayoutDelayStateChanged:
		name = "StateChanged"
	case PlayoutDelaySending:
		name = "Sending"
	case PlayoutDelayAcked:
		name = "Acked"
	}
	return name
}
// ----------------------------------------------------
// PlayoutDelayControllerState is the portion of a PlayoutDelayController that
// can be read with GetState and applied to a controller with SeedState.
type PlayoutDelayControllerState struct {
// SenderSnapshotID is the sender snapshot id the controller passes to its
// RTP stats when computing deltas.
SenderSnapshotID uint32
}
// MarshalLogObject writes the state's SenderSnapshotID to the given zap object
// encoder under the key "SenderSnapshotID". It always returns nil.
func (p PlayoutDelayControllerState) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddUint32("SenderSnapshotID", p.SenderSnapshotID)
return nil
}
// ----------------------------------------------------
// PlayoutDelayController derives a playout delay from jitter and NACK
// statistics (SetJitter), marshals it into an extension payload
// (createExtData), and hands that payload out (GetDelayExtension) until a
// packet carrying it has been acknowledged (OnSeqAcked).
type PlayoutDelayController struct {
// lock is taken by the methods that read or write currentDelay,
// sendingAtSeq, sendingAtTime and senderSnapshotID.
lock sync.Mutex
// state holds the current PlayoutDelayState stored as an int32.
state atomic.Int32
// minDelay and maxDelay bound the delay computed in SetJitter; maxDelay is
// also encoded into the extension payload.
minDelay, maxDelay uint32
// currentDelay is the delay most recently chosen by SetJitter (initially
// minDelay) and encoded by createExtData.
currentDelay uint32
// extBytes holds the marshaled extension payload as a []byte.
extBytes atomic.Value //[]byte
// sendingAtSeq is the sequence number passed to GetDelayExtension when the
// state moved from StateChanged to Sending; OnSeqAcked compares against it.
sendingAtSeq uint16
// sendingAtTime is the time of that same transition; SetJitter uses the
// time elapsed since then to limit how far the delay may change.
sendingAtTime time.Time
logger logger.Logger
// rtpStats supplies the sender delta (packets and NACKs) used in SetJitter.
rtpStats *rtpstats.RTPStatsSender
// senderSnapshotID identifies this controller's snapshot in rtpStats; it
// is allocated in the constructor and can be replaced through SeedState.
senderSnapshotID uint32
// highDelayCount counts SetJitter updates whose target exceeded
// targetDelayLogThreshold; it is used to sample the corresponding log.
highDelayCount atomic.Uint32
}
// NewPlayoutDelayController creates a controller whose delay starts at
// minDelay and allocates a fresh sender snapshot id from rtpStats.
//
// A maxDelay of zero combined with a non-zero minDelay selects
// pd.MaxPlayoutDelayDefault; any maxDelay above pd.PlayoutDelayMaxValue is
// reduced to that value.
//
// The initial extension payload is marshaled before returning. The controller
// is returned together with the error from that step, so the pointer is
// non-nil even when the error is not.
func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *rtpstats.RTPStatsSender) (*PlayoutDelayController, error) {
	upper := maxDelay
	if upper == 0 && minDelay > 0 {
		upper = pd.MaxPlayoutDelayDefault
	}
	if upper > pd.PlayoutDelayMaxValue {
		upper = pd.PlayoutDelayMaxValue
	}

	controller := &PlayoutDelayController{
		minDelay:         minDelay,
		maxDelay:         upper,
		currentDelay:     minDelay,
		logger:           logger,
		rtpStats:         rtpStats,
		senderSnapshotID: rtpStats.NewSenderSnapshotId(),
	}

	err := controller.createExtData()
	return controller, err
}
// GetState returns a copy of the controller's sender snapshot id, read under
// the controller lock.
func (c *PlayoutDelayController) GetState() PlayoutDelayControllerState {
	c.lock.Lock()
	snapshotID := c.senderSnapshotID
	c.lock.Unlock()

	return PlayoutDelayControllerState{SenderSnapshotID: snapshotID}
}
// SeedState replaces the controller's sender snapshot id with the one carried
// in pdcs, under the controller lock.
func (c *PlayoutDelayController) SeedState(pdcs PlayoutDelayControllerState) {
	snapshotID := pdcs.SenderSnapshotID

	c.lock.Lock()
	c.senderSnapshotID = snapshotID
	c.lock.Unlock()
}
// SetJitter recomputes the playout delay from the given jitter value and the
// NACK percentage of the controller's sender snapshot delta, and rebuilds the
// extension payload when the delay changes.
//
// The target is jitter*jitterMultiToDelay, plus 2 for every NACK percentage
// point above 60. The distance the delay may move from its current value is
// maxDelayChangePerSec scaled by the time elapsed since sendingAtTime, capped
// at one second's worth. The result is then clamped to minDelay and maxDelay
// (maxDelay wins if the two conflict). Nothing happens when the resulting
// delay equals the current one.
func (c *PlayoutDelayController) SetJitter(jitter uint32) {
	// The lock is held until the payload has been rebuilt so createExtData
	// reads currentDelay consistently and concurrent callers cannot publish
	// payloads in a different order than they updated currentDelay.
	// createExtData does not take the lock itself.
	c.lock.Lock()
	defer c.lock.Unlock()

	// Only the first value returned by DeltaInfoSender is used here.
	deltaInfoSender, _ := c.rtpStats.DeltaInfoSender(c.senderSnapshotID)
	var nackPercent uint32
	if deltaInfoSender != nil && deltaInfoSender.Packets > 0 {
		nackPercent = deltaInfoSender.Nacks * 100 / deltaInfoSender.Packets
	}

	// Compute the raw target in 64 bits and saturate, so an extreme jitter
	// value cannot wrap around to a small delay.
	const maxUint32 = 1<<32 - 1
	target := uint64(jitter) * jitterMultiToDelay
	if nackPercent > 60 {
		target += uint64(nackPercent-60) * 2
	}
	if target > maxUint32 {
		target = maxUint32
	}
	targetDelay := uint32(target)

	// Clamp in the floating point domain before converting. While
	// sendingAtTime is still the zero time, time.Since saturates to a huge
	// duration and converting the scaled value straight to uint32 would be
	// an out-of-range (implementation-dependent) conversion.
	elapsedSec := time.Since(c.sendingAtTime).Seconds()
	delayChangeLimit := uint32(maxDelayChangePerSec)
	switch {
	case elapsedSec <= 0:
		delayChangeLimit = 0
	case elapsedSec < 1:
		delayChangeLimit = uint32(maxDelayChangePerSec * elapsedSec)
	}

	// Rate-limit the move in either direction. Comparing differences rather
	// than sums keeps the checks free of unsigned wrap-around.
	if targetDelay > c.currentDelay {
		if targetDelay-c.currentDelay > delayChangeLimit {
			targetDelay = c.currentDelay + delayChangeLimit
		}
	} else if c.currentDelay-targetDelay > delayChangeLimit {
		targetDelay = c.currentDelay - delayChangeLimit
	}

	if targetDelay < c.minDelay {
		targetDelay = c.minDelay
	}
	if targetDelay > c.maxDelay {
		targetDelay = c.maxDelay
	}
	if c.currentDelay == targetDelay {
		return
	}

	// Sample the log: one line for every 100 high-delay updates.
	if targetDelay > targetDelayLogThreshold {
		if c.highDelayCount.Add(1)%100 == 1 {
			c.logger.Infow("high playout delay", "target", targetDelay, "jitter", jitter, "nackPercent", nackPercent, "current", c.currentDelay)
		}
	}

	c.currentDelay = targetDelay
	// createExtData logs a marshal failure itself; there is no caller to
	// report it to from here.
	_ = c.createExtData()
}
// OnSeqAcked marks the current payload as acknowledged when seq is at or
// after the sequence number at which sending started.
//
// "At or after" is evaluated with 16-bit wrap-around: the unsigned difference
// seq-sendingAtSeq must be below 0x8000.
func (c *PlayoutDelayController) OnSeqAcked(seq uint16) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if seq-c.sendingAtSeq >= 0x8000 {
		// seq precedes the packet that first carried the payload.
		return
	}

	// Move Sending -> Acked in a single atomic step. createExtData stores
	// StateChanged without taking the lock, so a separate load followed by a
	// store could overwrite a freshly published change with Acked and the new
	// payload would never be sent.
	c.state.CompareAndSwap(int32(PlayoutDelaySending), int32(PlayoutDelayAcked))
}
// GetDelayExtension returns the marshaled extension payload to attach to the
// packet with sequence number seq, or nil when there is nothing to send.
//
// In the StateChanged state the controller moves to Sending and records seq
// and the current time as the point at which sending began. In the Sending
// state the stored payload is returned again. In any other state nil is
// returned.
func (c *PlayoutDelayController) GetDelayExtension(seq uint16) []byte {
	state := PlayoutDelayState(c.state.Load())
	if state != PlayoutDelayStateChanged && state != PlayoutDelaySending {
		return nil
	}

	if state == PlayoutDelayStateChanged {
		// First hand-out of this payload: remember where sending started.
		c.lock.Lock()
		c.state.Store(int32(PlayoutDelaySending))
		c.sendingAtSeq = seq
		c.sendingAtTime = time.Now()
		c.lock.Unlock()
	}

	return c.extBytes.Load().([]byte)
}
// createExtData marshals currentDelay and maxDelay into an extension payload.
// On success it stores the payload and sets the state to StateChanged; on
// failure it logs the error, leaves payload and state untouched, and returns
// the error.
//
// It does not take the controller lock itself.
func (c *PlayoutDelayController) createExtData() error {
	delay := pd.PlayoutDelayFromValue(uint16(c.currentDelay), uint16(c.maxDelay))

	payload, err := delay.Marshal()
	if err != nil {
		c.logger.Errorw("failed to marshal playout delay", err, "playoutDelay", delay)
		return err
	}

	// Publish the payload before flipping the state.
	c.extBytes.Store(payload)
	c.state.Store(int32(PlayoutDelayStateChanged))
	return nil
}