mirror of
https://github.com/livekit/livekit.git
synced 2026-06-04 23:52:06 +00:00
Fix pre-extended value on wrap back restart. (#2202)
When wrapping back on a restart, was not setting pre-extended values properly. It was missing a cycle. That caused rare large sequence number gap.
This commit is contained in:
@@ -70,16 +70,15 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
|
||||
return
|
||||
}
|
||||
|
||||
result.PreExtendedHighest = w.extendedHighest
|
||||
|
||||
gap := val - w.highest
|
||||
if gap > T(w.fullRange>>1) {
|
||||
// out-of-order
|
||||
result.IsRestart, result.PreExtendedStart, result.ExtendedVal = w.maybeAdjustStart(val)
|
||||
return
|
||||
return w.maybeAdjustStart(val)
|
||||
}
|
||||
|
||||
// in-order
|
||||
result.PreExtendedHighest = w.extendedHighest
|
||||
|
||||
if val < w.highest {
|
||||
w.cycles += w.fullRange
|
||||
}
|
||||
@@ -124,7 +123,7 @@ func (w *WrapAround[T, ET]) updateExtendedHighest() {
|
||||
w.extendedHighest = getExtendedHighest(w.cycles, w.highest)
|
||||
}
|
||||
|
||||
func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtendedStart ET, extendedVal ET) {
|
||||
func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (result WrapAroundUpdateResult[ET]) {
|
||||
// re-adjust start if necessary. The conditions are
|
||||
// 1. Not seen more than half the range yet
|
||||
// 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space
|
||||
@@ -135,14 +134,19 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
|
||||
if w.isWrapBack(val, w.highest) {
|
||||
cycles -= w.fullRange
|
||||
}
|
||||
extendedVal = getExtendedHighest(cycles, val)
|
||||
result.PreExtendedHighest = w.extendedHighest
|
||||
result.ExtendedVal = getExtendedHighest(cycles, val)
|
||||
return
|
||||
}
|
||||
|
||||
if val-w.start > T(w.fullRange>>1) {
|
||||
// out-of-order with existing start => a new start
|
||||
isRestart = true
|
||||
preExtendedStart = w.GetExtendedStart()
|
||||
result.IsRestart = true
|
||||
if val > w.start {
|
||||
result.PreExtendedStart = w.fullRange + ET(w.start)
|
||||
} else {
|
||||
result.PreExtendedStart = ET(w.start)
|
||||
}
|
||||
|
||||
if w.isWrapBack(val, w.highest) {
|
||||
w.cycles = w.fullRange
|
||||
@@ -155,7 +159,8 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
|
||||
cycles -= w.fullRange
|
||||
}
|
||||
}
|
||||
extendedVal = getExtendedHighest(cycles, val)
|
||||
result.PreExtendedHighest = w.extendedHighest
|
||||
result.ExtendedVal = getExtendedHighest(cycles, val)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -67,8 +67,8 @@ func TestWrapAroundUint16(t *testing.T) {
|
||||
input: (1 << 16) - 6,
|
||||
updated: WrapAroundUpdateResult[uint32]{
|
||||
IsRestart: true,
|
||||
PreExtendedStart: 8,
|
||||
PreExtendedHighest: 10,
|
||||
PreExtendedStart: (1 << 16) + 8,
|
||||
PreExtendedHighest: (1 << 16) + 10,
|
||||
ExtendedVal: (1 << 16) - 6,
|
||||
},
|
||||
start: (1 << 16) - 6,
|
||||
@@ -236,8 +236,8 @@ func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) {
|
||||
res = w.Update(65533)
|
||||
expectedResult = WrapAroundUpdateResult[uint64]{
|
||||
IsRestart: true,
|
||||
PreExtendedStart: 23,
|
||||
PreExtendedHighest: 25,
|
||||
PreExtendedStart: (1 << 16) + 23,
|
||||
PreExtendedHighest: (1 << 16) + 25,
|
||||
ExtendedVal: 65533,
|
||||
}
|
||||
require.Equal(t, expectedResult, res)
|
||||
@@ -267,6 +267,52 @@ func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) {
|
||||
require.Equal(t, uint64(0x7f1234), w.GetExtendedHighest())
|
||||
}
|
||||
|
||||
func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) {
|
||||
w := NewWrapAround[uint16, uint64]()
|
||||
|
||||
// initialize
|
||||
w.Update(65534)
|
||||
require.Equal(t, uint16(65534), w.GetStart())
|
||||
require.Equal(t, uint64(65534), w.GetExtendedStart())
|
||||
require.Equal(t, uint16(65534), w.GetHighest())
|
||||
require.Equal(t, uint64(65534), w.GetExtendedHighest())
|
||||
|
||||
// an in-order update with a roll over
|
||||
w.Update(32)
|
||||
require.Equal(t, uint16(65534), w.GetStart())
|
||||
require.Equal(t, uint64(65534), w.GetExtendedStart())
|
||||
require.Equal(t, uint16(32), w.GetHighest())
|
||||
require.Equal(t, uint64(65568), w.GetExtendedHighest())
|
||||
|
||||
// duplicate of start
|
||||
res := w.Update(65534)
|
||||
expectedResult := WrapAroundUpdateResult[uint64]{
|
||||
IsRestart: false,
|
||||
PreExtendedStart: 0,
|
||||
PreExtendedHighest: 65568,
|
||||
ExtendedVal: 65534,
|
||||
}
|
||||
require.Equal(t, expectedResult, res)
|
||||
require.Equal(t, uint16(65534), w.GetStart())
|
||||
require.Equal(t, uint64(65534), w.GetExtendedStart())
|
||||
require.Equal(t, uint16(32), w.GetHighest())
|
||||
require.Equal(t, uint64(65568), w.GetExtendedHighest())
|
||||
|
||||
// duplicate of start - again
|
||||
res = w.Update(65534)
|
||||
expectedResult = WrapAroundUpdateResult[uint64]{
|
||||
IsRestart: false,
|
||||
PreExtendedStart: 0,
|
||||
PreExtendedHighest: 65568,
|
||||
ExtendedVal: 65534,
|
||||
}
|
||||
require.Equal(t, expectedResult, res)
|
||||
require.Equal(t, uint16(65534), w.GetStart())
|
||||
require.Equal(t, uint64(65534), w.GetExtendedStart())
|
||||
require.Equal(t, uint16(32), w.GetHighest())
|
||||
require.Equal(t, uint64(65568), w.GetExtendedHighest())
|
||||
}
|
||||
|
||||
func TestWrapAroundUint32(t *testing.T) {
|
||||
w := NewWrapAround[uint32, uint64]()
|
||||
testCases := []struct {
|
||||
@@ -314,8 +360,8 @@ func TestWrapAroundUint32(t *testing.T) {
|
||||
input: (1 << 32) - 6,
|
||||
updated: WrapAroundUpdateResult[uint64]{
|
||||
IsRestart: true,
|
||||
PreExtendedStart: 8,
|
||||
PreExtendedHighest: 10,
|
||||
PreExtendedStart: (1 << 32) + 8,
|
||||
PreExtendedHighest: (1 << 32) + 10,
|
||||
ExtendedVal: (1 << 32) - 6,
|
||||
},
|
||||
start: (1 << 32) - 6,
|
||||
|
||||
Reference in New Issue
Block a user