An attempt to use publisher side RTCP sender report while forwarding (#1286)

* WIP commit

* comment

* clean up

* remove unused stuff

* cleaner comment

* remove unused stuff

* remove unused stuff

* more comments

* TrackSender method to handle RTCP sender report data

* fix test

* push rtcp sender report data to down tracks

* Need payload type for codec id mapping in relay protocol

* rename variable a  bit
This commit is contained in:
Raja Subramanian
2023-01-06 14:07:18 +05:30
committed by GitHub
parent e158512cdf
commit 2b89c821ab
7 changed files with 252 additions and 67 deletions
+15
View File
@@ -12,6 +12,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
// wrapper around WebRTC receiver, overriding its ID
@@ -287,3 +288,17 @@ func (d *DummyReceiver) GetPrimaryReceiverForRed() sfu.TrackReceiver {
func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver {
return d
}
func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetRTCPSenderReportData(layer)
}
return nil
}
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}
return 0, errors.New("receiver not available")
}
+29 -6
View File
@@ -83,9 +83,10 @@ type Buffer struct {
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
// callbacks
onClose func()
onRtcpFeedback func([]rtcp.Packet)
onFpsChanged func()
onClose func()
onRtcpFeedback func([]rtcp.Packet)
onRtcpSenderReport func(*RTCPSenderReportData)
onFpsChanged func()
// logger
logger logger.Logger
@@ -615,14 +616,32 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
}
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
srData := &RTCPSenderReportData{
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
ArrivalTime: time.Now(),
}
b.RLock()
if b.rtpStats != nil {
b.rtpStats.SetRtcpSenderReportData(srData)
}
b.RUnlock()
if b.onRtcpSenderReport != nil {
b.onRtcpSenderReport(srData)
}
}
func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return
if b.rtpStats != nil {
return b.rtpStats.GetRtcpSenderReportData()
}
b.rtpStats.SetRtcpSenderReportData(rtpTime, mediatransportutil.NtpTime(ntpTime), time.Now())
return nil
}
func (b *Buffer) SetLastFractionLostReport(lost uint8) {
@@ -664,6 +683,10 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
b.onRtcpFeedback = fn
}
func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) {
b.onRtcpSenderReport = fn
}
// GetMediaSSRC returns the associated SSRC of the RTP stream
func (b *Buffer) GetMediaSSRC() uint32 {
return b.mediaSSRC
+77 -17
View File
@@ -81,6 +81,12 @@ type SnInfo struct {
marker bool
}
type RTCPSenderReportData struct {
RTPTimestamp uint32
NTPTimestamp mediatransportutil.NtpTime
ArrivalTime time.Time
}
type RTPStatsParams struct {
ClockRate uint32
IsReceiverReportDriven bool
@@ -158,9 +164,7 @@ type RTPStats struct {
rtt uint32
maxRtt uint32
rtpSR uint32
ntpSR mediatransportutil.NtpTime
arrivalSR int64
srData *RTCPSenderReportData
nextSnapshotId uint32
snapshots map[uint32]*Snapshot
@@ -248,9 +252,15 @@ func (r *RTPStats) Seed(from *RTPStats) {
r.rtt = from.rtt
r.maxRtt = from.maxRtt
r.rtpSR = from.rtpSR
r.ntpSR = from.ntpSR
r.arrivalSR = from.arrivalSR
if from.srData != nil {
r.srData = &RTCPSenderReportData{
RTPTimestamp: from.srData.RTPTimestamp,
NTPTimestamp: from.srData.NTPTimestamp,
ArrivalTime: from.srData.ArrivalTime,
}
} else {
r.srData = nil
}
r.nextSnapshotId = from.nextSnapshotId
for id, ss := range from.snapshots {
@@ -649,16 +659,38 @@ func (r *RTPStats) GetRtt() uint32 {
return r.rtt
}
func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS mediatransportutil.NtpTime, arrival time.Time) {
func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
r.lock.Lock()
defer r.lock.Unlock()
r.rtpSR = rtpTS
r.ntpSR = ntpTS
r.arrivalSR = arrival.UnixNano()
if srData == nil {
r.srData = nil
return
}
r.srData = &RTCPSenderReportData{
RTPTimestamp: srData.RTPTimestamp,
NTPTimestamp: srData.NTPTimestamp,
ArrivalTime: srData.ArrivalTime,
}
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData {
r.lock.Lock()
defer r.lock.Unlock()
if r.srData == nil {
return nil
}
return &RTCPSenderReportData{
RTPTimestamp: r.srData.RTPTimestamp,
NTPTimestamp: r.srData.NTPTimestamp,
ArrivalTime: r.srData.ArrivalTime,
}
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srData *RTCPSenderReportData) *rtcp.SenderReport {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -666,9 +698,33 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
return nil
}
now := time.Now()
nowNTP := mediatransportutil.ToNtpTime(now)
nowRTP := r.highestTS + uint32((now.UnixNano()-r.highestTime)*int64(r.params.ClockRate)/1e9)
var nowNTP mediatransportutil.NtpTime
var nowRTP uint32
if srData == nil || srData.NTPTimestamp == 0 || srData.ArrivalTime.IsZero() {
r.params.Logger.Infow("reference layer sender report not available")
} else {
// NTP timestamp in sender report could have a different base, i. e. it may not be wall clock time at the time of send.
// So, do not compare local NTP to what is received from remote side. Record receive time locally and do a difference
// using local time now (i. e. same time base) and add the difference to remote NTP to get the current time in remote
// NTP time base.
timeSinceLastSR := time.Since(srData.ArrivalTime)
nowNTP = mediatransportutil.ToNtpTime(srData.NTPTimestamp.Time().Add(timeSinceLastSR))
nowRTP = srData.RTPTimestamp + uint32(timeSinceLastSR.Milliseconds()*int64(r.params.ClockRate)/1000)
if nowRTP-r.highestTS > (1 << 31) {
r.params.Logger.Infow(
"reference layer sender report could not be used",
"nowRTP", nowRTP,
"highestTS", r.highestTS,
"timeSinceLastSR", timeSinceLastSR,
)
nowNTP = 0 // reset to force calculation using highest send time
}
}
if nowNTP == 0 {
now := time.Now()
nowNTP = mediatransportutil.ToNtpTime(now)
nowRTP = r.highestTS + uint32((now.UnixNano()-r.highestTime)*int64(r.params.ClockRate)/1e9)
}
return &rtcp.SenderReport{
SSRC: ssrc,
@@ -721,8 +777,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
var dlsr uint32
if r.arrivalSR != 0 {
delayMS := uint32((time.Now().UnixNano() - r.arrivalSR) / 1e6)
if r.srData != nil && !r.srData.ArrivalTime.IsZero() {
delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
@@ -733,13 +789,17 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
jitter = r.jitterOverridden
}
lastSR := uint32(0)
if r.srData != nil {
lastSR = uint32(r.srData.NTPTimestamp >> 16)
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
TotalLost: r.packetsLost,
LastSequenceNumber: now.extStartSN,
Jitter: uint32(jitter),
LastSenderReport: uint32(r.ntpSR >> 16),
LastSenderReport: lastSR,
Delay: dlsr,
}
}
+7 -2
View File
@@ -36,6 +36,7 @@ type TrackSender interface {
ID() string
SubscriberID() livekit.ParticipantID
TrackInfoAvailable()
HandleRTCPSenderReportData(payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error
}
const (
@@ -248,7 +249,7 @@ func NewDownTrack(
kind: kind,
codec: codecs[0].RTPCodecCapability,
}
d.forwarder = NewForwarder(d.kind, d.logger)
d.forwarder = NewForwarder(d.kind, d.logger, d.receiver.GetReferenceLayerRTPTimestamp)
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change
@@ -974,7 +975,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
return d.rtpStats.GetRtcpSenderReport(d.ssrc)
return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial()))
}
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {
@@ -1523,3 +1524,7 @@ func (d *DownTrack) sendPaddingOnMute() {
time.Sleep(paddingOnMuteInterval)
}
}
func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error {
return nil
}
+65 -41
View File
@@ -169,30 +169,33 @@ var (
// -------------------------------------------------------------------
type ForwarderState struct {
Started bool
LastTSCalc int64
RTP RTPMungerState
VP8 VP8MungerState
Started bool
ReferenceLayerSpatial int32
LastTSCalc int64
RTP RTPMungerState
VP8 VP8MungerState
}
func (f ForwarderState) String() string {
return fmt.Sprintf("ForwarderState{started: %v, lTSCalc: %d, rtp: %s, vp8: %s}",
f.Started, f.LastTSCalc, f.RTP.String(), f.VP8.String())
return fmt.Sprintf("ForwarderState{started: %v, ref: %d, lTSCalc: %d, rtp: %s, vp8: %s}",
f.Started, f.ReferenceLayerSpatial, f.LastTSCalc, f.RTP.String(), f.VP8.String())
}
// -------------------------------------------------------------------
type Forwarder struct {
lock sync.RWMutex
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
logger logger.Logger
lock sync.RWMutex
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
logger logger.Logger
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error)
muted bool
started bool
lastSSRC uint32
lTSCalc int64
started bool
lastSSRC uint32
lTSCalc int64
referenceLayerSpatial int32
maxLayers VideoLayers
currentLayers VideoLayers
@@ -213,10 +216,17 @@ type Forwarder struct {
ddLayerSelector *DDVideoLayerSelector
}
func NewForwarder(kind webrtc.RTPCodecType, logger logger.Logger) *Forwarder {
func NewForwarder(
kind webrtc.RTPCodecType,
logger logger.Logger,
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error),
) *Forwarder {
f := &Forwarder{
kind: kind,
logger: logger,
kind: kind,
logger: logger,
getReferenceLayerRTPTimestamp: getReferenceLayerRTPTimestamp,
referenceLayerSpatial: InvalidLayerSpatial,
// start off with nothing, let streamallocator set things
currentLayers: InvalidLayers,
@@ -265,9 +275,10 @@ func (f *Forwarder) GetState() ForwarderState {
}
state := ForwarderState{
Started: f.started,
LastTSCalc: f.lTSCalc,
RTP: f.rtpMunger.GetLast(),
Started: f.started,
ReferenceLayerSpatial: f.referenceLayerSpatial,
LastTSCalc: f.lTSCalc,
RTP: f.rtpMunger.GetLast(),
}
if f.vp8Munger != nil {
@@ -292,6 +303,7 @@ func (f *Forwarder) SeedState(state ForwarderState) {
}
f.started = true
f.referenceLayerSpatial = state.ReferenceLayerSpatial
}
func (f *Forwarder) Mute(muted bool) (bool, VideoLayers) {
@@ -369,6 +381,13 @@ func (f *Forwarder) TargetLayers() VideoLayers {
return f.targetLayers
}
func (f *Forwarder) GetReferenceLayerSpatial() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.referenceLayerSpatial
}
func (f *Forwarder) GetForwardingStatus() ForwardingStatus {
f.lock.RLock()
defer f.lock.RUnlock()
@@ -1362,7 +1381,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32)
switch f.kind {
case webrtc.RTPCodecTypeAudio:
return f.getTranslationParamsAudio(extPkt)
return f.getTranslationParamsAudio(extPkt, layer)
case webrtc.RTPCodecTypeVideo:
return f.getTranslationParamsVideo(extPkt, layer)
}
@@ -1371,37 +1390,42 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32)
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, tp *TranslationParams) (*TranslationParams, error) {
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) (*TranslationParams, error) {
if f.lastSSRC != extPkt.Packet.SSRC {
if !f.started {
f.started = true
f.referenceLayerSpatial = layer
f.rtpMunger.SetLastSnTs(extPkt)
if f.vp8Munger != nil {
f.vp8Munger.SetLast(extPkt)
}
} else {
// LK-TODO-START
// The below offset calculation is not technically correct.
// Timestamps based on the system time of an intermediate box like
// SFU is not going to be accurate. Packets arrival/processing
// are subject to vagaries of network delays, SFU processing etc.
// But, the correct way is a lot harder. Will have to
// look at RTCP SR to get timestamps and align (and figure out alignment
// of layers and use that during layer switch in simulcast case).
// That can get tricky. Given the complexity of that approach, maybe
// this is just fine till it is not :-).
// LK-TODO-END
// Compute how much time passed between the old RTP extPkt
// and the current packet, and fix timestamp on source change
tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6
if tDiffMs < 0 {
tDiffMs = 0
var td uint32
if f.getReferenceLayerRTPTimestamp != nil {
refTS, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
if err == nil {
last := f.rtpMunger.GetLast()
td = refTS - last.LastTS
if td > (1 << 31) {
f.logger.Infow("reference timestamp out-of-order", "lastTS", last.LastTS, "refTS", refTS, "td", td)
td = 0 // reset to force arrival time based calculation
}
}
}
td := uint32(tDiffMs * int64(f.codec.ClockRate) / 1000)
if td == 0 {
td = 1
tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6
if tDiffMs < 0 {
tDiffMs = 0
}
td = uint32(tDiffMs * int64(f.codec.ClockRate) / 1000)
if td == 0 {
td = 1
}
}
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td)
if f.vp8Munger != nil {
f.vp8Munger.UpdateOffsets(extPkt)
@@ -1435,8 +1459,8 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, tp *Tra
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) {
return f.getTranslationParamsCommon(extPkt, nil)
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
return f.getTranslationParamsCommon(extPkt, layer, nil)
}
// should be called with lock held
@@ -1508,7 +1532,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
return tp, nil
}
_, err := f.getTranslationParamsCommon(extPkt, tp)
_, err := f.getTranslationParamsCommon(extPkt, layer, tp)
if tp.shouldDrop || f.vp8Munger == nil || len(extPkt.Packet.Payload) == 0 {
return tp, err
}
+1 -1
View File
@@ -18,7 +18,7 @@ func disable(f *Forwarder) {
}
func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Forwarder {
f := NewForwarder(kind, logger.GetLogger())
f := NewForwarder(kind, logger.GetLogger(), nil)
f.DetermineCodec(codec)
return f
}
+58
View File
@@ -2,6 +2,7 @@ package sfu
import (
"errors"
"fmt"
"io"
"strings"
"sync"
@@ -62,6 +63,9 @@ type TrackReceiver interface {
GetRedReceiver() TrackReceiver
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
}
// WebRTCReceiver receives a media track
@@ -310,6 +314,11 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
SmoothIntervals: w.audioConfig.SmoothIntervals,
})
buff.OnRtcpFeedback(w.sendRTCP)
buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) {
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
})
})
var duration time.Duration
switch layer {
@@ -670,3 +679,52 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
}
return w.getBuffer(layer).GetTemporalLayerFpsForSpatial(layer)
}
func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
if layer == InvalidLayerSpatial || int(layer) >= len(w.buffers) {
return nil
}
return w.buffers[layer].GetSenderReportData()
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
if layer == referenceLayer {
return ts, nil
}
if layer == InvalidLayerSpatial || int(layer) >= len(w.buffers) {
return 0, fmt.Errorf("invalid layer: %d", layer)
}
srLayer := w.buffers[layer].GetSenderReportData()
if srLayer == nil || srLayer.NTPTimestamp == 0 {
return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer)
}
if referenceLayer == InvalidLayerSpatial || int(referenceLayer) >= len(w.buffers) {
return 0, fmt.Errorf("invalid reference layer: %d", referenceLayer)
}
srRef := w.buffers[referenceLayer].GetSenderReportData()
if srRef == nil || srRef.NTPTimestamp == 0 {
return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)
}
// line up the RTP time stamps using NTP time of most recent sender report of layer and referenceLayer
// NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher
// constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for
// RTP time stamp (uint32 arithmetic).
ntpDiff := float64(int64(srRef.NTPTimestamp-srLayer.NTPTimestamp)) / float64(1<<32)
normalizedTS := srLayer.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate))
// now that both RTP timestamps correspond to roughly the same NTP time,
// the diff between them is the offset in RTP timestamp units between layer and referenceLayer.
// Add the offset to layer's ts to map it to corresponding RTP timestamp in
// the reference layer.
return ts + (srRef.RTPTimestamp - normalizedTS), nil
}