Lock room creation to ensure atomic creation & node assignment (#46)

* Lock room creation to ensure atomic creation & node assignment

* more tests
This commit is contained in:
David Zhao
2021-07-12 14:50:14 -07:00
committed by GitHub
parent c0bff83a36
commit d38fc43b89
9 changed files with 287 additions and 7 deletions
+1
View File
@@ -154,6 +154,7 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k=
github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+2
View File
@@ -4,6 +4,8 @@ import "errors"
var (
ErrRoomNotFound = errors.New("requested room does not exist")
ErrRoomLockFailed = errors.New("could not lock room")
ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match")
ErrParticipantNotFound = errors.New("participant does not exist")
ErrTrackNotFound = errors.New("track is not found")
)
+12
View File
@@ -16,6 +16,7 @@ type LocalRoomStore struct {
// map of roomName => { identity: participant }
participants map[string]map[string]*livekit.ParticipantInfo
lock sync.RWMutex
globalLock sync.Mutex
}
func NewLocalRoomStore() *LocalRoomStore {
@@ -80,6 +81,17 @@ func (p *LocalRoomStore) DeleteRoom(idOrName string) error {
return nil
}
func (p *LocalRoomStore) LockRoom(name string, duration time.Duration) (string, error) {
// local rooms lock & unlock globally
p.globalLock.Lock()
return "", nil
}
func (p *LocalRoomStore) UnlockRoom(name string, uid string) error {
p.globalLock.Unlock()
return nil
}
func (p *LocalRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error {
p.lock.Lock()
defer p.lock.Unlock()
+45 -1
View File
@@ -5,6 +5,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
"github.com/livekit/protocol/utils"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
@@ -22,7 +23,8 @@ const (
// a key for each room, with expiration
RoomParticipantsPrefix = "room_participants:"
participantMappingTTL = 24 * time.Hour
// RoomLockPrefix is a simple key containing a provided lock uid
RoomLockPrefix = "room_lock:"
)
type RedisRoomStore struct {
@@ -124,6 +126,48 @@ func (p *RedisRoomStore) DeleteRoom(idOrName string) error {
return err
}
func (p *RedisRoomStore) LockRoom(name string, duration time.Duration) (string, error) {
token := utils.NewGuid("LOCK")
key := RoomLockPrefix + name
startTime := time.Now()
for {
locked, err := p.rc.SetNX(p.ctx, key, token, duration).Result()
if err != nil {
return "", err
}
if locked {
return token, nil
}
// stop waiting past lock duration
if time.Now().Sub(startTime) > duration {
break
}
time.Sleep(100 * time.Millisecond)
}
return "", ErrRoomLockFailed
}
func (p *RedisRoomStore) UnlockRoom(name string, uid string) error {
key := RoomLockPrefix + name
val, err := p.rc.Get(p.ctx, key).Result()
if err == redis.Nil {
// already unlocked
return nil
} else if err != nil {
return err
}
if val != uid {
return ErrRoomUnlockFailed
}
return p.rc.Del(p.ctx, key).Err()
}
func (p *RedisRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error {
key := RoomParticipantsPrefix + roomName
+53 -5
View File
@@ -1,18 +1,17 @@
package service_test
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/livekit/livekit-server/pkg/service"
livekit "github.com/livekit/livekit-server/proto"
"github.com/stretchr/testify/require"
)
func TestParticipantPersisence(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
func TestParticipantPersistence(t *testing.T) {
rs := service.NewRedisRoomStore(redisClient())
roomName := "room1"
@@ -57,3 +56,52 @@ func TestParticipantPersisence(t *testing.T) {
_, err = rs.GetParticipant(roomName, p.Identity)
require.Equal(t, err, service.ErrParticipantNotFound)
}
func TestRoomLock(t *testing.T) {
rs := service.NewRedisRoomStore(redisClient())
lockInterval := 5 * time.Millisecond
roomName := "myroom"
t.Run("normal locking", func(t *testing.T) {
token, err := rs.LockRoom(roomName, lockInterval)
require.NoError(t, err)
require.NotEmpty(t, token)
require.NoError(t, rs.UnlockRoom(roomName, token))
})
t.Run("waits before acquiring lock", func(t *testing.T) {
token, err := rs.LockRoom(roomName, lockInterval)
require.NoError(t, err)
require.NotEmpty(t, token)
unlocked := uint32(0)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// attempt to lock again
defer wg.Done()
token2, err := rs.LockRoom(roomName, lockInterval)
require.NoError(t, err)
defer rs.UnlockRoom(roomName, token2)
require.Equal(t, uint32(1), atomic.LoadUint32(&unlocked))
}()
// release after 2 ms
time.Sleep(2 * time.Millisecond)
atomic.StoreUint32(&unlocked, 1)
rs.UnlockRoom(roomName, token)
wg.Wait()
})
t.Run("lock expires", func(t *testing.T) {
token, err := rs.LockRoom(roomName, lockInterval)
require.NoError(t, err)
defer rs.UnlockRoom(roomName, token)
time.Sleep(lockInterval + time.Millisecond)
token2, err := rs.LockRoom(roomName, lockInterval)
require.NoError(t, err)
rs.UnlockRoom(roomName, token2)
})
}
+8
View File
@@ -54,6 +54,14 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor fits state, and cleans it up when appropriate
func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) {
token, err := r.roomStore.LockRoom(req.Name, 5*time.Second)
if err != nil {
return nil, err
}
defer func() {
_ = r.roomStore.UnlockRoom(req.Name, token)
}()
// find existing room and update it
rm, err := r.roomStore.GetRoom(req.Name)
if err == ErrRoomNotFound {
+7
View File
@@ -1,6 +1,8 @@
package service
import (
"time"
livekit "github.com/livekit/livekit-server/proto"
)
@@ -15,6 +17,11 @@ type RoomStore interface {
ListRooms() ([]*livekit.Room, error)
DeleteRoom(idOrName string) error
// enable locking on a specific room to prevent race
// returns a (lock uuid, error)
LockRoom(name string, duration time.Duration) (string, error)
UnlockRoom(name string, uid string) error
PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error
GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error)
ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error)
+158
View File
@@ -3,6 +3,7 @@ package servicefakes
import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/service"
livekit "github.com/livekit/livekit-server/proto"
@@ -95,6 +96,20 @@ type FakeRoomStore struct {
result1 []*livekit.Room
result2 error
}
LockRoomStub func(string, time.Duration) (string, error)
lockRoomMutex sync.RWMutex
lockRoomArgsForCall []struct {
arg1 string
arg2 time.Duration
}
lockRoomReturns struct {
result1 string
result2 error
}
lockRoomReturnsOnCall map[int]struct {
result1 string
result2 error
}
PersistParticipantStub func(string, *livekit.ParticipantInfo) error
persistParticipantMutex sync.RWMutex
persistParticipantArgsForCall []struct {
@@ -107,6 +122,18 @@ type FakeRoomStore struct {
persistParticipantReturnsOnCall map[int]struct {
result1 error
}
UnlockRoomStub func(string, string) error
unlockRoomMutex sync.RWMutex
unlockRoomArgsForCall []struct {
arg1 string
arg2 string
}
unlockRoomReturns struct {
result1 error
}
unlockRoomReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
@@ -544,6 +571,71 @@ func (fake *FakeRoomStore) ListRoomsReturnsOnCall(i int, result1 []*livekit.Room
}{result1, result2}
}
func (fake *FakeRoomStore) LockRoom(arg1 string, arg2 time.Duration) (string, error) {
fake.lockRoomMutex.Lock()
ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)]
fake.lockRoomArgsForCall = append(fake.lockRoomArgsForCall, struct {
arg1 string
arg2 time.Duration
}{arg1, arg2})
stub := fake.LockRoomStub
fakeReturns := fake.lockRoomReturns
fake.recordInvocation("LockRoom", []interface{}{arg1, arg2})
fake.lockRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeRoomStore) LockRoomCallCount() int {
fake.lockRoomMutex.RLock()
defer fake.lockRoomMutex.RUnlock()
return len(fake.lockRoomArgsForCall)
}
func (fake *FakeRoomStore) LockRoomCalls(stub func(string, time.Duration) (string, error)) {
fake.lockRoomMutex.Lock()
defer fake.lockRoomMutex.Unlock()
fake.LockRoomStub = stub
}
func (fake *FakeRoomStore) LockRoomArgsForCall(i int) (string, time.Duration) {
fake.lockRoomMutex.RLock()
defer fake.lockRoomMutex.RUnlock()
argsForCall := fake.lockRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomStore) LockRoomReturns(result1 string, result2 error) {
fake.lockRoomMutex.Lock()
defer fake.lockRoomMutex.Unlock()
fake.LockRoomStub = nil
fake.lockRoomReturns = struct {
result1 string
result2 error
}{result1, result2}
}
func (fake *FakeRoomStore) LockRoomReturnsOnCall(i int, result1 string, result2 error) {
fake.lockRoomMutex.Lock()
defer fake.lockRoomMutex.Unlock()
fake.LockRoomStub = nil
if fake.lockRoomReturnsOnCall == nil {
fake.lockRoomReturnsOnCall = make(map[int]struct {
result1 string
result2 error
})
}
fake.lockRoomReturnsOnCall[i] = struct {
result1 string
result2 error
}{result1, result2}
}
func (fake *FakeRoomStore) PersistParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error {
fake.persistParticipantMutex.Lock()
ret, specificReturn := fake.persistParticipantReturnsOnCall[len(fake.persistParticipantArgsForCall)]
@@ -606,6 +698,68 @@ func (fake *FakeRoomStore) PersistParticipantReturnsOnCall(i int, result1 error)
}{result1}
}
func (fake *FakeRoomStore) UnlockRoom(arg1 string, arg2 string) error {
fake.unlockRoomMutex.Lock()
ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)]
fake.unlockRoomArgsForCall = append(fake.unlockRoomArgsForCall, struct {
arg1 string
arg2 string
}{arg1, arg2})
stub := fake.UnlockRoomStub
fakeReturns := fake.unlockRoomReturns
fake.recordInvocation("UnlockRoom", []interface{}{arg1, arg2})
fake.unlockRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoomStore) UnlockRoomCallCount() int {
fake.unlockRoomMutex.RLock()
defer fake.unlockRoomMutex.RUnlock()
return len(fake.unlockRoomArgsForCall)
}
func (fake *FakeRoomStore) UnlockRoomCalls(stub func(string, string) error) {
fake.unlockRoomMutex.Lock()
defer fake.unlockRoomMutex.Unlock()
fake.UnlockRoomStub = stub
}
func (fake *FakeRoomStore) UnlockRoomArgsForCall(i int) (string, string) {
fake.unlockRoomMutex.RLock()
defer fake.unlockRoomMutex.RUnlock()
argsForCall := fake.unlockRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomStore) UnlockRoomReturns(result1 error) {
fake.unlockRoomMutex.Lock()
defer fake.unlockRoomMutex.Unlock()
fake.UnlockRoomStub = nil
fake.unlockRoomReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRoomStore) UnlockRoomReturnsOnCall(i int, result1 error) {
fake.unlockRoomMutex.Lock()
defer fake.unlockRoomMutex.Unlock()
fake.UnlockRoomStub = nil
if fake.unlockRoomReturnsOnCall == nil {
fake.unlockRoomReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.unlockRoomReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRoomStore) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
@@ -623,8 +777,12 @@ func (fake *FakeRoomStore) Invocations() map[string][][]interface{} {
defer fake.listParticipantsMutex.RUnlock()
fake.listRoomsMutex.RLock()
defer fake.listRoomsMutex.RUnlock()
fake.lockRoomMutex.RLock()
defer fake.lockRoomMutex.RUnlock()
fake.persistParticipantMutex.RLock()
defer fake.persistParticipantMutex.RUnlock()
fake.unlockRoomMutex.RLock()
defer fake.unlockRoomMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
+1 -1
View File
@@ -3,7 +3,7 @@
package tools
import (
_ "github.com/google/wire"
_ "github.com/google/wire/cmd/wire"
_ "github.com/maxbrunsfeld/counterfeiter/v6"
_ "github.com/twitchtv/twirp/protoc-gen-twirp"
_ "google.golang.org/protobuf/cmd/protoc-gen-go"