Handle SimulateScenario requests (#330)

* Handle SimulateScenario requests

* actually use sendLeave

* fix local test failures due to filehandle limit
This commit is contained in:
David Zhao
2022-01-11 00:37:24 -08:00
committed by GitHub
parent e82b8ea0a0
commit c2ba26eee6
10 changed files with 157 additions and 24 deletions

2
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.11.8-0.20220108052220-a7f937bb7bba
github.com/livekit/protocol v0.11.9
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

4
go.sum
View File

@@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.8-0.20220108052220-a7f937bb7bba h1:JFvsAPoALVyLzxyOZbknHZXqkQ867iMUKduBcZiD0D8=
github.com/livekit/protocol v0.11.8-0.20220108052220-a7f937bb7bba/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/livekit/protocol v0.11.9 h1:P4V5u1qlcBUlmVSbnBaL+aoeP/qLBYsFOZmk3gRoitA=
github.com/livekit/protocol v0.11.9/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=

View File

@@ -13,6 +13,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"sort"
"strings"
@@ -116,7 +117,7 @@ func PublishDocker() error {
// run unit tests, skipping integration
func Test() error {
mg.Deps(generateWire)
mg.Deps(generateWire, macULimit)
cmd := exec.Command("go", "test", "-short", "./...")
connectStd(cmd)
return cmd.Run()
@@ -124,7 +125,7 @@ func Test() error {
// run all tests including integration
func TestAll() error {
mg.Deps(generateWire)
mg.Deps(generateWire, macULimit)
// "-v", "-race",
cmd := exec.Command("go", "test", "./...", "-count=1", "-timeout=4m")
connectStd(cmd)
@@ -235,6 +236,16 @@ func connectStd(cmd *exec.Cmd) {
cmd.Stderr = os.Stderr
}
func macULimit() {
// raise ulimit if on mac
if runtime.GOOS != "darwin" {
return
}
cmd := exec.Command("ulimit", "-n", "10000")
connectStd(cmd)
cmd.Run()
}
// A helper checksum library that generates a fast, non-portable checksum over a directory of files
// it's designed as a quick way to bypass
type Checksummer struct {

View File

@@ -399,18 +399,20 @@ func (p *ParticipantImpl) Start() {
})
}
func (p *ParticipantImpl) Close() error {
func (p *ParticipantImpl) Close(sendLeave bool) error {
if !p.isClosed.TrySet(true) {
// already closed
return nil
}
// send leave message
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{},
},
})
if sendLeave {
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{},
},
})
}
p.LocalParticipant.Close()
@@ -1000,7 +1002,7 @@ func (p *ParticipantImpl) handlePrimaryStateChange(state webrtc.PeerConnectionSt
} else if state == webrtc.PeerConnectionStateFailed {
// only close when failed, to allow clients opportunity to reconnect
go func() {
_ = p.Close()
_ = p.Close(false)
}()
}
}

View File

