mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
Handle RED extended sequence number. (#2123)
When converting from RED -> Opus, if there is a loss, SFU recovers that loss if it can using a subsequent redundant packet. That path was not setting the extended sequence number properly. Also, ensuring use of monotonic clock for first packet time adjustment also.
This commit is contained in:
@@ -474,7 +474,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64)
|
||||
}
|
||||
|
||||
samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second))
|
||||
now := time.Now()
|
||||
timeSinceFirst := time.Since(r.firstTime)
|
||||
now := r.firstTime.Add(timeSinceFirst)
|
||||
firstTime := now.Add(-samplesDuration)
|
||||
if firstTime.Before(r.firstTime) {
|
||||
r.logger.Debugw(
|
||||
|
||||
@@ -281,10 +281,24 @@ func (r *RTPStatsSender) Update(
|
||||
}
|
||||
}
|
||||
|
||||
r.logger.Infow(
|
||||
"adjusting start sequence number",
|
||||
"snBefore", r.extStartSN,
|
||||
"snAfter", extSequenceNumber,
|
||||
"tsBefore", r.extStartTS,
|
||||
"tsAfter", extTimestamp,
|
||||
)
|
||||
r.extStartSN = extSequenceNumber
|
||||
}
|
||||
|
||||
if extTimestamp < r.extStartTS {
|
||||
r.logger.Infow(
|
||||
"adjusting start timestamp",
|
||||
"snBefore", r.extStartSN,
|
||||
"snAfter", extSequenceNumber,
|
||||
"tsBefore", r.extStartTS,
|
||||
"tsAfter", extTimestamp,
|
||||
)
|
||||
r.extStartTS = extTimestamp
|
||||
}
|
||||
|
||||
|
||||
@@ -69,8 +69,14 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
|
||||
return
|
||||
}
|
||||
|
||||
for _, sendPkt := range pkts {
|
||||
for i, sendPkt := range pkts {
|
||||
pPkt := *pkt
|
||||
if i != len(pkts)-1 {
|
||||
// patch extended sequence number and time stmap for all but the last packet,
|
||||
// last packet is the primary payload
|
||||
pPkt.ExtSequenceNumber -= uint64(pkts[len(pkts)-1].SequenceNumber - pkts[i].SequenceNumber)
|
||||
pPkt.ExtTimestamp -= uint64(pkts[len(pkts)-1].Timestamp - pkts[i].Timestamp)
|
||||
}
|
||||
pPkt.Packet = sendPkt
|
||||
|
||||
// not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack,
|
||||
@@ -143,6 +149,7 @@ func (r *RedPrimaryReceiver) getSendPktsFromRed(rtp *rtp.Packet) ([]*rtp.Packet,
|
||||
switch {
|
||||
case diff == 0: // duplicate
|
||||
break
|
||||
|
||||
case diff > 0x8000: // unorder
|
||||
// in history
|
||||
if 65535-diff < 8 {
|
||||
@@ -191,25 +198,36 @@ func extractPktsFromRed(redPkt *rtp.Packet, recoverBits byte) ([]*rtp.Packet, er
|
||||
var blocks []block
|
||||
var blockLength int
|
||||
for {
|
||||
if len(payload) < 1 {
|
||||
// illegal data, need at least one byte for primary encoding
|
||||
return nil, ErrIncompleteRedHeader
|
||||
}
|
||||
|
||||
if payload[0]&0x80 == 0 {
|
||||
// last block is primary encoding data
|
||||
pt := uint8(payload[0] & 0x7F)
|
||||
|
||||
blocks = append(blocks, block{pt: pt, primary: true})
|
||||
|
||||
payload = payload[1:]
|
||||
blocks = append(blocks, block{primary: true})
|
||||
break
|
||||
} else {
|
||||
if len(payload) < 4 {
|
||||
// illegal data
|
||||
return nil, ErrIncompleteRedHeader
|
||||
}
|
||||
|
||||
blockHead := binary.BigEndian.Uint32(payload[0:])
|
||||
length := int(blockHead & 0x03FF)
|
||||
blockHead >>= 10
|
||||
tsOffset := blockHead & 0x3FFF
|
||||
blockHead >>= 14
|
||||
pt := uint8(blockHead & 0x7F)
|
||||
payload = payload[4:]
|
||||
blockLength += length
|
||||
|
||||
blocks = append(blocks, block{pt: pt, length: length, tsOffset: tsOffset})
|
||||
|
||||
blockLength += length
|
||||
payload = payload[4:]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,22 +238,26 @@ func extractPktsFromRed(redPkt *rtp.Packet, recoverBits byte) ([]*rtp.Packet, er
|
||||
pkts := make([]*rtp.Packet, 0, len(blocks))
|
||||
for i, b := range blocks {
|
||||
if b.primary {
|
||||
header := redPkt.Header
|
||||
header.PayloadType = b.pt
|
||||
pkts = append(pkts, &rtp.Packet{Header: redPkt.Header, Payload: payload})
|
||||
break
|
||||
}
|
||||
|
||||
// last block is primary encoding
|
||||
recoverIndex := len(blocks) - i - 1
|
||||
if recoverIndex < 1 || recoverBits&(1<<(recoverIndex-1)) == 0 {
|
||||
// skip past packet/block that does not need recovery
|
||||
payload = payload[b.length:]
|
||||
continue
|
||||
}
|
||||
|
||||
// recover missing packet
|
||||
header := redPkt.Header
|
||||
header.SequenceNumber -= uint16(recoverIndex)
|
||||
header.Timestamp -= b.tsOffset
|
||||
header.PayloadType = b.pt
|
||||
pkts = append(pkts, &rtp.Packet{Header: header, Payload: payload[:b.length]})
|
||||
|
||||
payload = payload[b.length:]
|
||||
}
|
||||
|
||||
@@ -257,6 +279,11 @@ func extractPrimaryEncodingForRED(payload []byte) ([]byte, error) {
|
||||
|
||||
var blockLength int
|
||||
for {
|
||||
if len(payload) < 1 {
|
||||
// illegal data, need at least one byte for primary encoding
|
||||
return nil, ErrIncompleteRedHeader
|
||||
}
|
||||
|
||||
if payload[0]&0x80 == 0 {
|
||||
// last block is primary encoding data
|
||||
payload = payload[1:]
|
||||
@@ -266,6 +293,7 @@ func extractPrimaryEncodingForRED(payload []byte) ([]byte, error) {
|
||||
// illegal data
|
||||
return nil, ErrIncompleteRedHeader
|
||||
}
|
||||
|
||||
blockLength += int(binary.BigEndian.Uint16(payload[2:]) & 0x03FF)
|
||||
payload = payload[4:]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user