From d2bf8f0ba1cf1d309185ceeba1ca9adb9271adcc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 13 Apr 2023 13:59:24 +0530 Subject: [PATCH] Support simulating subscriber bandwidth. (#1609) * Support simualting subscriber bandwidth. When non-zero, a full allocation is triggered. Also, probes are stopped. When set to zero, normal probing mechanism should catch up. Adding `allowPause` override which can be a connection option. * fix log * allowPause in participant params --- pkg/rtc/participant.go | 3 + pkg/rtc/room.go | 10 ++- pkg/rtc/transport.go | 16 ++++ pkg/rtc/transportmanager.go | 8 ++ pkg/rtc/types/interfaces.go | 4 + .../typesfakes/fake_local_participant.go | 78 +++++++++++++++++++ pkg/service/roommanager.go | 1 + pkg/sfu/streamallocator/streamallocator.go | 77 ++++++++++++++++-- 8 files changed, 187 insertions(+), 10 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7f92c2017..a6147c781 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -88,6 +88,7 @@ type ParticipantParams struct { VersionGenerator utils.TimedVersionGenerator TrackResolver types.MediaTrackResolver DisableDynacast bool + SubscriberAllowPause bool } type ParticipantImpl struct { @@ -1035,6 +1036,8 @@ func (p *ParticipantImpl) setupTransportManager() error { tm.OnAnyTransportNegotiationFailed(p.onAnyTransportNegotiationFailed) tm.OnDataMessage(p.onDataMessage) + + tm.SetSubscriberAllowPause(p.params.SubscriberAllowPause) p.TransportManager = tm return nil } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index b1109fab7..911191027 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -692,7 +692,7 @@ func (r *Room) OnMetadataUpdate(f func(metadata string)) { func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error { switch scenario := simulateScenario.Scenario.(type) { case *livekit.SimulateScenario_SpeakerUpdate: - r.Logger.Infow("simulating speaker update", "participant", participant.Identity()) + r.Logger.Infow("simulating speaker update", "participant", participant.Identity(), "duration", scenario.SpeakerUpdate) go func() { <-time.After(time.Duration(scenario.SpeakerUpdate) * time.Second) r.sendSpeakerChanges([]*livekit.SpeakerInfo{{ @@ -723,13 +723,19 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave); err != nil { return err } - case *livekit.SimulateScenario_SwitchCandidateProtocol: r.Logger.Infow("simulating switch candidate protocol", "participant", participant.Identity()) participant.ICERestart(&livekit.ICEConfig{ PreferenceSubscriber: livekit.ICECandidateType(scenario.SwitchCandidateProtocol), PreferencePublisher: livekit.ICECandidateType(scenario.SwitchCandidateProtocol), }) + case *livekit.SimulateScenario_SubscriberBandwidth: + if scenario.SubscriberBandwidth > 0 { + r.Logger.Infow("simulating subscriber bandwidth start", "participant", participant.Identity(), "bandwidth", scenario.SubscriberBandwidth) + } else { + r.Logger.Infow("simulating subscriber bandwidth end", "participant", participant.Identity()) + } + participant.SetSubscriberChannelCapacity(scenario.SubscriberBandwidth) } return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index c98f6241d..00a7da629 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1118,6 +1118,22 @@ func (t *PCTransport) RemoveTrackFromStreamAllocator(subTrack types.SubscribedTr t.streamAllocator.RemoveTrack(subTrack.DownTrack()) } +func (t *PCTransport) SetAllowPauseOfStreamAllocator(allowPause bool) { + if t.streamAllocator == nil { + return + } + + t.streamAllocator.SetAllowPause(allowPause) +} + +func (t *PCTransport) SetChannelCapacityOfStreamAllocator(channelCapacity int64) { + if t.streamAllocator == nil { + return + } + + t.streamAllocator.SetChannelCapacity(channelCapacity) +} + func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType { unknown := types.ICEConnectionTypeUnknown if t.pc == nil { diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 113fd99f0..c7dfcc57d 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -782,3 +782,11 @@ func (t *TransportManager) SetSignalSourceValid(valid bool) { t.signalSourceValid.Store(valid) t.params.Logger.Debugw("signal source valid", "valid", valid) } + +func (t *TransportManager) SetSubscriberAllowPause(allowPause bool) { + t.subscriber.SetAllowPauseOfStreamAllocator(allowPause) +} + +func (t *TransportManager) SetSubscriberChannelCapacity(channelCapacity int64) { + t.subscriber.SetChannelCapacityOfStreamAllocator(channelCapacity) +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 0dcd9af88..d2dfe1f3a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -333,6 +333,10 @@ type LocalParticipant interface { UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error + + // down stream bandwidth management + SetSubscriberAllowPause(allowPause bool) + SetSubscriberChannelCapacity(channelCapacity int64) } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index b63f167c4..d9e73aa64 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -659,6 +659,16 @@ type FakeLocalParticipant struct { setSignalSourceValidArgsForCall []struct { arg1 bool } + SetSubscriberAllowPauseStub func(bool) + setSubscriberAllowPauseMutex sync.RWMutex + setSubscriberAllowPauseArgsForCall []struct { + arg1 bool + } + SetSubscriberChannelCapacityStub func(int64) + setSubscriberChannelCapacityMutex sync.RWMutex + setSubscriberChannelCapacityArgsForCall []struct { + arg1 int64 + } SetTrackMutedStub func(livekit.TrackID, bool, bool) setTrackMutedMutex sync.RWMutex setTrackMutedArgsForCall []struct { @@ -4343,6 +4353,70 @@ func (fake *FakeLocalParticipant) SetSignalSourceValidArgsForCall(i int) bool { return argsForCall.arg1 } +func (fake *FakeLocalParticipant) SetSubscriberAllowPause(arg1 bool) { + fake.setSubscriberAllowPauseMutex.Lock() + fake.setSubscriberAllowPauseArgsForCall = append(fake.setSubscriberAllowPauseArgsForCall, struct { + arg1 bool + }{arg1}) + stub := fake.SetSubscriberAllowPauseStub + fake.recordInvocation("SetSubscriberAllowPause", []interface{}{arg1}) + fake.setSubscriberAllowPauseMutex.Unlock() + if stub != nil { + fake.SetSubscriberAllowPauseStub(arg1) + } +} + +func (fake *FakeLocalParticipant) SetSubscriberAllowPauseCallCount() int { + fake.setSubscriberAllowPauseMutex.RLock() + defer fake.setSubscriberAllowPauseMutex.RUnlock() + return len(fake.setSubscriberAllowPauseArgsForCall) +} + +func (fake *FakeLocalParticipant) SetSubscriberAllowPauseCalls(stub func(bool)) { + fake.setSubscriberAllowPauseMutex.Lock() + defer fake.setSubscriberAllowPauseMutex.Unlock() + fake.SetSubscriberAllowPauseStub = stub +} + +func (fake *FakeLocalParticipant) SetSubscriberAllowPauseArgsForCall(i int) bool { + fake.setSubscriberAllowPauseMutex.RLock() + defer fake.setSubscriberAllowPauseMutex.RUnlock() + argsForCall := fake.setSubscriberAllowPauseArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) SetSubscriberChannelCapacity(arg1 int64) { + fake.setSubscriberChannelCapacityMutex.Lock() + fake.setSubscriberChannelCapacityArgsForCall = append(fake.setSubscriberChannelCapacityArgsForCall, struct { + arg1 int64 + }{arg1}) + stub := fake.SetSubscriberChannelCapacityStub + fake.recordInvocation("SetSubscriberChannelCapacity", []interface{}{arg1}) + fake.setSubscriberChannelCapacityMutex.Unlock() + if stub != nil { + fake.SetSubscriberChannelCapacityStub(arg1) + } +} + +func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityCallCount() int { + fake.setSubscriberChannelCapacityMutex.RLock() + defer fake.setSubscriberChannelCapacityMutex.RUnlock() + return len(fake.setSubscriberChannelCapacityArgsForCall) +} + +func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityCalls(stub func(int64)) { + fake.setSubscriberChannelCapacityMutex.Lock() + defer fake.setSubscriberChannelCapacityMutex.Unlock() + fake.SetSubscriberChannelCapacityStub = stub +} + +func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityArgsForCall(i int) int64 { + fake.setSubscriberChannelCapacityMutex.RLock() + defer fake.setSubscriberChannelCapacityMutex.RUnlock() + argsForCall := fake.setSubscriberChannelCapacityArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) { fake.setTrackMutedMutex.Lock() fake.setTrackMutedArgsForCall = append(fake.setTrackMutedArgsForCall, struct { @@ -5368,6 +5442,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.setResponseSinkMutex.RUnlock() fake.setSignalSourceValidMutex.RLock() defer fake.setSignalSourceValidMutex.RUnlock() + fake.setSubscriberAllowPauseMutex.RLock() + defer fake.setSubscriberAllowPauseMutex.RUnlock() + fake.setSubscriberChannelCapacityMutex.RLock() + defer fake.setSubscriberChannelCapacityMutex.RUnlock() fake.setTrackMutedMutex.RLock() defer fake.setTrackMutedMutex.RUnlock() fake.startMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index e44ff78dd..52778c916 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -336,6 +336,7 @@ func (r *RoomManager) StartSession( ReconnectOnSubscriptionError: reconnectOnSubscriptionError, VersionGenerator: r.versionGenerator, TrackResolver: room.ResolveMediaTrackForSubscriber, + SubscriberAllowPause: r.config.RTC.CongestionControl.AllowPause, }) if err != nil { return err diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index bc7b51276..a65834ed7 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -48,6 +48,8 @@ const ( FlagAllowOvershootInCatchup = true ) +// --------------------------------------------------------------------------- + var ( ChannelObserverParamsProbe = ChannelObserverParams{ Name: "probe", @@ -70,6 +72,8 @@ var ( } ) +// --------------------------------------------------------------------------- + type streamAllocatorState int const ( @@ -88,6 +92,8 @@ func (s streamAllocatorState) String() string { } } +// --------------------------------------------------------------------------- + type streamAllocatorSignal int const ( @@ -99,6 +105,8 @@ const ( streamAllocatorSignalSendProbe streamAllocatorSignalProbeClusterDone streamAllocatorSignalResume + streamAllocatorSignalSetAllowPause + streamAllocatorSignalSetChannelCapacity ) func (s streamAllocatorSignal) String() string { @@ -119,11 +127,17 @@ func (s streamAllocatorSignal) String() string { return "PROBE_CLUSTER_DONE" case streamAllocatorSignalResume: return "RESUME" + case streamAllocatorSignalSetAllowPause: + return "SET_ALLOW_PAUSE" + case streamAllocatorSignalSetChannelCapacity: + return "SET_CHANNEL_CAPACITY" default: return fmt.Sprintf("%d", int(s)) } } +// --------------------------------------------------------------------------- + type Event struct { Signal streamAllocatorSignal TrackID livekit.TrackID @@ -134,6 +148,8 @@ func (e Event) String() string { return fmt.Sprintf("StreamAllocator:Event{signal: %s, trackID: %s, data: %+v}", e.Signal, e.TrackID, e.Data) } +// --------------------------------------------------------------------------- + type StreamAllocatorParams struct { Config config.CongestionControlConfig Logger logger.Logger @@ -146,8 +162,11 @@ type StreamAllocator struct { bwe cc.BandwidthEstimator - lastReceivedEstimate int64 - committedChannelCapacity int64 + allowPause bool + + lastReceivedEstimate int64 + committedChannelCapacity int64 + overriddenChannelCapacity int64 probeInterval time.Duration lastProbeStartTime time.Time @@ -176,7 +195,8 @@ type StreamAllocator struct { func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { s := &StreamAllocator{ - params: params, + params: params, + allowPause: params.Config.AllowPause, prober: NewProber(ProberParams{ Logger: params.Logger, }), @@ -274,6 +294,20 @@ func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority ui s.videoTracksMu.Unlock() } +func (s *StreamAllocator) SetAllowPause(allowPause bool) { + s.postEvent(Event{ + Signal: streamAllocatorSignalSetAllowPause, + Data: allowPause, + }) +} + +func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) { + s.postEvent(Event{ + Signal: streamAllocatorSignalSetChannelCapacity, + Data: channelCapacity, + }) +} + func (s *StreamAllocator) resetState() { s.channelObserver = s.newChannelObserverNonProbe() s.resetProbe() @@ -536,6 +570,10 @@ func (s *StreamAllocator) handleEvent(event *Event) { s.handleSignalProbeClusterDone(event) case streamAllocatorSignalResume: s.handleSignalResume(event) + case streamAllocatorSignalSetAllowPause: + s.handleSignalSetAllowPause(event) + case streamAllocatorSignalSetChannelCapacity: + s.handleSignalSetChannelCapacity(event) } } @@ -649,6 +687,20 @@ func (s *StreamAllocator) handleSignalResume(event *Event) { } } +func (s *StreamAllocator) handleSignalSetAllowPause(event *Event) { + s.allowPause = event.Data.(bool) +} + +func (s *StreamAllocator) handleSignalSetChannelCapacity(event *Event) { + s.overriddenChannelCapacity = event.Data.(int64) + if s.overriddenChannelCapacity > 0 { + s.params.Logger.Infow("allocating on override channel capacity", "override", s.overriddenChannelCapacity) + s.allocateAllTracks() + } else { + s.params.Logger.Infow("clearing override channel capacity") + } +} + func (s *StreamAllocator) setState(state streamAllocatorState) { if s.state == state { return @@ -846,7 +898,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { } // commit the track that needs change if enough could be acquired or pause not allowed - if !s.params.Config.AllowPause || bandwidthAcquired >= transition.BandwidthDelta { + if !s.allowPause || bandwidthAcquired >= transition.BandwidthDelta { allocation := track.ProvisionalAllocateCommit() if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) { update.HandleStreamingChange(true, track) @@ -958,6 +1010,14 @@ func (s *StreamAllocator) allocateAllTracks() { availableChannelCapacity := s.committedChannelCapacity if s.params.Config.MinChannelCapacity > availableChannelCapacity { availableChannelCapacity = s.params.Config.MinChannelCapacity + s.params.Logger.Debugw( + "stream allocator: overriding channel capacity with min channel capacity", + "actual", s.committedChannelCapacity, + "override", availableChannelCapacity, + ) + } + if s.overriddenChannelCapacity > 0 { + availableChannelCapacity = s.overriddenChannelCapacity s.params.Logger.Debugw( "stream allocator: overriding channel capacity", "actual", s.committedChannelCapacity, @@ -987,7 +1047,7 @@ func (s *StreamAllocator) allocateAllTracks() { if availableChannelCapacity < 0 { availableChannelCapacity = 0 } - if availableChannelCapacity == 0 && s.params.Config.AllowPause { + if availableChannelCapacity == 0 && s.allowPause { // nothing left for managed tracks, pause them all for _, track := range videoTracks { if !track.IsManaged() { @@ -1013,7 +1073,7 @@ func (s *StreamAllocator) allocateAllTracks() { } for _, track := range sorted { - usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.params.Config.AllowPause, FlagAllowOvershootWhileDeficient) + usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) availableChannelCapacity -= usedChannelCapacity if availableChannelCapacity < 0 { availableChannelCapacity = 0 @@ -1155,7 +1215,8 @@ func (s *StreamAllocator) isInProbe() bool { } func (s *StreamAllocator) maybeProbe() { - if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid { + if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid || s.overriddenChannelCapacity > 0 { + // do not probe if channel capacity is overridden return } @@ -1207,7 +1268,7 @@ func (s *StreamAllocator) maybeProbeWithPadding() { func (s *StreamAllocator) getTracks() []*Track { s.videoTracksMu.RLock() - var tracks []*Track + tracks := make([]*Track, 0, len(s.videoTracks)) for _, track := range s.videoTracks { tracks = append(tracks, track) }