@@ -321,7 +321,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) {
p.OnDataPacket(nil)
// close participant as well
_ = p.Close()
_ = p.Close(true)
r.lock.RLock()
if len(r.participants) == 0 {
@@ -505,6 +505,34 @@ func (r *Room) OnMetadataUpdate(f func(metadata string)) {
r.onMetadataUpdate = f
}
func (r *Room) SimulateScenario(participant types.Participant, simulateScenario *livekit.SimulateScenario) error {
switch scenario := simulateScenario.Scenario.(type) {
case *livekit.SimulateScenario_SpeakerUpdate:
r.Logger.Infow("simulating speaker update", "participant", participant.Identity())
go func() {
<-time.After(time.Duration(scenario.SpeakerUpdate) * time.Second)
r.sendSpeakerChanges([]*livekit.SpeakerInfo{{
Sid: string(participant.ID()),
Active: false,
Level: 0,
}})
}()
r.sendSpeakerChanges([]*livekit.SpeakerInfo{{
Sid: string(participant.ID()),
Active: true,
Level: 0.9,
}})
case *livekit.SimulateScenario_Migration:
case *livekit.SimulateScenario_NodeFailure:
r.Logger.Infow("simulating node failure", "participant", participant.Identity())
// drop participant without necessarily cleaning up
if err := participant.Close(false); err != nil {
return err
}
}
return nil
}
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
func (r *Room) autoSubscribe(participant types.Participant) bool {
if !participant.CanSubscribe() {

View File

@@ -78,7 +78,7 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req
return nil
}
case *livekit.SignalRequest_Leave:
_ = participant.Close()
_ = participant.Close(true)
case *livekit.SignalRequest_SubscriptionPermissions:
err := room.UpdateSubscriptionPermissions(participant, msg.SubscriptionPermissions)
if err != nil {
@@ -91,6 +91,12 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req
pLogger.Warnw("could not sync subscribe state", err,
"state", msg.SyncState)
}
case *livekit.SignalRequest_Simulate:
err := room.SimulateScenario(participant, msg.Simulate)
if err != nil {
pLogger.Warnw("could not simulate scenario", err,
"simulate", msg.Simulate)
}
}
return nil
}

View File

@@ -126,7 +126,7 @@ type Participant interface {
Hidden() bool
Start()
Close() error
Close(sendLeave bool) error
// callbacks
// OnTrackPublished - remote added a remoteTrack
@@ -145,6 +145,7 @@ type Room interface {
UpdateSubscriptions(participant Participant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error
UpdateSubscriptionPermissions(participant Participant, permissions *livekit.UpdateSubscriptionPermissions) error
SyncState(participant Participant, state *livekit.SyncState) error
SimulateScenario(participant Participant, scenario *livekit.SimulateScenario) error
UpdateVideoLayers(participant Participant, updateVideoLayers *livekit.UpdateVideoLayers) error
}

View File

@@ -84,9 +84,10 @@ type FakeParticipant struct {
canSubscribeReturnsOnCall map[int]struct {
result1 bool
}
CloseStub func() error
CloseStub func(bool) error
closeMutex sync.RWMutex
closeArgsForCall []struct {
arg1 bool
}
closeReturns struct {
result1 error
@@ -945,17 +946,18 @@ func (fake *FakeParticipant) CanSubscribeReturnsOnCall(i int, result1 bool) {
}{result1}
}
func (fake *FakeParticipant) Close() error {
func (fake *FakeParticipant) Close(arg1 bool) error {
fake.closeMutex.Lock()
ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)]
fake.closeArgsForCall = append(fake.closeArgsForCall, struct {
}{})
arg1 bool
}{arg1})
stub := fake.CloseStub
fakeReturns := fake.closeReturns
fake.recordInvocation("Close", []interface{}{})
fake.recordInvocation("Close", []interface{}{arg1})
fake.closeMutex.Unlock()
if stub != nil {
return stub()
return stub(arg1)
}
if specificReturn {
return ret.result1
@@ -969,12 +971,19 @@ func (fake *FakeParticipant) CloseCallCount() int {
return len(fake.closeArgsForCall)
}
func (fake *FakeParticipant) CloseCalls(stub func() error) {
func (fake *FakeParticipant) CloseCalls(stub func(bool) error) {
fake.closeMutex.Lock()
defer fake.closeMutex.Unlock()
fake.CloseStub = stub
}
func (fake *FakeParticipant) CloseArgsForCall(i int) bool {
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
argsForCall := fake.closeArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeParticipant) CloseReturns(result1 error) {
fake.closeMutex.Lock()
defer fake.closeMutex.Unlock()

View File

@@ -19,6 +19,18 @@ type FakeRoom struct {
nameReturnsOnCall map[int]struct {
result1 livekit.RoomName
}
SimulateScenarioStub func(types.Participant, *livekit.SimulateScenario) error
simulateScenarioMutex sync.RWMutex
simulateScenarioArgsForCall []struct {
arg1 types.Participant
arg2 *livekit.SimulateScenario
}
simulateScenarioReturns struct {
result1 error
}
simulateScenarioReturnsOnCall map[int]struct {
result1 error
}
SyncStateStub func(types.Participant, *livekit.SyncState) error
syncStateMutex sync.RWMutex
syncStateArgsForCall []struct {
@@ -126,6 +138,68 @@ func (fake *FakeRoom) NameReturnsOnCall(i int, result1 livekit.RoomName) {
}{result1}
}
func (fake *FakeRoom) SimulateScenario(arg1 types.Participant, arg2 *livekit.SimulateScenario) error {
fake.simulateScenarioMutex.Lock()
ret, specificReturn := fake.simulateScenarioReturnsOnCall[len(fake.simulateScenarioArgsForCall)]
fake.simulateScenarioArgsForCall = append(fake.simulateScenarioArgsForCall, struct {
arg1 types.Participant
arg2 *livekit.SimulateScenario
}{arg1, arg2})
stub := fake.SimulateScenarioStub
fakeReturns := fake.simulateScenarioReturns
fake.recordInvocation("SimulateScenario", []interface{}{arg1, arg2})
fake.simulateScenarioMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoom) SimulateScenarioCallCount() int {
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
return len(fake.simulateScenarioArgsForCall)
}
func (fake *FakeRoom) SimulateScenarioCalls(stub func(types.Participant, *livekit.SimulateScenario) error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = stub
}
func (fake *FakeRoom) SimulateScenarioArgsForCall(i int) (types.Participant, *livekit.SimulateScenario) {
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
argsForCall := fake.simulateScenarioArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoom) SimulateScenarioReturns(result1 error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = nil
fake.simulateScenarioReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) SimulateScenarioReturnsOnCall(i int, result1 error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = nil
if fake.simulateScenarioReturnsOnCall == nil {
fake.simulateScenarioReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.simulateScenarioReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) SyncState(arg1 types.Participant, arg2 *livekit.SyncState) error {
fake.syncStateMutex.Lock()
ret, specificReturn := fake.syncStateReturnsOnCall[len(fake.syncStateArgsForCall)]
@@ -391,6 +465,8 @@ func (fake *FakeRoom) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.nameMutex.RLock()
defer fake.nameMutex.RUnlock()
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
fake.syncStateMutex.RLock()
defer fake.syncStateMutex.RUnlock()
fake.updateSubscriptionPermissionsMutex.RLock()

View File

@@ -157,7 +157,7 @@ func (r *RoomManager) Stop() {
for _, room := range rooms {
for _, p := range room.GetParticipants() {
_ = p.Close()
_ = p.Close(true)
}
room.Close()
}
@@ -352,7 +352,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
"room", room.Room.Name,
"roomID", room.Room.Sid,
)
_ = participant.Close()
_ = participant.Close(true)
}()
defer rtc.Recover()
@@ -434,7 +434,7 @@ func (r *RoomManager) handleRTCMessage(_ context.Context, roomName livekit.RoomN
}
case *livekit.RTCNodeMessage_DeleteRoom:
for _, p := range room.GetParticipants() {
_ = p.Close()
_ = p.Close(true)
}
room.Close()
case *livekit.RTCNodeMessage_UpdateSubscriptions: