Use ParticipantInfo version to ensure consistency (#399)

* Use ParticipantInfo version to ensure consistency

Deprecating time.Time and avoid locking unnecessarily

* properly adjust ulimit. update protocol

* Save initial version from params

* get rid of metadata field, use grants copy

* fix test
This commit is contained in:
David Zhao
2022-02-03 17:10:52 -08:00
committed by GitHub
parent 7bbd238188
commit 6f6d55345b
9 changed files with 59 additions and 53 deletions

2
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.11.12-0.20220129153628-d79c2afbfc88
github.com/livekit/protocol v0.11.12
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

4
go.sum
View File

@@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.12-0.20220129153628-d79c2afbfc88 h1:eWawullswNKmyhN5kIDp2Ba5iv2UDIluEQ5/OKilg08=
github.com/livekit/protocol v0.11.12-0.20220129153628-d79c2afbfc88/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/livekit/protocol v0.11.12 h1:34ehSXSDUvHAhIw/LKz6TZaSesXnTMBstbdfiIoeah0=
github.com/livekit/protocol v0.11.12/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=

View File

@@ -16,6 +16,7 @@ import (
"runtime"
"sort"
"strings"
"syscall"
"github.com/magefile/mage/mg"
@@ -125,7 +126,6 @@ func Test() error {
// run all tests including integration
func TestAll() error {
exec.Command("ulimit", "-n", "65535").Run()
mg.Deps(generateWire, macULimit)
// "-v", "-race",
cmd := exec.Command("go", "test", "./...", "-count=1", "-timeout=4m", "-v")
@@ -237,14 +237,19 @@ func connectStd(cmd *exec.Cmd) {
cmd.Stderr = os.Stderr
}
func macULimit() {
// raise ulimit if on mac
func macULimit() error {
// raise ulimit if on Mac
if runtime.GOOS != "darwin" {
return
return nil
}
cmd := exec.Command("ulimit", "-n", "10000")
connectStd(cmd)
cmd.Run()
var rLimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
return err
}
rLimit.Max = 10000
rLimit.Cur = 10000
return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
}
// A helper checksum library that generates a fast, non-portable checksum over a directory of files

View File

@@ -57,6 +57,7 @@ type ParticipantParams struct {
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
Grants *auth.ClaimGrants
InitialVersion uint32
}
type ParticipantImpl struct {
@@ -105,6 +106,7 @@ type ParticipantImpl struct {
lock sync.RWMutex
once sync.Once
updateLock sync.Mutex
version uint32
// callbacks & handlers
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
@@ -131,6 +133,7 @@ func NewParticipant(params ParticipantParams, perms *livekit.ParticipantPermissi
subscribedTracksSettings: make(map[livekit.TrackID]*livekit.UpdateTrackSettings),
disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID),
connectedAt: time.Now(),
version: params.InitialVersion,
}
p.migrateState.Store(types.MigrateStateInit)
p.state.Store(livekit.ParticipantInfo_JOINING)
@@ -248,8 +251,7 @@ func (p *ParticipantImpl) ConnectedAt() time.Time {
// SetMetadata attaches metadata to the participant
func (p *ParticipantImpl) SetMetadata(metadata string) {
p.lock.Lock()
changed := p.metadata != metadata
p.metadata = metadata
changed := p.params.Grants.Metadata != metadata
p.params.Grants.Metadata = metadata
p.lock.Unlock()
@@ -293,13 +295,16 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
Sid: string(p.params.SID),
Identity: string(p.params.Identity),
Name: string(p.params.Name),
Metadata: p.metadata,
State: p.State(),
JoinedAt: p.ConnectedAt().Unix(),
Hidden: p.Hidden(),
Recorder: p.IsRecorder(),
Version: atomic.AddUint32(&p.version, 1),
}
info.Tracks = p.UpTrackManager.ToProto()
if p.params.Grants != nil {
info.Metadata = p.params.Grants.Metadata
}
return info
}
@@ -594,26 +599,36 @@ func (p *ParticipantImpl) SendJoinResponse(
})
}
func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo, updatedAt time.Time) error {
if len(participantsToUpdate) == 1 {
p.updateLock.Lock()
defer p.updateLock.Unlock()
pi := participantsToUpdate[0]
func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error {
p.updateLock.Lock()
validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate))
for _, pi := range participantsToUpdate {
isValid := true
if val, ok := p.updateCache.Get(pi.Sid); ok {
if lastUpdatedAt, ok := val.(time.Time); ok {
if lastVersion, ok := val.(uint32); ok {
// this is a message delivered out of order, a more recent version of the message had already been
// sent.
if lastUpdatedAt.After(updatedAt) {
return nil
if pi.Version < lastVersion {
p.params.Logger.Debugw("skipping outdated participant update", "version", pi.Version, "lastVersion", lastVersion)
isValid = false
}
}
}
p.updateCache.Add(pi.Sid, updatedAt)
if isValid {
p.updateCache.Add(pi.Sid, pi.Version)
validUpdates = append(validUpdates, pi)
}
}
p.updateLock.Unlock()
if len(validUpdates) == 0 {
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Update{
Update: &livekit.ParticipantUpdate{
Participants: participantsToUpdate,
Participants: validUpdates,
},
},
})

View File

@@ -172,18 +172,17 @@ func TestOutOfOrderUpdates(t *testing.T) {
Sid: "PA_test2",
Identity: "test2",
Metadata: "123",
Version: 2,
}
earlierTs := time.Now()
time.Sleep(time.Millisecond)
laterTs := time.Now()
require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}, laterTs))
require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}))
pi = &livekit.ParticipantInfo{
Sid: "PA_test2",
Identity: "test2",
Metadata: "456",
Version: 1,
}
require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}, earlierTs))
require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}))
// only sent once, and it's the earlier message
require.Equal(t, 1, sink.WriteMessageCallCount())

