diff --git a/go.mod b/go.mod index f408b7cac..524e8a0f8 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.12-0.20220129153628-d79c2afbfc88 + github.com/livekit/protocol v0.11.12 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index d7d4fd23e..6ea9a1ed1 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.12-0.20220129153628-d79c2afbfc88 h1:eWawullswNKmyhN5kIDp2Ba5iv2UDIluEQ5/OKilg08= -github.com/livekit/protocol v0.11.12-0.20220129153628-d79c2afbfc88/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.11.12 h1:34ehSXSDUvHAhIw/LKz6TZaSesXnTMBstbdfiIoeah0= +github.com/livekit/protocol v0.11.12/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/magefile.go b/magefile.go index 0372f0ade..2f91b4505 100644 --- a/magefile.go +++ b/magefile.go @@ -16,6 +16,7 @@ import ( "runtime" "sort" "strings" + "syscall" "github.com/magefile/mage/mg" @@ -125,7 +126,6 @@ func Test() error { // run all tests including integration func TestAll() error { - exec.Command("ulimit", "-n", "65535").Run() mg.Deps(generateWire, macULimit) // "-v", "-race", cmd := exec.Command("go", "test", "./...", "-count=1", "-timeout=4m", "-v") @@ -237,14 +237,19 @@ func connectStd(cmd *exec.Cmd) { cmd.Stderr = os.Stderr } -func macULimit() { - // raise ulimit if on mac +func macULimit() error { + // raise ulimit if on Mac if runtime.GOOS != "darwin" { - return + return nil } - cmd := exec.Command("ulimit", "-n", "10000") - connectStd(cmd) - cmd.Run() + var rLimit syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) + if err != nil { + return err + } + rLimit.Max = 10000 + rLimit.Cur = 10000 + return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) } // A helper checksum library that generates a fast, non-portable checksum over a directory of files diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index c701127f6..541c8798d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -57,6 +57,7 @@ type ParticipantParams struct { Logger logger.Logger SimTracks map[uint32]SimulcastTrackInfo Grants *auth.ClaimGrants + InitialVersion uint32 } type ParticipantImpl struct { @@ -105,6 +106,7 @@ type ParticipantImpl struct { lock sync.RWMutex once sync.Once updateLock sync.Mutex + version uint32 // callbacks & handlers onTrackPublished func(types.LocalParticipant, types.MediaTrack) @@ -131,6 +133,7 @@ func NewParticipant(params ParticipantParams, perms *livekit.ParticipantPermissi subscribedTracksSettings: make(map[livekit.TrackID]*livekit.UpdateTrackSettings), disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID), connectedAt: time.Now(), + version: params.InitialVersion, } p.migrateState.Store(types.MigrateStateInit) p.state.Store(livekit.ParticipantInfo_JOINING) @@ -248,8 +251,7 @@ func (p *ParticipantImpl) ConnectedAt() time.Time { // SetMetadata attaches metadata to the participant func (p *ParticipantImpl) SetMetadata(metadata string) { p.lock.Lock() - changed := p.metadata != metadata - p.metadata = metadata + changed := p.params.Grants.Metadata != metadata p.params.Grants.Metadata = metadata p.lock.Unlock() @@ -293,13 +295,16 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { Sid: string(p.params.SID), Identity: string(p.params.Identity), Name: string(p.params.Name), - Metadata: p.metadata, State: p.State(), JoinedAt: p.ConnectedAt().Unix(), Hidden: p.Hidden(), Recorder: p.IsRecorder(), + Version: atomic.AddUint32(&p.version, 1), } info.Tracks = p.UpTrackManager.ToProto() + if p.params.Grants != nil { + info.Metadata = p.params.Grants.Metadata + } return info } @@ -594,26 +599,36 @@ func (p *ParticipantImpl) SendJoinResponse( }) } -func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo, updatedAt time.Time) error { - if len(participantsToUpdate) == 1 { - p.updateLock.Lock() - defer p.updateLock.Unlock() - pi := participantsToUpdate[0] +func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error { + p.updateLock.Lock() + validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate)) + for _, pi := range participantsToUpdate { + isValid := true if val, ok := p.updateCache.Get(pi.Sid); ok { - if lastUpdatedAt, ok := val.(time.Time); ok { + if lastVersion, ok := val.(uint32); ok { // this is a message delivered out of order, a more recent version of the message had already been // sent. - if lastUpdatedAt.After(updatedAt) { - return nil + if pi.Version < lastVersion { + p.params.Logger.Debugw("skipping outdated participant update", "version", pi.Version, "lastVersion", lastVersion) + isValid = false } } } - p.updateCache.Add(pi.Sid, updatedAt) + if isValid { + p.updateCache.Add(pi.Sid, pi.Version) + validUpdates = append(validUpdates, pi) + } } + p.updateLock.Unlock() + + if len(validUpdates) == 0 { + return nil + } + return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Update{ Update: &livekit.ParticipantUpdate{ - Participants: participantsToUpdate, + Participants: validUpdates, }, }, }) diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 58d260a1b..ded70e38a 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -172,18 +172,17 @@ func TestOutOfOrderUpdates(t *testing.T) { Sid: "PA_test2", Identity: "test2", Metadata: "123", + Version: 2, } - earlierTs := time.Now() - time.Sleep(time.Millisecond) - laterTs := time.Now() - require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}, laterTs)) + require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})) pi = &livekit.ParticipantInfo{ Sid: "PA_test2", Identity: "test2", Metadata: "456", + Version: 1, } - require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}, earlierTs)) + require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})) // only sent once, and it's the earlier message require.Equal(t, 1, sink.WriteMessageCallCount()) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 243560fa8..6187de09e 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -281,7 +281,7 @@ func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing. p.SetResponseSink(responseSink) updates := ToProtoParticipants(r.GetParticipants()) - if err := p.SendParticipantUpdate(updates, time.Now()); err != nil { + if err := p.SendParticipantUpdate(updates); err != nil { return err } @@ -699,19 +699,12 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) int { // broadcast an update about participant p func (r *Room) broadcastParticipantState(p types.LocalParticipant, skipSource bool) { - // - // This is a critical section to ensure that participant update time and - // the corresponding data are paired properly. - // - r.lock.Lock() - updatedAt := time.Now() updates := ToProtoParticipants([]types.LocalParticipant{p}) - r.lock.Unlock() if p.Hidden() { if !skipSource { // send update only to hidden participant - err := p.SendParticipantUpdate(updates, updatedAt) + err := p.SendParticipantUpdate(updates) if err != nil { r.Logger.Errorw("could not send update to participant", err, "participant", p.Identity(), "pID", p.ID()) @@ -727,7 +720,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, skipSource bo continue } - err := op.SendParticipantUpdate(updates, updatedAt) + err := op.SendParticipantUpdate(updates) if err != nil { r.Logger.Errorw("could not send update to participant", err, "participant", p.Identity(), "pID", p.ID()) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 39667493f..9482b61fe 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -126,7 +126,7 @@ type LocalParticipant interface { // server sent messages SendJoinResponse(info *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer) error - SendParticipantUpdate(participants []*livekit.ParticipantInfo, updatedAt time.Time) error + SendParticipantUpdate(participants []*livekit.ParticipantInfo) error SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error SendRoomUpdate(room *livekit.Room) error diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 4b629c104..09d83a8f7 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -379,11 +379,10 @@ type FakeLocalParticipant struct { sendJoinResponseReturnsOnCall map[int]struct { result1 error } - SendParticipantUpdateStub func([]*livekit.ParticipantInfo, time.Time) error + SendParticipantUpdateStub func([]*livekit.ParticipantInfo) error sendParticipantUpdateMutex sync.RWMutex sendParticipantUpdateArgsForCall []struct { arg1 []*livekit.ParticipantInfo - arg2 time.Time } sendParticipantUpdateReturns struct { result1 error @@ -2582,7 +2581,7 @@ func (fake *FakeLocalParticipant) SendJoinResponseReturnsOnCall(i int, result1 e }{result1} } -func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.ParticipantInfo, arg2 time.Time) error { +func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.ParticipantInfo) error { var arg1Copy []*livekit.ParticipantInfo if arg1 != nil { arg1Copy = make([]*livekit.ParticipantInfo, len(arg1)) @@ -2592,14 +2591,13 @@ func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.Particip ret, specificReturn := fake.sendParticipantUpdateReturnsOnCall[len(fake.sendParticipantUpdateArgsForCall)] fake.sendParticipantUpdateArgsForCall = append(fake.sendParticipantUpdateArgsForCall, struct { arg1 []*livekit.ParticipantInfo - arg2 time.Time - }{arg1Copy, arg2}) + }{arg1Copy}) stub := fake.SendParticipantUpdateStub fakeReturns := fake.sendParticipantUpdateReturns - fake.recordInvocation("SendParticipantUpdate", []interface{}{arg1Copy, arg2}) + fake.recordInvocation("SendParticipantUpdate", []interface{}{arg1Copy}) fake.sendParticipantUpdateMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1) } if specificReturn { return ret.result1 @@ -2613,17 +2611,17 @@ func (fake *FakeLocalParticipant) SendParticipantUpdateCallCount() int { return len(fake.sendParticipantUpdateArgsForCall) } -func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]*livekit.ParticipantInfo, time.Time) error) { +func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]*livekit.ParticipantInfo) error) { fake.sendParticipantUpdateMutex.Lock() defer fake.sendParticipantUpdateMutex.Unlock() fake.SendParticipantUpdateStub = stub } -func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) ([]*livekit.ParticipantInfo, time.Time) { +func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) []*livekit.ParticipantInfo { fake.sendParticipantUpdateMutex.RLock() defer fake.sendParticipantUpdateMutex.RUnlock() argsForCall := fake.sendParticipantUpdateArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1 } func (fake *FakeLocalParticipant) SendParticipantUpdateReturns(result1 error) { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5977855df..92c892b40 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -254,10 +254,6 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam return } - if pi.Metadata != "" { - participant.SetMetadata(pi.Metadata) - } - // join room opts := rtc.ParticipantOptions{ AutoSubscribe: pi.AutoSubscribe,