From d38fc43b895d4b7c714748c65ebc06a9e157d96b Mon Sep 17 00:00:00 2001 From: David Zhao Date: Mon, 12 Jul 2021 14:50:14 -0700 Subject: [PATCH] Lock room creation to ensure atomic creation & node assignment (#46) * Lock room creation to ensure atomic creation & node assignment * more tests --- go.sum | 1 + pkg/service/errors.go | 2 + pkg/service/localroomstore.go | 12 ++ pkg/service/redisroomstore.go | 46 +++++- pkg/service/redisroomstore_test.go | 58 ++++++- pkg/service/roommanager.go | 8 + pkg/service/roomstore.go | 7 + pkg/service/servicefakes/fake_room_store.go | 158 ++++++++++++++++++++ tools/tools.go | 2 +- 9 files changed, 287 insertions(+), 7 deletions(-) diff --git a/go.sum b/go.sum index 96b04150b..5f73b6f32 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,7 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/pkg/service/errors.go b/pkg/service/errors.go index bfd5de8cf..1b157cae4 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -4,6 +4,8 @@ import "errors" var ( ErrRoomNotFound = errors.New("requested room does not exist") + ErrRoomLockFailed = errors.New("could not lock room") + ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match") ErrParticipantNotFound = errors.New("participant does not exist") ErrTrackNotFound = errors.New("track is not found") ) diff --git a/pkg/service/localroomstore.go b/pkg/service/localroomstore.go index 12227e88a..cb9f8ba1f 100644 --- a/pkg/service/localroomstore.go +++ b/pkg/service/localroomstore.go @@ -16,6 +16,7 @@ type LocalRoomStore struct { // map of roomName => { identity: participant } participants map[string]map[string]*livekit.ParticipantInfo lock sync.RWMutex + globalLock sync.Mutex } func NewLocalRoomStore() *LocalRoomStore { @@ -80,6 +81,17 @@ func (p *LocalRoomStore) DeleteRoom(idOrName string) error { return nil } +func (p *LocalRoomStore) LockRoom(name string, duration time.Duration) (string, error) { + // local rooms lock & unlock globally + p.globalLock.Lock() + return "", nil +} + +func (p *LocalRoomStore) UnlockRoom(name string, uid string) error { + p.globalLock.Unlock() + return nil +} + func (p *LocalRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error { p.lock.Lock() defer p.lock.Unlock() diff --git a/pkg/service/redisroomstore.go b/pkg/service/redisroomstore.go index 32b80d450..3951d8dff 100644 --- a/pkg/service/redisroomstore.go +++ b/pkg/service/redisroomstore.go @@ -5,6 +5,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/livekit/protocol/utils" "github.com/pkg/errors" "google.golang.org/protobuf/proto" @@ -22,7 +23,8 @@ const ( // a key for each room, with expiration RoomParticipantsPrefix = "room_participants:" - participantMappingTTL = 24 * time.Hour + // RoomLockPrefix is a simple key containing a provided lock uid + RoomLockPrefix = "room_lock:" ) type RedisRoomStore struct { @@ -124,6 +126,48 @@ func (p *RedisRoomStore) DeleteRoom(idOrName string) error { return err } +func (p *RedisRoomStore) LockRoom(name string, duration time.Duration) (string, error) { + token := utils.NewGuid("LOCK") + key := RoomLockPrefix + name + + startTime := time.Now() + for { + locked, err := p.rc.SetNX(p.ctx, key, token, duration).Result() + if err != nil { + return "", err + } + if locked { + return token, nil + } + + // stop waiting past lock duration + if time.Now().Sub(startTime) > duration { + break + } + + time.Sleep(100 * time.Millisecond) + } + + return "", ErrRoomLockFailed +} + +func (p *RedisRoomStore) UnlockRoom(name string, uid string) error { + key := RoomLockPrefix + name + + val, err := p.rc.Get(p.ctx, key).Result() + if err == redis.Nil { + // already unlocked + return nil + } else if err != nil { + return err + } + + if val != uid { + return ErrRoomUnlockFailed + } + return p.rc.Del(p.ctx, key).Err() +} + func (p *RedisRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error { key := RoomParticipantsPrefix + roomName diff --git a/pkg/service/redisroomstore_test.go b/pkg/service/redisroomstore_test.go index 77f4f135d..a76321de4 100644 --- a/pkg/service/redisroomstore_test.go +++ b/pkg/service/redisroomstore_test.go @@ -1,18 +1,17 @@ package service_test import ( + "sync" + "sync/atomic" "testing" + "time" "github.com/livekit/livekit-server/pkg/service" livekit "github.com/livekit/livekit-server/proto" "github.com/stretchr/testify/require" ) -func TestParticipantPersisence(t *testing.T) { - if testing.Short() { - t.SkipNow() - return - } +func TestParticipantPersistence(t *testing.T) { rs := service.NewRedisRoomStore(redisClient()) roomName := "room1" @@ -57,3 +56,52 @@ func TestParticipantPersisence(t *testing.T) { _, err = rs.GetParticipant(roomName, p.Identity) require.Equal(t, err, service.ErrParticipantNotFound) } + +func TestRoomLock(t *testing.T) { + rs := service.NewRedisRoomStore(redisClient()) + lockInterval := 5 * time.Millisecond + roomName := "myroom" + + t.Run("normal locking", func(t *testing.T) { + token, err := rs.LockRoom(roomName, lockInterval) + require.NoError(t, err) + require.NotEmpty(t, token) + require.NoError(t, rs.UnlockRoom(roomName, token)) + }) + + t.Run("waits before acquiring lock", func(t *testing.T) { + token, err := rs.LockRoom(roomName, lockInterval) + require.NoError(t, err) + require.NotEmpty(t, token) + unlocked := uint32(0) + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + // attempt to lock again + defer wg.Done() + token2, err := rs.LockRoom(roomName, lockInterval) + require.NoError(t, err) + defer rs.UnlockRoom(roomName, token2) + require.Equal(t, uint32(1), atomic.LoadUint32(&unlocked)) + }() + + // release after 2 ms + time.Sleep(2 * time.Millisecond) + atomic.StoreUint32(&unlocked, 1) + rs.UnlockRoom(roomName, token) + + wg.Wait() + }) + + t.Run("lock expires", func(t *testing.T) { + token, err := rs.LockRoom(roomName, lockInterval) + require.NoError(t, err) + defer rs.UnlockRoom(roomName, token) + + time.Sleep(lockInterval + time.Millisecond) + token2, err := rs.LockRoom(roomName, lockInterval) + require.NoError(t, err) + rs.UnlockRoom(roomName, token2) + }) +} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 78bd5ddcc..4e5e84ff2 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -54,6 +54,14 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor fits state, and cleans it up when appropriate func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) { + token, err := r.roomStore.LockRoom(req.Name, 5*time.Second) + if err != nil { + return nil, err + } + defer func() { + _ = r.roomStore.UnlockRoom(req.Name, token) + }() + // find existing room and update it rm, err := r.roomStore.GetRoom(req.Name) if err == ErrRoomNotFound { diff --git a/pkg/service/roomstore.go b/pkg/service/roomstore.go index c0c75ed80..41dc3f146 100644 --- a/pkg/service/roomstore.go +++ b/pkg/service/roomstore.go @@ -1,6 +1,8 @@ package service import ( + "time" + livekit "github.com/livekit/livekit-server/proto" ) @@ -15,6 +17,11 @@ type RoomStore interface { ListRooms() ([]*livekit.Room, error) DeleteRoom(idOrName string) error + // enable locking on a specific room to prevent race + // returns a (lock uuid, error) + LockRoom(name string, duration time.Duration) (string, error) + UnlockRoom(name string, uid string) error + PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) diff --git a/pkg/service/servicefakes/fake_room_store.go b/pkg/service/servicefakes/fake_room_store.go index a71e71510..0ad87d894 100644 --- a/pkg/service/servicefakes/fake_room_store.go +++ b/pkg/service/servicefakes/fake_room_store.go @@ -3,6 +3,7 @@ package servicefakes import ( "sync" + "time" "github.com/livekit/livekit-server/pkg/service" livekit "github.com/livekit/livekit-server/proto" @@ -95,6 +96,20 @@ type FakeRoomStore struct { result1 []*livekit.Room result2 error } + LockRoomStub func(string, time.Duration) (string, error) + lockRoomMutex sync.RWMutex + lockRoomArgsForCall []struct { + arg1 string + arg2 time.Duration + } + lockRoomReturns struct { + result1 string + result2 error + } + lockRoomReturnsOnCall map[int]struct { + result1 string + result2 error + } PersistParticipantStub func(string, *livekit.ParticipantInfo) error persistParticipantMutex sync.RWMutex persistParticipantArgsForCall []struct { @@ -107,6 +122,18 @@ type FakeRoomStore struct { persistParticipantReturnsOnCall map[int]struct { result1 error } + UnlockRoomStub func(string, string) error + unlockRoomMutex sync.RWMutex + unlockRoomArgsForCall []struct { + arg1 string + arg2 string + } + unlockRoomReturns struct { + result1 error + } + unlockRoomReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -544,6 +571,71 @@ func (fake *FakeRoomStore) ListRoomsReturnsOnCall(i int, result1 []*livekit.Room }{result1, result2} } +func (fake *FakeRoomStore) LockRoom(arg1 string, arg2 time.Duration) (string, error) { + fake.lockRoomMutex.Lock() + ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)] + fake.lockRoomArgsForCall = append(fake.lockRoomArgsForCall, struct { + arg1 string + arg2 time.Duration + }{arg1, arg2}) + stub := fake.LockRoomStub + fakeReturns := fake.lockRoomReturns + fake.recordInvocation("LockRoom", []interface{}{arg1, arg2}) + fake.lockRoomMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRoomStore) LockRoomCallCount() int { + fake.lockRoomMutex.RLock() + defer fake.lockRoomMutex.RUnlock() + return len(fake.lockRoomArgsForCall) +} + +func (fake *FakeRoomStore) LockRoomCalls(stub func(string, time.Duration) (string, error)) { + fake.lockRoomMutex.Lock() + defer fake.lockRoomMutex.Unlock() + fake.LockRoomStub = stub +} + +func (fake *FakeRoomStore) LockRoomArgsForCall(i int) (string, time.Duration) { + fake.lockRoomMutex.RLock() + defer fake.lockRoomMutex.RUnlock() + argsForCall := fake.lockRoomArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeRoomStore) LockRoomReturns(result1 string, result2 error) { + fake.lockRoomMutex.Lock() + defer fake.lockRoomMutex.Unlock() + fake.LockRoomStub = nil + fake.lockRoomReturns = struct { + result1 string + result2 error + }{result1, result2} +} + +func (fake *FakeRoomStore) LockRoomReturnsOnCall(i int, result1 string, result2 error) { + fake.lockRoomMutex.Lock() + defer fake.lockRoomMutex.Unlock() + fake.LockRoomStub = nil + if fake.lockRoomReturnsOnCall == nil { + fake.lockRoomReturnsOnCall = make(map[int]struct { + result1 string + result2 error + }) + } + fake.lockRoomReturnsOnCall[i] = struct { + result1 string + result2 error + }{result1, result2} +} + func (fake *FakeRoomStore) PersistParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error { fake.persistParticipantMutex.Lock() ret, specificReturn := fake.persistParticipantReturnsOnCall[len(fake.persistParticipantArgsForCall)] @@ -606,6 +698,68 @@ func (fake *FakeRoomStore) PersistParticipantReturnsOnCall(i int, result1 error) }{result1} } +func (fake *FakeRoomStore) UnlockRoom(arg1 string, arg2 string) error { + fake.unlockRoomMutex.Lock() + ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)] + fake.unlockRoomArgsForCall = append(fake.unlockRoomArgsForCall, struct { + arg1 string + arg2 string + }{arg1, arg2}) + stub := fake.UnlockRoomStub + fakeReturns := fake.unlockRoomReturns + fake.recordInvocation("UnlockRoom", []interface{}{arg1, arg2}) + fake.unlockRoomMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRoomStore) UnlockRoomCallCount() int { + fake.unlockRoomMutex.RLock() + defer fake.unlockRoomMutex.RUnlock() + return len(fake.unlockRoomArgsForCall) +} + +func (fake *FakeRoomStore) UnlockRoomCalls(stub func(string, string) error) { + fake.unlockRoomMutex.Lock() + defer fake.unlockRoomMutex.Unlock() + fake.UnlockRoomStub = stub +} + +func (fake *FakeRoomStore) UnlockRoomArgsForCall(i int) (string, string) { + fake.unlockRoomMutex.RLock() + defer fake.unlockRoomMutex.RUnlock() + argsForCall := fake.unlockRoomArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeRoomStore) UnlockRoomReturns(result1 error) { + fake.unlockRoomMutex.Lock() + defer fake.unlockRoomMutex.Unlock() + fake.UnlockRoomStub = nil + fake.unlockRoomReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRoomStore) UnlockRoomReturnsOnCall(i int, result1 error) { + fake.unlockRoomMutex.Lock() + defer fake.unlockRoomMutex.Unlock() + fake.UnlockRoomStub = nil + if fake.unlockRoomReturnsOnCall == nil { + fake.unlockRoomReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.unlockRoomReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeRoomStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -623,8 +777,12 @@ func (fake *FakeRoomStore) Invocations() map[string][][]interface{} { defer fake.listParticipantsMutex.RUnlock() fake.listRoomsMutex.RLock() defer fake.listRoomsMutex.RUnlock() + fake.lockRoomMutex.RLock() + defer fake.lockRoomMutex.RUnlock() fake.persistParticipantMutex.RLock() defer fake.persistParticipantMutex.RUnlock() + fake.unlockRoomMutex.RLock() + defer fake.unlockRoomMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/tools/tools.go b/tools/tools.go index ea857b376..2fcffa217 100644 --- a/tools/tools.go +++ b/tools/tools.go @@ -3,7 +3,7 @@ package tools import ( - _ "github.com/google/wire" + _ "github.com/google/wire/cmd/wire" _ "github.com/maxbrunsfeld/counterfeiter/v6" _ "github.com/twitchtv/twirp/protoc-gen-twirp" _ "google.golang.org/protobuf/cmd/protoc-gen-go"