View File

@@ -281,7 +281,7 @@ func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.
p.SetResponseSink(responseSink)
updates := ToProtoParticipants(r.GetParticipants())
if err := p.SendParticipantUpdate(updates, time.Now()); err != nil {
if err := p.SendParticipantUpdate(updates); err != nil {
return err
}
@@ -699,19 +699,12 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) int {
// broadcast an update about participant p
func (r *Room) broadcastParticipantState(p types.LocalParticipant, skipSource bool) {
//
// This is a critical section to ensure that participant update time and
// the corresponding data are paired properly.
//
r.lock.Lock()
updatedAt := time.Now()
updates := ToProtoParticipants([]types.LocalParticipant{p})
r.lock.Unlock()
if p.Hidden() {
if !skipSource {
// send update only to hidden participant
err := p.SendParticipantUpdate(updates, updatedAt)
err := p.SendParticipantUpdate(updates)
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
@@ -727,7 +720,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, skipSource bo
continue
}
err := op.SendParticipantUpdate(updates, updatedAt)
err := op.SendParticipantUpdate(updates)
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())

View File

@@ -126,7 +126,7 @@ type LocalParticipant interface {
// server sent messages
SendJoinResponse(info *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo, updatedAt time.Time) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo) error
SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error
SendDataPacket(packet *livekit.DataPacket) error
SendRoomUpdate(room *livekit.Room) error

View File

@@ -379,11 +379,10 @@ type FakeLocalParticipant struct {
sendJoinResponseReturnsOnCall map[int]struct {
result1 error
}
SendParticipantUpdateStub func([]*livekit.ParticipantInfo, time.Time) error
SendParticipantUpdateStub func([]*livekit.ParticipantInfo) error
sendParticipantUpdateMutex sync.RWMutex
sendParticipantUpdateArgsForCall []struct {
arg1 []*livekit.ParticipantInfo
arg2 time.Time
}
sendParticipantUpdateReturns struct {
result1 error
@@ -2582,7 +2581,7 @@ func (fake *FakeLocalParticipant) SendJoinResponseReturnsOnCall(i int, result1 e
}{result1}
}
func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.ParticipantInfo, arg2 time.Time) error {
func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.ParticipantInfo) error {
var arg1Copy []*livekit.ParticipantInfo
if arg1 != nil {
arg1Copy = make([]*livekit.ParticipantInfo, len(arg1))
@@ -2592,14 +2591,13 @@ func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.Particip
ret, specificReturn := fake.sendParticipantUpdateReturnsOnCall[len(fake.sendParticipantUpdateArgsForCall)]
fake.sendParticipantUpdateArgsForCall = append(fake.sendParticipantUpdateArgsForCall, struct {
arg1 []*livekit.ParticipantInfo
arg2 time.Time
}{arg1Copy, arg2})
}{arg1Copy})
stub := fake.SendParticipantUpdateStub
fakeReturns := fake.sendParticipantUpdateReturns
fake.recordInvocation("SendParticipantUpdate", []interface{}{arg1Copy, arg2})
fake.recordInvocation("SendParticipantUpdate", []interface{}{arg1Copy})
fake.sendParticipantUpdateMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1)
}
if specificReturn {
return ret.result1
@@ -2613,17 +2611,17 @@ func (fake *FakeLocalParticipant) SendParticipantUpdateCallCount() int {
return len(fake.sendParticipantUpdateArgsForCall)
}
func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]*livekit.ParticipantInfo, time.Time) error) {
func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]*livekit.ParticipantInfo) error) {
fake.sendParticipantUpdateMutex.Lock()
defer fake.sendParticipantUpdateMutex.Unlock()
fake.SendParticipantUpdateStub = stub
}
func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) ([]*livekit.ParticipantInfo, time.Time) {
func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) []*livekit.ParticipantInfo {
fake.sendParticipantUpdateMutex.RLock()
defer fake.sendParticipantUpdateMutex.RUnlock()
argsForCall := fake.sendParticipantUpdateArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SendParticipantUpdateReturns(result1 error) {

View File

@@ -254,10 +254,6 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam
return
}
if pi.Metadata != "" {
participant.SetMetadata(pi.Metadata)
}
// join room
opts := rtc.ParticipantOptions{
AutoSubscribe: pi.AutoSubscribe,