From ddece1fbb0ea6c3fa2ec7437d98faea82d651ca9 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 8 Apr 2024 11:29:55 +0530 Subject: [PATCH] Use aarival time in cached packets. (#2633) --- pkg/sfu/buffer/buffer.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 70f9940b3..8b7678671 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -301,9 +301,10 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { return } + now := time.Now() if b.twcc != nil && b.twccExt != 0 && !b.closed.Load() { if ext := rtpPacket.GetExtension(b.twccExt); ext != nil { - b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), time.Now().UnixNano(), rtpPacket.Marker) + b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now.UnixNano(), rtpPacket.Marker) } } @@ -316,7 +317,7 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { return } - pb.writeRTX(&rtpPacket) + pb.writeRTX(&rtpPacket, now) return } @@ -325,7 +326,7 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { copy(packet, pkt) b.pPackets = append(b.pPackets, pendingPacket{ packet: packet, - arrivalTime: time.Now(), + arrivalTime: now, }) b.Unlock() return @@ -353,11 +354,11 @@ func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) { if rtpPacket.Padding && len(rtpPacket.Payload) == 0 { continue } - primaryBuffer.writeRTX(&rtpPacket) + primaryBuffer.writeRTX(&rtpPacket, pp.arrivalTime) } } -func (b *Buffer) writeRTX(rtxPkt *rtp.Packet) (n int, err error) { +func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime time.Time) (n int, err error) { b.Lock() defer b.Unlock() if !b.bound { @@ -368,19 +369,18 @@ func (b *Buffer) writeRTX(rtxPkt *rtp.Packet) (n int, err error) { b.rtxPktBuf = make([]byte, bucket.MaxPktSize) } - videoPkt := *rtxPkt - videoPkt.PayloadType = b.payloadType - videoPkt.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2]) - videoPkt.SSRC = b.mediaSSRC - videoPkt.Payload = rtxPkt.Payload[2:] - n, err = videoPkt.MarshalTo(b.rtxPktBuf) - + repairedPkt := *rtxPkt + repairedPkt.PayloadType = b.payloadType + repairedPkt.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2]) + repairedPkt.SSRC = b.mediaSSRC + repairedPkt.Payload = rtxPkt.Payload[2:] + n, err = repairedPkt.MarshalTo(b.rtxPktBuf) if err != nil { - b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.mediaSSRC, "sn", videoPkt.SequenceNumber) + b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.mediaSSRC, "sn", repairedPkt.SequenceNumber) return } - b.calc(b.rtxPktBuf[:n], &videoPkt, time.Now(), true) + b.calc(b.rtxPktBuf[:n], &repairedPkt, arrivalTime, true) return } @@ -664,7 +664,6 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlow } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, isRTX bool) { - if b.audioLevelExt != 0 && !isRTX { if !b.latestTSForAudioLevelInitialized { b.latestTSForAudioLevelInitialized = true