diff --git a/go.mod b/go.mod index 3c73a9e29..57a58017e 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/jellydator/ttlcache/v3 v3.4.0 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 - github.com/livekit/mediatransportutil v0.0.0-20251201055132-772504244e19 + github.com/livekit/mediatransportutil v0.0.0-20251204091721-6b6e9a44e81f github.com/livekit/protocol v1.43.3-0.20251204035522-21f690495229 github.com/livekit/psrpc v0.7.1 github.com/mackerelio/go-osstat v0.2.6 diff --git a/go.sum b/go.sum index ad1b9c4fc..9314e84cb 100644 --- a/go.sum +++ b/go.sum @@ -171,8 +171,8 @@ github.com/lithammer/shortuuid/v4 v4.2.0 h1:LMFOzVB3996a7b8aBuEXxqOBflbfPQAiVzkI github.com/lithammer/shortuuid/v4 v4.2.0/go.mod h1:D5noHZ2oFw/YaKCfGy0YxyE7M0wMbezmMjPdhyEFe6Y= github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5ATTo469PQPkqzdoU7be46ryiCDO3boc= github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20251201055132-772504244e19 h1:JLLiGw3xtmLPzncsxe8wxvNQu91de+IV3BHIG5KlGoQ= -github.com/livekit/mediatransportutil v0.0.0-20251201055132-772504244e19/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= +github.com/livekit/mediatransportutil v0.0.0-20251204091721-6b6e9a44e81f h1:2zvkGDPaC34ufp0RGDG28AcbhXdWsiOg9giAe2BqiMk= +github.com/livekit/mediatransportutil v0.0.0-20251204091721-6b6e9a44e81f/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= github.com/livekit/protocol v1.43.3-0.20251204035522-21f690495229 h1:vY7Y24LPMSt8t8FndopzaNF+vn3+SkzIr6toMu2bBxQ= github.com/livekit/protocol v1.43.3-0.20251204035522-21f690495229/go.mod h1:n00Ul4P6o2YILGhxw+O57B0h/bF3Je9PzRN36fElCmw= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index 8641a4047..ee3fd6d5f 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -24,8 +24,7 @@ import ( "go.uber.org/atomic" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" - "github.com/livekit/livekit-server/pkg/sfu/utils" - + "github.com/livekit/mediatransportutil/pkg/utils" "github.com/livekit/protocol/logger" ) diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index 6fd323e90..a2138ed7d 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -22,9 +22,9 @@ import ( "github.com/pion/rtcp" "go.uber.org/zap/zapcore" - "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/mediatransportutil" "github.com/livekit/mediatransportutil/pkg/latency" + "github.com/livekit/mediatransportutil/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" protoutils "github.com/livekit/protocol/utils" diff --git a/pkg/sfu/rtpstats/rtpstats_receiver_lite.go b/pkg/sfu/rtpstats/rtpstats_receiver_lite.go index 84812a323..14a07dae2 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver_lite.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver_lite.go @@ -17,7 +17,7 @@ package rtpstats import ( "go.uber.org/zap/zapcore" - "github.com/livekit/livekit-server/pkg/sfu/utils" + "github.com/livekit/mediatransportutil/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils/mono" ) diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go deleted file mode 100644 index 249ca5ebd..000000000 --- a/pkg/sfu/utils/wraparound.go +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "unsafe" - - "go.uber.org/zap/zapcore" -) - -type number interface { - uint16 | uint32 -} - -type extendedNumber interface { - uint32 | uint64 -} - -type WrapAroundParams struct { - IsRestartAllowed bool -} - -type WrapAround[T number, ET extendedNumber] struct { - params WrapAroundParams - fullRange ET - - initialized bool - start T - highest T - cycles ET - extendedHighest ET -} - -func NewWrapAround[T number, ET extendedNumber](params WrapAroundParams) *WrapAround[T, ET] { - var t T - return &WrapAround[T, ET]{ - params: params, - fullRange: 1 << (unsafe.Sizeof(t) * 8), - } -} - -func (w *WrapAround[T, ET]) MarshalLogObject(e zapcore.ObjectEncoder) error { - if w == nil { - return nil - } - - e.AddUint64("fullRange", uint64(w.fullRange)) - e.AddBool("initialized", w.initialized) - e.AddUint64("start", uint64(w.start)) - e.AddUint64("highest", uint64(w.highest)) - e.AddUint64("cycles", uint64(w.cycles)) - e.AddUint64("extendedHighest", uint64(w.extendedHighest)) - return nil -} - -func (w *WrapAround[T, ET]) Seed(from *WrapAround[T, ET]) { - w.initialized = from.initialized - w.start = from.start - w.highest = from.highest - w.cycles = from.cycles - w.updateExtendedHighest() -} - -type WrapAroundUpdateResult[ET extendedNumber] struct { - IsUnhandled bool // when set, other fields are invalid - IsRestart bool - PreExtendedStart ET // valid only if IsRestart = true - PreExtendedHighest ET - ExtendedVal ET -} - -func (w *WrapAroundUpdateResult[ET]) MarshalLogObject(e zapcore.ObjectEncoder) error { - if w == nil { - return nil - } - - e.AddBool("IsUnhandled", w.IsUnhandled) - e.AddBool("IsRestart", w.IsRestart) - e.AddUint64("PreExtendedStart", uint64(w.PreExtendedStart)) - e.AddUint64("PreExtendedHighest", uint64(w.PreExtendedHighest)) - e.AddUint64("ExtendedVal", uint64(w.ExtendedVal)) - return nil -} - -func (w *WrapAround[T, ET]) UpdateWithOrderKnown(val T, orderKnown bool) (result WrapAroundUpdateResult[ET]) { - if !w.initialized { - result.PreExtendedHighest = ET(val) - 1 - result.ExtendedVal = ET(val) - - w.start = val - w.highest = val - w.updateExtendedHighest() - w.initialized = true - return - } - - if !orderKnown { - gap := val - w.highest - if gap > T(w.fullRange>>1) { - // out-of-order - return w.maybeAdjustStart(val) - } - } - - // in-order - result.PreExtendedHighest = w.extendedHighest - - if val < w.highest { - w.cycles += w.fullRange - } - w.highest = val - - w.updateExtendedHighest() - result.ExtendedVal = w.extendedHighest - return -} - -func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { - return w.UpdateWithOrderKnown(val, false) -} - -func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) { - if !w.initialized || result.PreExtendedHighest >= result.ExtendedVal { - return - } - - w.ResetHighest(result.PreExtendedHighest) -} - -func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) { - if numCycles < 0 || !w.initialized { - return w.Update(val) - } - - result.PreExtendedHighest = w.extendedHighest - - w.cycles += ET(numCycles) * w.fullRange - w.highest = val - - w.updateExtendedHighest() - result.ExtendedVal = w.extendedHighest - return -} - -func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { - if w.isWrapBack(w.start, T(ev)) { - w.cycles -= w.fullRange - w.updateExtendedHighest() - } - w.start = T(ev) -} - -func (w *WrapAround[T, ET]) ResetHighest(ev ET) { - w.highest = T(ev) - w.cycles = ev & ^(w.fullRange - 1) - w.updateExtendedHighest() -} - -func (w *WrapAround[T, ET]) GetStart() T { - return w.start -} - -func (w *WrapAround[T, ET]) GetExtendedStart() ET { - return ET(w.start) -} - -func (w *WrapAround[T, ET]) GetHighest() T { - return w.highest -} - -func (w *WrapAround[T, ET]) GetExtendedHighest() ET { - return w.extendedHighest -} - -func (w *WrapAround[T, ET]) updateExtendedHighest() { - w.extendedHighest = getExtended(w.cycles, w.highest) -} - -func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (result WrapAroundUpdateResult[ET]) { - // re-adjust start if necessary. The conditions are - // 1. Not seen more than half the range yet - // 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space - // 2. no wrap around, but out-of-order compared to start and not completed a half cycle , sequences like (10, 9), (65530, 65528) in uint16 space - cycles := w.cycles - totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1 - if totalNum > (w.fullRange >> 1) { - if w.isWrapBack(val, w.highest) { - cycles -= w.fullRange - } - result.PreExtendedHighest = w.extendedHighest - result.ExtendedVal = getExtended(cycles, val) - return - } - - if val-w.start > T(w.fullRange>>1) { - if w.params.IsRestartAllowed { - // out-of-order with existing start => a new start - result.IsRestart = true - if val > w.start { - result.PreExtendedStart = w.fullRange + ET(w.start) - } else { - result.PreExtendedStart = ET(w.start) - } - - if w.isWrapBack(val, w.highest) { - w.cycles = w.fullRange - w.updateExtendedHighest() - cycles = 0 - } - w.start = val - } else { - result.IsUnhandled = true - } - } else { - if w.isWrapBack(val, w.highest) { - cycles -= w.fullRange - } - } - result.PreExtendedHighest = w.extendedHighest - result.ExtendedVal = getExtended(cycles, val) - return -} - -func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool { - return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1) -} - -// ------------------------------------ - -func getExtended[T number, ET extendedNumber](cycles ET, val T) ET { - return cycles + ET(val) -} diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go deleted file mode 100644 index 4f2e505d5..000000000 --- a/pkg/sfu/utils/wraparound_test.go +++ /dev/null @@ -1,679 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestWrapAroundUint16(t *testing.T) { - w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: true}) - testCases := []struct { - name string - input uint16 - updated WrapAroundUpdateResult[uint32] - start uint16 - extendedStart uint32 - highest uint16 - extendedHighest uint32 - }{ - // initialize - { - name: "initialize", - input: 10, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: 9, - ExtendedVal: 10, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // an older number without wrap around should reset start point - { - name: "reset start no wrap around", - input: 8, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: true, - PreExtendedStart: 10, - PreExtendedHighest: 10, - ExtendedVal: 8, - }, - start: 8, - extendedStart: 8, - highest: 10, - extendedHighest: 10, - }, - // an older number with wrap around should reset start point - { - name: "reset start wrap around", - input: (1 << 16) - 6, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: true, - PreExtendedStart: (1 << 16) + 8, - PreExtendedHighest: (1 << 16) + 10, - ExtendedVal: (1 << 16) - 6, - }, - start: (1 << 16) - 6, - extendedStart: (1 << 16) - 6, - highest: 10, - extendedHighest: (1 << 16) + 10, - }, - // an older number with wrap around should reset start point again - { - name: "reset start again", - input: (1 << 16) - 12, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: true, - PreExtendedStart: (1 << 16) - 6, - PreExtendedHighest: (1 << 16) + 10, - ExtendedVal: (1 << 16) - 12, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: 10, - extendedHighest: (1 << 16) + 10, - }, - // out of order with highest, wrap back, but no restart - { - name: "out of order - no restart", - input: (1 << 16) - 3, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 16) + 10, - ExtendedVal: (1 << 16) - 3, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: 10, - extendedHighest: (1 << 16) + 10, - }, - // duplicate should return same as highest - { - name: "duplicate", - input: 10, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 16) + 10, - ExtendedVal: (1 << 16) + 10, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: 10, - extendedHighest: (1 << 16) + 10, - }, - // a significant jump in order should not reset start - { - name: "big in-order jump", - input: (1 << 15) - 10, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 16) + 10, - ExtendedVal: (1 << 16) + (1 << 15) - 10, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: (1 << 15) - 10, - extendedHighest: (1 << 16) + (1 << 15) - 10, - }, - // now out-of-order should not reset start as half the range has been seen - { - name: "out-of-order after half range", - input: (1 << 15) - 11, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 16) + (1 << 15) - 10, - ExtendedVal: (1 << 16) + (1 << 15) - 11, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: (1 << 15) - 10, - extendedHighest: (1 << 16) + (1 << 15) - 10, - }, - // wrap back out-of-order - { - name: "wrap back out-of-order after half range", - input: (1 << 16) - 1, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 16) + (1 << 15) - 10, - ExtendedVal: (1 << 16) - 1, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: (1 << 15) - 10, - extendedHighest: (1 << 16) + (1 << 15) - 10, - }, - // in-order, should update highest - { - name: "in-order", - input: (1 << 15) + 3, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 16) + (1 << 15) - 10, - ExtendedVal: (1 << 16) + (1 << 15) + 3, - }, - start: (1 << 16) - 12, - extendedStart: (1 << 16) - 12, - highest: (1 << 15) + 3, - extendedHighest: (1 << 16) + (1 << 15) + 3, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.updated, w.Update(tc.input)) - require.Equal(t, tc.start, w.GetStart()) - require.Equal(t, tc.extendedStart, w.GetExtendedStart()) - require.Equal(t, tc.highest, w.GetHighest()) - require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) - }) - } -} - -func TestWrapAroundUint16NoRestart(t *testing.T) { - w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false}) - testCases := []struct { - name string - input uint16 - updated WrapAroundUpdateResult[uint32] - start uint16 - extendedStart uint32 - highest uint16 - extendedHighest uint32 - }{ - // initialize - { - name: "initialize", - input: 10, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: 9, - ExtendedVal: 10, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // an older number without wrap around should not reset start point - { - name: "no reset start no wrap around", - input: 8, - updated: WrapAroundUpdateResult[uint32]{ - IsUnhandled: true, - // the following fields are not valid when `IsUnhandled = true`, but code fills it in - // and they are filled in here for testing purposes - PreExtendedHighest: 10, - ExtendedVal: 8, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // an older number with wrap around should not reset start point - { - name: "no reset start wrap around", - input: (1 << 16) - 6, - updated: WrapAroundUpdateResult[uint32]{ - IsUnhandled: true, - PreExtendedHighest: 10, - ExtendedVal: (1 << 16) - 6, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // yet another older number with wrap around should not reset start point - { - name: "no reset start again", - input: (1 << 16) - 12, - updated: WrapAroundUpdateResult[uint32]{ - IsUnhandled: true, - PreExtendedHighest: 10, - ExtendedVal: (1 << 16) - 12, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // duplicate should return same as highest - { - name: "duplicate", - input: 10, - updated: WrapAroundUpdateResult[uint32]{ - PreExtendedHighest: 10, - ExtendedVal: 10, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // a significant jump in order should move highest to that - { - name: "big in-order jump", - input: (1 << 15) - 10, - updated: WrapAroundUpdateResult[uint32]{ - PreExtendedHighest: 10, - ExtendedVal: (1 << 15) - 10, - }, - start: 10, - extendedStart: 10, - highest: (1 << 15) - 10, - extendedHighest: (1 << 15) - 10, - }, - // in-order, should update highest - { - name: "in-order", - input: (1 << 15) + 13, - updated: WrapAroundUpdateResult[uint32]{ - PreExtendedHighest: (1 << 15) - 10, - ExtendedVal: (1 << 15) + 13, - }, - start: 10, - extendedStart: 10, - highest: (1 << 15) + 13, - extendedHighest: (1 << 15) + 13, - }, - // now out-of-order should not reset start as half the range has been seen - { - name: "out-of-order after half range", - input: (1 << 15) - 11, - updated: WrapAroundUpdateResult[uint32]{ - PreExtendedHighest: (1 << 15) + 13, - ExtendedVal: (1 << 15) - 11, - }, - start: 10, - extendedStart: 10, - highest: (1 << 15) + 13, - extendedHighest: (1 << 15) + 13, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.updated, w.Update(tc.input)) - require.Equal(t, tc.start, w.GetStart()) - require.Equal(t, tc.extendedStart, w.GetExtendedStart()) - require.Equal(t, tc.highest, w.GetHighest()) - require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) - }) - } -} - -func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) { - w := NewWrapAround[uint16, uint64](WrapAroundParams{IsRestartAllowed: true}) - - // initialize - w.Update(23) - require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint64(23), w.GetExtendedStart()) - require.Equal(t, uint16(23), w.GetHighest()) - require.Equal(t, uint64(23), w.GetExtendedHighest()) - - // an in-order update - w.Update(25) - require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint64(23), w.GetExtendedStart()) - require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint64(25), w.GetExtendedHighest()) - - // force restart without wrap - res := w.Update(12) - expectedResult := WrapAroundUpdateResult[uint64]{ - IsRestart: true, - PreExtendedStart: 23, - PreExtendedHighest: 25, - ExtendedVal: 12, - } - require.Equal(t, expectedResult, res) - require.Equal(t, uint16(12), w.GetStart()) - require.Equal(t, uint64(12), w.GetExtendedStart()) - require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint64(25), w.GetExtendedHighest()) - - // roll back restart - w.RollbackRestart(res.PreExtendedStart) - require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint64(23), w.GetExtendedStart()) - require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint64(25), w.GetExtendedHighest()) - - // force restart with wrap - res = w.Update(65533) - expectedResult = WrapAroundUpdateResult[uint64]{ - IsRestart: true, - PreExtendedStart: (1 << 16) + 23, - PreExtendedHighest: (1 << 16) + 25, - ExtendedVal: 65533, - } - require.Equal(t, expectedResult, res) - require.Equal(t, uint16(65533), w.GetStart()) - require.Equal(t, uint64(65533), w.GetExtendedStart()) - require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint64(65536+25), w.GetExtendedHighest()) - - // roll back restart - w.RollbackRestart(res.PreExtendedStart) - require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint64(23), w.GetExtendedStart()) - require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint64(25), w.GetExtendedHighest()) - - // reset highest - w.ResetHighest(0x1234) - require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint64(23), w.GetExtendedStart()) - require.Equal(t, uint16(0x1234), w.GetHighest()) - require.Equal(t, uint64(0x1234), w.GetExtendedHighest()) - - w.ResetHighest(0x7f1234) - require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint64(23), w.GetExtendedStart()) - require.Equal(t, uint16(0x1234), w.GetHighest()) - require.Equal(t, uint64(0x7f1234), w.GetExtendedHighest()) -} - -func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) { - w := NewWrapAround[uint16, uint64](WrapAroundParams{IsRestartAllowed: true}) - - // initialize - w.Update(65534) - require.Equal(t, uint16(65534), w.GetStart()) - require.Equal(t, uint64(65534), w.GetExtendedStart()) - require.Equal(t, uint16(65534), w.GetHighest()) - require.Equal(t, uint64(65534), w.GetExtendedHighest()) - - // an in-order update with a roll over - w.Update(32) - require.Equal(t, uint16(65534), w.GetStart()) - require.Equal(t, uint64(65534), w.GetExtendedStart()) - require.Equal(t, uint16(32), w.GetHighest()) - require.Equal(t, uint64(65568), w.GetExtendedHighest()) - - // duplicate of start - res := w.Update(65534) - expectedResult := WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: 65568, - ExtendedVal: 65534, - } - require.Equal(t, expectedResult, res) - require.Equal(t, uint16(65534), w.GetStart()) - require.Equal(t, uint64(65534), w.GetExtendedStart()) - require.Equal(t, uint16(32), w.GetHighest()) - require.Equal(t, uint64(65568), w.GetExtendedHighest()) - - // duplicate of start - again - res = w.Update(65534) - expectedResult = WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: 65568, - ExtendedVal: 65534, - } - require.Equal(t, expectedResult, res) - require.Equal(t, uint16(65534), w.GetStart()) - require.Equal(t, uint64(65534), w.GetExtendedStart()) - require.Equal(t, uint16(32), w.GetHighest()) - require.Equal(t, uint64(65568), w.GetExtendedHighest()) -} - -func TestWrapAroundUint16Rollover(t *testing.T) { - w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false}) - testCases := []struct { - name string - input uint16 - numCycles int - updated WrapAroundUpdateResult[uint32] - start uint16 - extendedStart uint32 - highest uint16 - extendedHighest uint32 - }{ - // initialize - should initialize irrespective of numCycles - { - name: "initialize", - input: 10, - numCycles: 10, - updated: WrapAroundUpdateResult[uint32]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: 9, - ExtendedVal: 10, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // negative cycles - should just do an update - { - name: "zero", - input: 8, - numCycles: -1, - updated: WrapAroundUpdateResult[uint32]{ - IsUnhandled: true, - // the following fields are not valid when `IsUnhandled = true`, but code fills it in - // and they are filled in here for testing purposes - PreExtendedHighest: 10, - ExtendedVal: 8, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // one cycle - { - name: "one cycle", - input: (1 << 16) - 6, - numCycles: 1, - updated: WrapAroundUpdateResult[uint32]{ - PreExtendedHighest: 10, - ExtendedVal: (1 << 16) - 6 + (1 << 16), - }, - start: 10, - extendedStart: 10, - highest: (1 << 16) - 6, - extendedHighest: (1 << 16) - 6 + (1 << 16), - }, - // two cycles - { - name: "two cycles", - input: (1 << 16) - 7, - numCycles: 2, - updated: WrapAroundUpdateResult[uint32]{ - PreExtendedHighest: (1 << 16) - 6 + (1 << 16), - ExtendedVal: (1 << 16) - 7 + 3*(1<<16), - }, - start: 10, - extendedStart: 10, - highest: (1 << 16) - 7, - extendedHighest: (1 << 16) - 7 + 3*(1<<16), - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.updated, w.Rollover(tc.input, tc.numCycles)) - require.Equal(t, tc.start, w.GetStart()) - require.Equal(t, tc.extendedStart, w.GetExtendedStart()) - require.Equal(t, tc.highest, w.GetHighest()) - require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) - }) - } -} - -func TestWrapAroundUint32(t *testing.T) { - w := NewWrapAround[uint32, uint64](WrapAroundParams{IsRestartAllowed: true}) - testCases := []struct { - name string - input uint32 - updated WrapAroundUpdateResult[uint64] - start uint32 - extendedStart uint64 - highest uint32 - extendedHighest uint64 - }{ - // initialize - { - name: "initialize", - input: 10, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: 9, - ExtendedVal: 10, - }, - start: 10, - extendedStart: 10, - highest: 10, - extendedHighest: 10, - }, - // an older number without wrap around should reset start point - { - name: "reset start no wrap around", - input: 8, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: true, - PreExtendedStart: 10, - PreExtendedHighest: 10, - ExtendedVal: 8, - }, - start: 8, - extendedStart: 8, - highest: 10, - extendedHighest: 10, - }, - // an older number with wrap around should reset start point - { - name: "reset start wrap around", - input: (1 << 32) - 6, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: true, - PreExtendedStart: (1 << 32) + 8, - PreExtendedHighest: (1 << 32) + 10, - ExtendedVal: (1 << 32) - 6, - }, - start: (1 << 32) - 6, - extendedStart: (1 << 32) - 6, - highest: 10, - extendedHighest: (1 << 32) + 10, - }, - // an older number with wrap around should reset start point again - { - name: "reset start again", - input: (1 << 32) - 12, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: true, - PreExtendedStart: (1 << 32) - 6, - PreExtendedHighest: (1 << 32) + 10, - ExtendedVal: (1 << 32) - 12, - }, - start: (1 << 32) - 12, - extendedStart: (1 << 32) - 12, - highest: 10, - extendedHighest: (1 << 32) + 10, - }, - // duplicate should return same as highest - { - name: "duplicate", - input: 10, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 32) + 10, - ExtendedVal: (1 << 32) + 10, - }, - start: (1 << 32) - 12, - extendedStart: (1 << 32) - 12, - highest: 10, - extendedHighest: (1 << 32) + 10, - }, - // a significant jump in order should not reset start - { - name: "big in-order jump", - input: 1 << 31, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 32) + 10, - ExtendedVal: (1 << 32) + (1 << 31), - }, - start: (1 << 32) - 12, - extendedStart: (1 << 32) - 12, - highest: 1 << 31, - extendedHighest: (1 << 32) + (1 << 31), - }, - // now out-of-order should not reset start as half the range has been seen - { - name: "out-of-order after half range", - input: (1 << 31) - 1, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 32) + (1 << 31), - ExtendedVal: (1 << 32) + (1 << 31) - 1, - }, - start: (1 << 32) - 12, - extendedStart: (1 << 32) - 12, - highest: 1 << 31, - extendedHighest: (1 << 32) + (1 << 31), - }, - // in-order, should update highest - { - name: "in-order", - input: (1 << 31) + 3, - updated: WrapAroundUpdateResult[uint64]{ - IsRestart: false, - PreExtendedStart: 0, - PreExtendedHighest: (1 << 32) + (1 << 31), - ExtendedVal: (1 << 32) + (1 << 31) + 3, - }, - start: (1 << 32) - 12, - extendedStart: (1 << 32) - 12, - highest: (1 << 31) + 3, - extendedHighest: (1 << 32) + (1 << 31) + 3, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.updated, w.Update(tc.input)) - require.Equal(t, tc.start, w.GetStart()) - require.Equal(t, tc.extendedStart, w.GetExtendedStart()) - require.Equal(t, tc.highest, w.GetHighest()) - require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) - }) - } -} diff --git a/pkg/sfu/videolayerselector/framenumberwrapper_test.go b/pkg/sfu/videolayerselector/framenumberwrapper_test.go index b18f509db..c00aa89a2 100644 --- a/pkg/sfu/videolayerselector/framenumberwrapper_test.go +++ b/pkg/sfu/videolayerselector/framenumberwrapper_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/livekit/livekit-server/pkg/sfu/utils" + "github.com/livekit/mediatransportutil/pkg/utils" "github.com/livekit/protocol/logger" )