Better lock for sender report TS offset. (#2771)

* Better lock for sender report TS offset.

It is possible that a resume has happened and new time stamp offset
calculated. But, a sender report from publisher comes with a time stamp
prior to the time stamp which was used for offset calculation. Using
that sender report in the forwarding path causes jumps.

Example
- Track forwarding, let us tsOffset = `a`
- Unmute/layer switch - one of those events happens, a new tsOffset will
  be calculated, let us say that offset is `b` and it is based on
  incoming time stmap of `c`.
- A sender report from publisher could arrive with timestamp = `d`.
  o If `d` >= `c`,  the offset `b` is correct and can be applied.
  o But, it is possible that `d` < `c`, in that case, offset `a` should
    be used and not `b`.

To address this, keep track of incoming extended timestamp at switch
point and accept incoming sender reports which have a timestamp >=
switch point timestamp.

* clean up

* log more details on invalid layer
This commit is contained in:
Raja Subramanian
2024-06-07 23:56:10 +05:30
committed by GitHub
parent 73852d0a13
commit cee3fdb25e
2 changed files with 24 additions and 14 deletions
+16 -14
View File
@@ -233,14 +233,15 @@ type Forwarder struct {
pubMuted bool
resumeBehindThreshold float64
started bool
preStartTime time.Time
extFirstTS uint64
lastSSRC uint32
referenceLayerSpatial int32
dummyStartTSOffset uint64
refInfos [buffer.DefaultMaxLayerSpatial + 1]refInfo
refIsSVC bool
started bool
preStartTime time.Time
extFirstTS uint64
lastSSRC uint32
lastSwitchExtIncomingTS uint64
referenceLayerSpatial int32
dummyStartTSOffset uint64
refInfos [buffer.DefaultMaxLayerSpatial + 1]refInfo
refIsSVC bool
provisional *VideoAllocationProvisional
@@ -568,7 +569,7 @@ func (f *Forwarder) GetMaxSubscribedSpatial() int32 {
return layer
}
func (f *Forwarder) getReferenceLayer() (int32, int32) {
func (f *Forwarder) getRefLayer() (int32, int32) {
if f.lastSSRC == 0 {
return buffer.InvalidLayerSpatial, buffer.InvalidLayerSpatial
}
@@ -594,10 +595,10 @@ func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *buffer.R
defer f.lock.Unlock()
f.refIsSVC = isSVC
refLayer, _ := f.getReferenceLayer()
refLayer, _ := f.getRefLayer()
if layer >= 0 && int(layer) < len(f.refInfos) {
f.refInfos[layer] = refInfo{srData, 0, false}
if layer == refLayer {
if layer == refLayer && srData.RTPTimestampExt >= f.lastSwitchExtIncomingTS {
f.refInfos[layer].tsOffset = f.rtpMunger.GetTSOffset()
f.refInfos[layer].isTSOffsetValid = true
}
@@ -643,7 +644,7 @@ func (f *Forwarder) GetSenderReportParams() (int32, uint64, *buffer.RTCPSenderRe
f.lock.RLock()
defer f.lock.RUnlock()
refLayer, currentLayerSpatial := f.getReferenceLayer()
refLayer, currentLayerSpatial := f.getRefLayer()
if refLayer == buffer.InvalidLayerSpatial || !f.refInfos[refLayer].isTSOffsetValid {
return buffer.InvalidLayerSpatial, 0, nil
}
@@ -1566,7 +1567,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32)
}, ErrUnknownKind
}
func (f *Forwarder) getReferenceLayerRTPTimestamp(ts uint32, refLayer, targetLayer int32) (uint32, error) {
func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int32) (uint32, error) {
if refLayer < 0 || int(refLayer) > len(f.refInfos) || targetLayer < 0 || int(targetLayer) > len(f.refInfos) {
return 0, fmt.Errorf("invalid layer(s), refLayer: %d, targetLayer: %d", refLayer, targetLayer)
}
@@ -1670,7 +1671,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
switchingAt := time.Now()
if !f.skipReferenceTS {
var err error
refTS, err = f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, f.referenceLayerSpatial, layer)
refTS, err = f.getRefLayerRTPTimestamp(extPkt.Packet.Timestamp, f.referenceLayerSpatial, layer)
if err != nil {
// error out if refTS is not available. It can happen when there is no sender report
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
@@ -1853,6 +1854,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
}
f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC)
f.lastSSRC = extPkt.Packet.SSRC
f.lastSwitchExtIncomingTS = extPkt.ExtTimestamp
}
tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt, tp.marker)
+8
View File
@@ -720,6 +720,14 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
"mime", w.codec.MimeType,
"layer", layer,
"spatialLayer", spatialLayer,
"sn", pkt.Packet.SequenceNumber,
"esn", pkt.ExtSequenceNumber,
"timestamp", pkt.Packet.Timestamp,
"ets", pkt.ExtTimestamp,
"payloadSize", len(pkt.Packet.Payload),
"rtpVersion", pkt.Packet.Version,
"payloadType", pkt.Packet.PayloadType,
"ssrc", pkt.Packet.SSRC,
)
}