diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 71d32ddd8..a0ee6a131 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -107,7 +107,7 @@ type ParticipantImpl struct { lock sync.RWMutex once sync.Once updateLock sync.Mutex - version uint32 + version atomic.Uint32 dataTrack *DataTrack @@ -137,8 +137,8 @@ func NewParticipant(params ParticipantParams, perms *livekit.ParticipantPermissi disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), connectedAt: time.Now(), rttUpdatedAt: time.Now(), - version: params.InitialVersion, } + p.version.Store(params.InitialVersion) p.migrateState.Store(types.MigrateStateInit) p.state.Store(livekit.ParticipantInfo_JOINING) p.SetPermission(perms) @@ -307,7 +307,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { JoinedAt: p.ConnectedAt().Unix(), Hidden: p.Hidden(), Recorder: p.IsRecorder(), - Version: atomic.NewUint32(p.version).Inc(), + Version: p.version.Inc(), } info.Tracks = p.UpTrackManager.ToProto() if p.params.Grants != nil { diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 7c19a0470..c92f83833 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" @@ -167,27 +168,22 @@ func TestTrackPublishing(t *testing.T) { func TestOutOfOrderUpdates(t *testing.T) { p := newParticipantForTest("test") + p.SetMetadata("initial metadata") sink := p.GetResponseSink().(*routingfakes.FakeMessageSink) - pi := &livekit.ParticipantInfo{ - Sid: "PA_test2", - Identity: "test2", - Metadata: "123", - Version: 2, - } - require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})) + pi1 := p.ToProto() + p.SetMetadata("second update") + pi2 := p.ToProto() - pi = &livekit.ParticipantInfo{ - Sid: "PA_test2", - Identity: "test2", - Metadata: "456", - Version: 1, - } - require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})) + require.Greater(t, pi2.Version, pi1.Version) + + // send the second update first + require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi2})) + require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi1})) // only sent once, and it's the earlier message require.Equal(t, 1, sink.WriteMessageCallCount()) sent := sink.WriteMessageArgsForCall(0).(*livekit.SignalResponse) - require.Equal(t, "123", sent.GetUpdate().Participants[0].Metadata) + require.Equal(t, "second update", sent.GetUpdate().Participants[0].Metadata) } // after disconnection, things should continue to function and not panic @@ -373,6 +369,9 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p Sink: &routingfakes.FakeMessageSink{}, ProtocolVersion: opts.protocolVersion, PLIThrottleConfig: conf.RTC.PLIThrottle, + Grants: &auth.ClaimGrants{ + Video: &auth.VideoGrant{}, + }, }, opts.permissions) return p } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index e140ea8c4..0b85c353e 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -9,8 +9,6 @@ package service import ( "context" "fmt" - "os" - "github.com/go-redis/redis/v8" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -21,6 +19,7 @@ import ( "github.com/livekit/protocol/webhook" "github.com/pkg/errors" "gopkg.in/yaml.v3" + "os" ) // Injectors from wire.go: