diff --git a/go.mod b/go.mod index 135eb5bcf..b15f7bf03 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 - github.com/livekit/protocol v1.12.0 + github.com/livekit/protocol v1.12.1-0.20240321094538-0d9caadf760e github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 2c88567ea..670d00671 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI= github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.12.0 h1:B7qsqq5xf9MmyG9WEk9/gMsfMVXuyLNxX5cO6TQil6s= -github.com/livekit/protocol v1.12.0/go.mod h1:G7Pa985GhZv2MCC3UnUocBhZfi3DsWA6WmlSkkpQYTM= +github.com/livekit/protocol v1.12.1-0.20240321094538-0d9caadf760e h1:XR7vPLN7c/R6R87UARoBW2csVKd7RuTXwG+XsjczbT0= +github.com/livekit/protocol v1.12.1-0.20240321094538-0d9caadf760e/go.mod h1:G7Pa985GhZv2MCC3UnUocBhZfi3DsWA6WmlSkkpQYTM= github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30 h1:3GEU6vP+KLTTOEqsFKW+PgIUp+i+s0jaUqogQc/hb7M= github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6617e056a..5f8f7e60e 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -265,7 +265,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { } p.closeReason.Store(types.ParticipantCloseReasonNone) p.version.Store(params.InitialVersion) - p.timedVersion.Update(params.VersionGenerator.New()) + p.timedVersion.Update(params.VersionGenerator.Next()) p.migrateState.Store(types.MigrateStateInit) p.state.Store(livekit.ParticipantInfo_JOINING) p.grants = params.Grants @@ -495,16 +495,20 @@ func (p *ParticipantImpl) CanSkipBroadcast() bool { } func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) { - v := p.version.Load() - piv := p.timedVersion.Load() - if p.dirty.Swap(false) { - v = p.version.Inc() - piv = p.params.VersionGenerator.Next() - p.timedVersion.Update(&piv) + if p.dirty.Load() { + p.lock.Lock() + if p.dirty.Swap(false) { + p.version.Inc() + p.timedVersion.Update(p.params.VersionGenerator.Next()) + } + p.lock.Unlock() } grants := p.ClaimGrants() p.lock.RLock() + v := p.version.Load() + piv := p.timedVersion + pi := &livekit.ParticipantInfo{ Sid: string(p.params.SID), Identity: string(p.params.Identity), @@ -1988,9 +1992,9 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei } ti.MimeType = track.Codec().MimeType - if utils.NewTimedVersionFromProto(ti.Version).IsZero() { + if utils.TimedVersionFromProto(ti.Version).IsZero() { // only assign version on a fresh publish, i. e. avoid updating version in scenarios like migration - ti.Version = p.params.VersionGenerator.New().ToProto() + ti.Version = p.params.VersionGenerator.Next().ToProto() } mt = p.addMediaTrack(signalCid, track.ID(), ti) newTrack = true diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 5cfe8f5da..e50e84b83 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -710,7 +710,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync } func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error { - if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipantByID); err != nil { + if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion(0), r.GetParticipantByID); err != nil { return err } for _, track := range participant.GetPublishedTracks() { diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 3cdf12d4e..eff9b8358 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -56,7 +56,7 @@ type SubscribedTrack struct { versionGenerator utils.TimedVersionGenerator settingsLock sync.Mutex settings *livekit.UpdateTrackSettings - settingsVersion *utils.TimedVersion + settingsVersion utils.TimedVersion bindLock sync.Mutex bound bool @@ -243,7 +243,7 @@ func (t *SubscribedTrack) applySettings() { } t.logger.Debugw("updating subscriber track settings", "settings", logger.Proto(t.settings)) - t.settingsVersion = t.versionGenerator.New() + t.settingsVersion = t.versionGenerator.Next() settingsVersion := t.settingsVersion t.settingsLock.Unlock() @@ -264,7 +264,7 @@ func (t *SubscribedTrack) applySettings() { } t.settingsLock.Lock() - if settingsVersion.Compare(t.settingsVersion) != 0 { + if settingsVersion != t.settingsVersion { // a newer settings has superceded this one t.settingsLock.Unlock() return diff --git a/pkg/rtc/testutils.go b/pkg/rtc/testutils.go index b63c372a8..e233fa5f3 100644 --- a/pkg/rtc/testutils.go +++ b/pkg/rtc/testutils.go @@ -45,7 +45,7 @@ func NewMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro Identity: string(identity), State: livekit.ParticipantInfo_JOINED, IsPublisher: publisher, - }, utils.TimedVersion{}) + }, utils.TimedVersion(0)) p.SetMetadataCalls(func(m string) { var f func(participant types.LocalParticipant) diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 904b3ea66..57cf372ac 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -166,7 +166,7 @@ func (u *UpTrackManager) UpdateSubscriptionPermission( // we do not want to initialize subscriptionPermissionVersion too early since if another machine is the // owner for the data, we'd prefer to use their TimedVersion // ignore older version - if !timedVersion.After(&u.subscriptionPermissionVersion) { + if !timedVersion.After(u.subscriptionPermissionVersion) { u.params.Logger.Debugw( "skipping older subscription permission version", "existingValue", logger.Proto(u.subscriptionPermission), @@ -177,10 +177,10 @@ func (u *UpTrackManager) UpdateSubscriptionPermission( u.lock.Unlock() return nil } - u.subscriptionPermissionVersion.Update(&timedVersion) + u.subscriptionPermissionVersion.Update(timedVersion) } else { // for requests coming from the current node, use local versions - u.subscriptionPermissionVersion.Update(u.params.VersionGenerator.New()) + u.subscriptionPermissionVersion.Update(u.params.VersionGenerator.Next()) } // store as is for use when migrating @@ -188,7 +188,7 @@ func (u *UpTrackManager) UpdateSubscriptionPermission( if subscriptionPermission == nil { u.params.Logger.Debugw( "updating subscription permission, setting to nil", - "version", &u.subscriptionPermissionVersion, + "version", u.subscriptionPermissionVersion, ) // possible to get a nil when migrating u.lock.Unlock() @@ -198,7 +198,7 @@ func (u *UpTrackManager) UpdateSubscriptionPermission( u.params.Logger.Debugw( "updating subscription permission", "permissions", logger.Proto(u.subscriptionPermission), - "version", &u.subscriptionPermissionVersion, + "version", u.subscriptionPermissionVersion, ) if err := u.parseSubscriptionPermissionsLocked(subscriptionPermission, func(pID livekit.ParticipantID) types.LocalParticipant { u.lock.Unlock() diff --git a/pkg/rtc/uptrackmanager_test.go b/pkg/rtc/uptrackmanager_test.go index 5556bc023..2bee32e38 100644 --- a/pkg/rtc/uptrackmanager_test.go +++ b/pkg/rtc/uptrackmanager_test.go @@ -211,8 +211,8 @@ func TestUpdateSubscriptionPermission(t *testing.T) { um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v0, nil) require.Equal(t, v2.Load(), um.subscriptionPermissionVersion.Load(), "out of order updates should be ignored") - um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, utils.TimedVersion{}, nil) - require.True(t, um.subscriptionPermissionVersion.After(&v2), "zero version in updates should use next local version") + um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, utils.TimedVersion(0), nil) + require.True(t, um.subscriptionPermissionVersion.After(v2), "zero version in updates should use next local version") }) }