finish timed version migration (#1443)

* finish timed version migration

* update protocol dep
This commit is contained in:
Paul Wells
2023-02-18 12:08:08 -08:00
committed by GitHub
parent 49475b93ca
commit b35d64ae86
10 changed files with 10 additions and 174 deletions

2
go.mod
View File

@@ -17,7 +17,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a
github.com/livekit/protocol v1.4.2
github.com/livekit/protocol v1.4.3-0.20230218193429-26f188cb8404
github.com/livekit/psrpc v0.2.7
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3

4
go.sum
View File

@@ -232,8 +232,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.4.2 h1:XmobZ9eQmTSThWUmkko48tGf0AWfPMVWzpWzt3uoj+4=
github.com/livekit/protocol v1.4.2/go.mod h1:mVzmVesPCIgk2gg/jMr6PWtHu8dfRdhaAJ6okFs5nQw=
github.com/livekit/protocol v1.4.3-0.20230218193429-26f188cb8404 h1:kVrJYB9o8Cu5MpzIf4+e22auWMSlLhhdDVG3TZw1QQw=
github.com/livekit/protocol v1.4.3-0.20230218193429-26f188cb8404/go.mod h1:mVzmVesPCIgk2gg/jMr6PWtHu8dfRdhaAJ6okFs5nQw=
github.com/livekit/psrpc v0.2.7 h1:j8ns7+t/7LJxTH/jnD9Ds7qn3VkNOV/qCUbt2nbirfU=
github.com/livekit/psrpc v0.2.7/go.mod h1:2wtOo1F03vub2qIjx0rAPpVplg873670/LN08o/yopM=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=

View File

@@ -25,7 +25,6 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/telemetry"
utils2 "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil/pkg/twcc"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
@@ -78,7 +77,7 @@ type ParticipantParams struct {
GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo
ReconnectOnPublicationError bool
ReconnectOnSubscriptionError bool
VersionGenerator utils2.TimedVersionGenerator
VersionGenerator utils.TimedVersionGenerator
TrackResolver types.MediaTrackResolver
DisableDynacast bool
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/utils"
)
var (

View File

@@ -5,9 +5,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"

View File

@@ -10,7 +10,6 @@ import (
"github.com/pkg/errors"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
serverutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/livekit-server/version"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
@@ -51,7 +50,7 @@ type RoomManager struct {
telemetry telemetry.TelemetryService
clientConfManager clientconfiguration.ClientConfigurationManager
egressLauncher rtc.EgressLauncher
versionGenerator serverutils.TimedVersionGenerator
versionGenerator utils.TimedVersionGenerator
rooms map[livekit.RoomName]*rtc.Room
@@ -66,7 +65,7 @@ func NewLocalRoomManager(
telemetry telemetry.TelemetryService,
clientConfManager clientconfiguration.ClientConfigurationManager,
egressLauncher rtc.EgressLauncher,
versionGenerator serverutils.TimedVersionGenerator,
versionGenerator utils.TimedVersionGenerator,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
if err != nil {

View File

@@ -17,12 +17,12 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
redisLiveKit "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/livekit/psrpc"
)

View File

@@ -12,12 +12,12 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
redis2 "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/livekit/psrpc"
"github.com/pion/turn/v2"

View File

@@ -1,119 +0,0 @@
package utils
import (
"fmt"
"sync"
"time"
"github.com/livekit/protocol/livekit"
)
type TimedVersionGenerator interface {
New() *TimedVersion
}
func NewDefaultTimedVersionGenerator() TimedVersionGenerator {
return &timedVersionGenerator{}
}
type timedVersionGenerator struct {
lock sync.Mutex
ts int64
ticks int32
}
func (g *timedVersionGenerator) New() *TimedVersion {
ts := time.Now().UnixMicro()
var ticks int32
g.lock.Lock()
if ts <= g.ts {
g.ticks++
ts = g.ts
ticks = g.ticks
} else {
g.ts = ts
g.ticks = 0
}
g.lock.Unlock()
return &TimedVersion{
ts: ts,
ticks: ticks,
}
}
type TimedVersion struct {
lock sync.RWMutex
ts int64
ticks int32
}
func NewTimedVersionFromProto(ptv *livekit.TimedVersion) *TimedVersion {
return &TimedVersion{
ts: ptv.UnixMicro,
ticks: ptv.Ticks,
}
}
func NewTimedVersionFromTime(t time.Time) *TimedVersion {
return &TimedVersion{
ts: t.UnixMicro(),
}
}
func (t *TimedVersion) Update(other *TimedVersion) {
t.lock.Lock()
if other.After(t) {
t.ts = other.ts
t.ticks = other.ticks
}
t.lock.Unlock()
}
func (t *TimedVersion) After(other *TimedVersion) bool {
t.lock.RLock()
defer t.lock.RUnlock()
if t.ts == other.ts {
return t.ticks > other.ticks
}
return t.ts > other.ts
}
func (t *TimedVersion) Compare(other *TimedVersion) int {
t.lock.RLock()
defer t.lock.RUnlock()
if t.ts == other.ts {
if t.ticks == other.ticks {
return 0
}
if t.ticks > other.ticks {
return 1
}
return -1
}
if t.ts > other.ts {
return 1
}
return -1
}
func (t *TimedVersion) ToProto() *livekit.TimedVersion {
t.lock.RLock()
defer t.lock.RUnlock()
return &livekit.TimedVersion{
UnixMicro: t.ts,
Ticks: t.ticks,
}
}
func (t *TimedVersion) String() string {
t.lock.RLock()
defer t.lock.RUnlock()
return fmt.Sprintf("%d.%d", t.ts, t.ticks)
}

View File

@@ -1,43 +0,0 @@
/*
* Copyright 2022 LiveKit, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestTimedVersion(t *testing.T) {
gen := NewDefaultTimedVersionGenerator()
tv1 := gen.New()
tv2 := gen.New()
tv3 := gen.New()
assert.True(t, tv3.After(tv1))
assert.True(t, tv3.After(tv2))
assert.True(t, tv2.After(tv1))
tv2.Update(tv3)
assert.True(t, tv2.After(tv1))
// tv3 and tv2 are equivalent after update
assert.False(t, tv3.After(tv2))
assert.Equal(t, 0, tv1.Compare(tv1))
assert.Equal(t, -1, tv1.Compare(tv2))
assert.Equal(t, 1, tv2.Compare(tv1))
}