From c2ba26eee61aacee39996fdd42373bb1104cdbe7 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 11 Jan 2022 00:37:24 -0800 Subject: [PATCH] Handle SimulateScenario requests (#330) * Handle SimulateScenario requests * actually use sendLeave * fix local test failures due to filehandle limit --- go.mod | 2 +- go.sum | 4 +- magefile.go | 15 +++- pkg/rtc/participant.go | 16 +++-- pkg/rtc/room.go | 30 +++++++- pkg/rtc/signalhandler.go | 8 ++- pkg/rtc/types/interfaces.go | 3 +- pkg/rtc/types/typesfakes/fake_participant.go | 21 ++++-- pkg/rtc/types/typesfakes/fake_room.go | 76 ++++++++++++++++++++ pkg/service/roommanager.go | 6 +- 10 files changed, 157 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index 6f20faf77..99a7c4740 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.8-0.20220108052220-a7f937bb7bba + github.com/livekit/protocol v0.11.9 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 9fbb09ff7..76fecb7d2 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.8-0.20220108052220-a7f937bb7bba h1:JFvsAPoALVyLzxyOZbknHZXqkQ867iMUKduBcZiD0D8= -github.com/livekit/protocol v0.11.8-0.20220108052220-a7f937bb7bba/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.11.9 h1:P4V5u1qlcBUlmVSbnBaL+aoeP/qLBYsFOZmk3gRoitA= +github.com/livekit/protocol v0.11.9/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/magefile.go b/magefile.go index d5e208118..2558a2cc3 100644 --- a/magefile.go +++ b/magefile.go @@ -13,6 +13,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "sort" "strings" @@ -116,7 +117,7 @@ func PublishDocker() error { // run unit tests, skipping integration func Test() error { - mg.Deps(generateWire) + mg.Deps(generateWire, macULimit) cmd := exec.Command("go", "test", "-short", "./...") connectStd(cmd) return cmd.Run() @@ -124,7 +125,7 @@ func Test() error { // run all tests including integration func TestAll() error { - mg.Deps(generateWire) + mg.Deps(generateWire, macULimit) // "-v", "-race", cmd := exec.Command("go", "test", "./...", "-count=1", "-timeout=4m") connectStd(cmd) @@ -235,6 +236,16 @@ func connectStd(cmd *exec.Cmd) { cmd.Stderr = os.Stderr } +func macULimit() { + // raise ulimit if on mac + if runtime.GOOS != "darwin" { + return + } + cmd := exec.Command("ulimit", "-n", "10000") + connectStd(cmd) + cmd.Run() +} + // A helper checksum library that generates a fast, non-portable checksum over a directory of files // it's designed as a quick way to bypass type Checksummer struct { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 733045be3..08e7631be 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -399,18 +399,20 @@ func (p *ParticipantImpl) Start() { }) } -func (p *ParticipantImpl) Close() error { +func (p *ParticipantImpl) Close(sendLeave bool) error { if !p.isClosed.TrySet(true) { // already closed return nil } // send leave message - _ = p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Leave{ - Leave: &livekit.LeaveRequest{}, - }, - }) + if sendLeave { + _ = p.writeMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_Leave{ + Leave: &livekit.LeaveRequest{}, + }, + }) + } p.LocalParticipant.Close() @@ -1000,7 +1002,7 @@ func (p *ParticipantImpl) handlePrimaryStateChange(state webrtc.PeerConnectionSt } else if state == webrtc.PeerConnectionStateFailed { // only close when failed, to allow clients opportunity to reconnect go func() { - _ = p.Close() + _ = p.Close(false) }() } } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 2854ad1b9..194755b78 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -321,7 +321,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) { p.OnDataPacket(nil) // close participant as well - _ = p.Close() + _ = p.Close(true) r.lock.RLock() if len(r.participants) == 0 { @@ -505,6 +505,34 @@ func (r *Room) OnMetadataUpdate(f func(metadata string)) { r.onMetadataUpdate = f } +func (r *Room) SimulateScenario(participant types.Participant, simulateScenario *livekit.SimulateScenario) error { + switch scenario := simulateScenario.Scenario.(type) { + case *livekit.SimulateScenario_SpeakerUpdate: + r.Logger.Infow("simulating speaker update", "participant", participant.Identity()) + go func() { + <-time.After(time.Duration(scenario.SpeakerUpdate) * time.Second) + r.sendSpeakerChanges([]*livekit.SpeakerInfo{{ + Sid: string(participant.ID()), + Active: false, + Level: 0, + }}) + }() + r.sendSpeakerChanges([]*livekit.SpeakerInfo{{ + Sid: string(participant.ID()), + Active: true, + Level: 0.9, + }}) + case *livekit.SimulateScenario_Migration: + case *livekit.SimulateScenario_NodeFailure: + r.Logger.Infow("simulating node failure", "participant", participant.Identity()) + // drop participant without necessarily cleaning up + if err := participant.Close(false); err != nil { + return err + } + } + return nil +} + // checks if participant should be autosubscribed to new tracks, assumes lock is already acquired func (r *Room) autoSubscribe(participant types.Participant) bool { if !participant.CanSubscribe() { diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 2210ae980..00351eb5f 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -78,7 +78,7 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req return nil } case *livekit.SignalRequest_Leave: - _ = participant.Close() + _ = participant.Close(true) case *livekit.SignalRequest_SubscriptionPermissions: err := room.UpdateSubscriptionPermissions(participant, msg.SubscriptionPermissions) if err != nil { @@ -91,6 +91,12 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req pLogger.Warnw("could not sync subscribe state", err, "state", msg.SyncState) } + case *livekit.SignalRequest_Simulate: + err := room.SimulateScenario(participant, msg.Simulate) + if err != nil { + pLogger.Warnw("could not simulate scenario", err, + "simulate", msg.Simulate) + } } return nil } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 533a73969..032b93204 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -126,7 +126,7 @@ type Participant interface { Hidden() bool Start() - Close() error + Close(sendLeave bool) error // callbacks // OnTrackPublished - remote added a remoteTrack @@ -145,6 +145,7 @@ type Room interface { UpdateSubscriptions(participant Participant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error UpdateSubscriptionPermissions(participant Participant, permissions *livekit.UpdateSubscriptionPermissions) error SyncState(participant Participant, state *livekit.SyncState) error + SimulateScenario(participant Participant, scenario *livekit.SimulateScenario) error UpdateVideoLayers(participant Participant, updateVideoLayers *livekit.UpdateVideoLayers) error } diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 964178d98..a3dae0d79 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -84,9 +84,10 @@ type FakeParticipant struct { canSubscribeReturnsOnCall map[int]struct { result1 bool } - CloseStub func() error + CloseStub func(bool) error closeMutex sync.RWMutex closeArgsForCall []struct { + arg1 bool } closeReturns struct { result1 error @@ -945,17 +946,18 @@ func (fake *FakeParticipant) CanSubscribeReturnsOnCall(i int, result1 bool) { }{result1} } -func (fake *FakeParticipant) Close() error { +func (fake *FakeParticipant) Close(arg1 bool) error { fake.closeMutex.Lock() ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] fake.closeArgsForCall = append(fake.closeArgsForCall, struct { - }{}) + arg1 bool + }{arg1}) stub := fake.CloseStub fakeReturns := fake.closeReturns - fake.recordInvocation("Close", []interface{}{}) + fake.recordInvocation("Close", []interface{}{arg1}) fake.closeMutex.Unlock() if stub != nil { - return stub() + return stub(arg1) } if specificReturn { return ret.result1 @@ -969,12 +971,19 @@ func (fake *FakeParticipant) CloseCallCount() int { return len(fake.closeArgsForCall) } -func (fake *FakeParticipant) CloseCalls(stub func() error) { +func (fake *FakeParticipant) CloseCalls(stub func(bool) error) { fake.closeMutex.Lock() defer fake.closeMutex.Unlock() fake.CloseStub = stub } +func (fake *FakeParticipant) CloseArgsForCall(i int) bool { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + argsForCall := fake.closeArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeParticipant) CloseReturns(result1 error) { fake.closeMutex.Lock() defer fake.closeMutex.Unlock() diff --git a/pkg/rtc/types/typesfakes/fake_room.go b/pkg/rtc/types/typesfakes/fake_room.go index 56b7b0eaa..5c447c890 100644 --- a/pkg/rtc/types/typesfakes/fake_room.go +++ b/pkg/rtc/types/typesfakes/fake_room.go @@ -19,6 +19,18 @@ type FakeRoom struct { nameReturnsOnCall map[int]struct { result1 livekit.RoomName } + SimulateScenarioStub func(types.Participant, *livekit.SimulateScenario) error + simulateScenarioMutex sync.RWMutex + simulateScenarioArgsForCall []struct { + arg1 types.Participant + arg2 *livekit.SimulateScenario + } + simulateScenarioReturns struct { + result1 error + } + simulateScenarioReturnsOnCall map[int]struct { + result1 error + } SyncStateStub func(types.Participant, *livekit.SyncState) error syncStateMutex sync.RWMutex syncStateArgsForCall []struct { @@ -126,6 +138,68 @@ func (fake *FakeRoom) NameReturnsOnCall(i int, result1 livekit.RoomName) { }{result1} } +func (fake *FakeRoom) SimulateScenario(arg1 types.Participant, arg2 *livekit.SimulateScenario) error { + fake.simulateScenarioMutex.Lock() + ret, specificReturn := fake.simulateScenarioReturnsOnCall[len(fake.simulateScenarioArgsForCall)] + fake.simulateScenarioArgsForCall = append(fake.simulateScenarioArgsForCall, struct { + arg1 types.Participant + arg2 *livekit.SimulateScenario + }{arg1, arg2}) + stub := fake.SimulateScenarioStub + fakeReturns := fake.simulateScenarioReturns + fake.recordInvocation("SimulateScenario", []interface{}{arg1, arg2}) + fake.simulateScenarioMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRoom) SimulateScenarioCallCount() int { + fake.simulateScenarioMutex.RLock() + defer fake.simulateScenarioMutex.RUnlock() + return len(fake.simulateScenarioArgsForCall) +} + +func (fake *FakeRoom) SimulateScenarioCalls(stub func(types.Participant, *livekit.SimulateScenario) error) { + fake.simulateScenarioMutex.Lock() + defer fake.simulateScenarioMutex.Unlock() + fake.SimulateScenarioStub = stub +} + +func (fake *FakeRoom) SimulateScenarioArgsForCall(i int) (types.Participant, *livekit.SimulateScenario) { + fake.simulateScenarioMutex.RLock() + defer fake.simulateScenarioMutex.RUnlock() + argsForCall := fake.simulateScenarioArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeRoom) SimulateScenarioReturns(result1 error) { + fake.simulateScenarioMutex.Lock() + defer fake.simulateScenarioMutex.Unlock() + fake.SimulateScenarioStub = nil + fake.simulateScenarioReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRoom) SimulateScenarioReturnsOnCall(i int, result1 error) { + fake.simulateScenarioMutex.Lock() + defer fake.simulateScenarioMutex.Unlock() + fake.SimulateScenarioStub = nil + if fake.simulateScenarioReturnsOnCall == nil { + fake.simulateScenarioReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.simulateScenarioReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeRoom) SyncState(arg1 types.Participant, arg2 *livekit.SyncState) error { fake.syncStateMutex.Lock() ret, specificReturn := fake.syncStateReturnsOnCall[len(fake.syncStateArgsForCall)] @@ -391,6 +465,8 @@ func (fake *FakeRoom) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.nameMutex.RLock() defer fake.nameMutex.RUnlock() + fake.simulateScenarioMutex.RLock() + defer fake.simulateScenarioMutex.RUnlock() fake.syncStateMutex.RLock() defer fake.syncStateMutex.RUnlock() fake.updateSubscriptionPermissionsMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 49138b325..7240308c3 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -157,7 +157,7 @@ func (r *RoomManager) Stop() { for _, room := range rooms { for _, p := range room.GetParticipants() { - _ = p.Close() + _ = p.Close(true) } room.Close() } @@ -352,7 +352,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici "room", room.Room.Name, "roomID", room.Room.Sid, ) - _ = participant.Close() + _ = participant.Close(true) }() defer rtc.Recover() @@ -434,7 +434,7 @@ func (r *RoomManager) handleRTCMessage(_ context.Context, roomName livekit.RoomN } case *livekit.RTCNodeMessage_DeleteRoom: for _, p := range room.GetParticipants() { - _ = p.Close() + _ = p.Close(true) } room.Close() case *livekit.RTCNodeMessage_UpdateSubscriptions: