From cee3fdb25e7614f8b8df658cc569ad7c61782465 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 7 Jun 2024 23:56:10 +0530 Subject: [PATCH] Better lock for sender report TS offset. (#2771) * Better lock for sender report TS offset. It is possible that a resume has happened and new time stamp offset calculated. But, a sender report from publisher comes with a time stamp prior to the time stamp which was used for offset calculation. Using that sender report in the forwarding path causes jumps. Example - Track forwarding, let us tsOffset = `a` - Unmute/layer switch - one of those events happens, a new tsOffset will be calculated, let us say that offset is `b` and it is based on incoming time stmap of `c`. - A sender report from publisher could arrive with timestamp = `d`. o If `d` >= `c`, the offset `b` is correct and can be applied. o But, it is possible that `d` < `c`, in that case, offset `a` should be used and not `b`. To address this, keep track of incoming extended timestamp at switch point and accept incoming sender reports which have a timestamp >= switch point timestamp. * clean up * log more details on invalid layer --- pkg/sfu/forwarder.go | 30 ++++++++++++++++-------------- pkg/sfu/receiver.go | 8 ++++++++ 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 87c6cc345..68e6df310 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -233,14 +233,15 @@ type Forwarder struct { pubMuted bool resumeBehindThreshold float64 - started bool - preStartTime time.Time - extFirstTS uint64 - lastSSRC uint32 - referenceLayerSpatial int32 - dummyStartTSOffset uint64 - refInfos [buffer.DefaultMaxLayerSpatial + 1]refInfo - refIsSVC bool + started bool + preStartTime time.Time + extFirstTS uint64 + lastSSRC uint32 + lastSwitchExtIncomingTS uint64 + referenceLayerSpatial int32 + dummyStartTSOffset uint64 + refInfos [buffer.DefaultMaxLayerSpatial + 1]refInfo + refIsSVC bool provisional *VideoAllocationProvisional @@ -568,7 +569,7 @@ func (f *Forwarder) GetMaxSubscribedSpatial() int32 { return layer } -func (f *Forwarder) getReferenceLayer() (int32, int32) { +func (f *Forwarder) getRefLayer() (int32, int32) { if f.lastSSRC == 0 { return buffer.InvalidLayerSpatial, buffer.InvalidLayerSpatial } @@ -594,10 +595,10 @@ func (f *Forwarder) SetRefSenderReport(isSVC bool, layer int32, srData *buffer.R defer f.lock.Unlock() f.refIsSVC = isSVC - refLayer, _ := f.getReferenceLayer() + refLayer, _ := f.getRefLayer() if layer >= 0 && int(layer) < len(f.refInfos) { f.refInfos[layer] = refInfo{srData, 0, false} - if layer == refLayer { + if layer == refLayer && srData.RTPTimestampExt >= f.lastSwitchExtIncomingTS { f.refInfos[layer].tsOffset = f.rtpMunger.GetTSOffset() f.refInfos[layer].isTSOffsetValid = true } @@ -643,7 +644,7 @@ func (f *Forwarder) GetSenderReportParams() (int32, uint64, *buffer.RTCPSenderRe f.lock.RLock() defer f.lock.RUnlock() - refLayer, currentLayerSpatial := f.getReferenceLayer() + refLayer, currentLayerSpatial := f.getRefLayer() if refLayer == buffer.InvalidLayerSpatial || !f.refInfos[refLayer].isTSOffsetValid { return buffer.InvalidLayerSpatial, 0, nil } @@ -1566,7 +1567,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) }, ErrUnknownKind } -func (f *Forwarder) getReferenceLayerRTPTimestamp(ts uint32, refLayer, targetLayer int32) (uint32, error) { +func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int32) (uint32, error) { if refLayer < 0 || int(refLayer) > len(f.refInfos) || targetLayer < 0 || int(targetLayer) > len(f.refInfos) { return 0, fmt.Errorf("invalid layer(s), refLayer: %d, targetLayer: %d", refLayer, targetLayer) } @@ -1670,7 +1671,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e switchingAt := time.Now() if !f.skipReferenceTS { var err error - refTS, err = f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, f.referenceLayerSpatial, layer) + refTS, err = f.getRefLayerRTPTimestamp(extPkt.Packet.Timestamp, f.referenceLayerSpatial, layer) if err != nil { // error out if refTS is not available. It can happen when there is no sender report // for the layer being switched to. Can especially happen at the start of the track when layer switches are @@ -1853,6 +1854,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i } f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC) f.lastSSRC = extPkt.Packet.SSRC + f.lastSwitchExtIncomingTS = extPkt.ExtTimestamp } tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt, tp.marker) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index f530e92a5..1fd0db1c6 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -720,6 +720,14 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { "mime", w.codec.MimeType, "layer", layer, "spatialLayer", spatialLayer, + "sn", pkt.Packet.SequenceNumber, + "esn", pkt.ExtSequenceNumber, + "timestamp", pkt.Packet.Timestamp, + "ets", pkt.ExtTimestamp, + "payloadSize", len(pkt.Packet.Payload), + "rtpVersion", pkt.Packet.Version, + "payloadType", pkt.Packet.PayloadType, + "ssrc", pkt.Packet.SSRC, ) }