Support simulating subscriber bandwidth. (#1609)

* Support simualting subscriber bandwidth.

When non-zero, a full allocation is triggered.
Also, probes are stopped.

When set to zero, normal probing mechanism should catch up.

Adding `allowPause` override which can be a connection option.

* fix log

* allowPause in participant params
This commit is contained in:
Raja Subramanian
2023-04-13 13:59:24 +05:30
committed by GitHub
parent f8a94c2125
commit d2bf8f0ba1
8 changed files with 187 additions and 10 deletions

View File

@@ -88,6 +88,7 @@ type ParticipantParams struct {
VersionGenerator utils.TimedVersionGenerator
TrackResolver types.MediaTrackResolver
DisableDynacast bool
SubscriberAllowPause bool
}
type ParticipantImpl struct {
@@ -1035,6 +1036,8 @@ func (p *ParticipantImpl) setupTransportManager() error {
tm.OnAnyTransportNegotiationFailed(p.onAnyTransportNegotiationFailed)
tm.OnDataMessage(p.onDataMessage)
tm.SetSubscriberAllowPause(p.params.SubscriberAllowPause)
p.TransportManager = tm
return nil
}

View File

@@ -692,7 +692,7 @@ func (r *Room) OnMetadataUpdate(f func(metadata string)) {
func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error {
switch scenario := simulateScenario.Scenario.(type) {
case *livekit.SimulateScenario_SpeakerUpdate:
r.Logger.Infow("simulating speaker update", "participant", participant.Identity())
r.Logger.Infow("simulating speaker update", "participant", participant.Identity(), "duration", scenario.SpeakerUpdate)
go func() {
<-time.After(time.Duration(scenario.SpeakerUpdate) * time.Second)
r.sendSpeakerChanges([]*livekit.SpeakerInfo{{
@@ -723,13 +723,19 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
if err := participant.Close(true, types.ParticipantCloseReasonSimulateServerLeave); err != nil {
return err
}
case *livekit.SimulateScenario_SwitchCandidateProtocol:
r.Logger.Infow("simulating switch candidate protocol", "participant", participant.Identity())
participant.ICERestart(&livekit.ICEConfig{
PreferenceSubscriber: livekit.ICECandidateType(scenario.SwitchCandidateProtocol),
PreferencePublisher: livekit.ICECandidateType(scenario.SwitchCandidateProtocol),
})
case *livekit.SimulateScenario_SubscriberBandwidth:
if scenario.SubscriberBandwidth > 0 {
r.Logger.Infow("simulating subscriber bandwidth start", "participant", participant.Identity(), "bandwidth", scenario.SubscriberBandwidth)
} else {
r.Logger.Infow("simulating subscriber bandwidth end", "participant", participant.Identity())
}
participant.SetSubscriberChannelCapacity(scenario.SubscriberBandwidth)
}
return nil
}

View File

@@ -1118,6 +1118,22 @@ func (t *PCTransport) RemoveTrackFromStreamAllocator(subTrack types.SubscribedTr
t.streamAllocator.RemoveTrack(subTrack.DownTrack())
}
func (t *PCTransport) SetAllowPauseOfStreamAllocator(allowPause bool) {
if t.streamAllocator == nil {
return
}
t.streamAllocator.SetAllowPause(allowPause)
}
func (t *PCTransport) SetChannelCapacityOfStreamAllocator(channelCapacity int64) {
if t.streamAllocator == nil {
return
}
t.streamAllocator.SetChannelCapacity(channelCapacity)
}
func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType {
unknown := types.ICEConnectionTypeUnknown
if t.pc == nil {

View File

@@ -782,3 +782,11 @@ func (t *TransportManager) SetSignalSourceValid(valid bool) {
t.signalSourceValid.Store(valid)
t.params.Logger.Debugw("signal source valid", "valid", valid)
}
func (t *TransportManager) SetSubscriberAllowPause(allowPause bool) {
t.subscriber.SetAllowPauseOfStreamAllocator(allowPause)
}
func (t *TransportManager) SetSubscriberChannelCapacity(channelCapacity int64) {
t.subscriber.SetChannelCapacityOfStreamAllocator(channelCapacity)
}

View File

@@ -333,6 +333,10 @@ type LocalParticipant interface {
UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error
UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error
// down stream bandwidth management
SetSubscriberAllowPause(allowPause bool)
SetSubscriberChannelCapacity(channelCapacity int64)
}
// Room is a container of participants, and can provide room-level actions

View File

@@ -659,6 +659,16 @@ type FakeLocalParticipant struct {
setSignalSourceValidArgsForCall []struct {
arg1 bool
}
SetSubscriberAllowPauseStub func(bool)
setSubscriberAllowPauseMutex sync.RWMutex
setSubscriberAllowPauseArgsForCall []struct {
arg1 bool
}
SetSubscriberChannelCapacityStub func(int64)
setSubscriberChannelCapacityMutex sync.RWMutex
setSubscriberChannelCapacityArgsForCall []struct {
arg1 int64
}
SetTrackMutedStub func(livekit.TrackID, bool, bool)
setTrackMutedMutex sync.RWMutex
setTrackMutedArgsForCall []struct {
@@ -4343,6 +4353,70 @@ func (fake *FakeLocalParticipant) SetSignalSourceValidArgsForCall(i int) bool {
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetSubscriberAllowPause(arg1 bool) {
fake.setSubscriberAllowPauseMutex.Lock()
fake.setSubscriberAllowPauseArgsForCall = append(fake.setSubscriberAllowPauseArgsForCall, struct {
arg1 bool
}{arg1})
stub := fake.SetSubscriberAllowPauseStub
fake.recordInvocation("SetSubscriberAllowPause", []interface{}{arg1})
fake.setSubscriberAllowPauseMutex.Unlock()
if stub != nil {
fake.SetSubscriberAllowPauseStub(arg1)
}
}
func (fake *FakeLocalParticipant) SetSubscriberAllowPauseCallCount() int {
fake.setSubscriberAllowPauseMutex.RLock()
defer fake.setSubscriberAllowPauseMutex.RUnlock()
return len(fake.setSubscriberAllowPauseArgsForCall)
}
func (fake *FakeLocalParticipant) SetSubscriberAllowPauseCalls(stub func(bool)) {
fake.setSubscriberAllowPauseMutex.Lock()
defer fake.setSubscriberAllowPauseMutex.Unlock()
fake.SetSubscriberAllowPauseStub = stub
}
func (fake *FakeLocalParticipant) SetSubscriberAllowPauseArgsForCall(i int) bool {
fake.setSubscriberAllowPauseMutex.RLock()
defer fake.setSubscriberAllowPauseMutex.RUnlock()
argsForCall := fake.setSubscriberAllowPauseArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetSubscriberChannelCapacity(arg1 int64) {
fake.setSubscriberChannelCapacityMutex.Lock()
fake.setSubscriberChannelCapacityArgsForCall = append(fake.setSubscriberChannelCapacityArgsForCall, struct {
arg1 int64
}{arg1})
stub := fake.SetSubscriberChannelCapacityStub
fake.recordInvocation("SetSubscriberChannelCapacity", []interface{}{arg1})
fake.setSubscriberChannelCapacityMutex.Unlock()
if stub != nil {
fake.SetSubscriberChannelCapacityStub(arg1)
}
}
func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityCallCount() int {
fake.setSubscriberChannelCapacityMutex.RLock()
defer fake.setSubscriberChannelCapacityMutex.RUnlock()
return len(fake.setSubscriberChannelCapacityArgsForCall)
}
func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityCalls(stub func(int64)) {
fake.setSubscriberChannelCapacityMutex.Lock()
defer fake.setSubscriberChannelCapacityMutex.Unlock()
fake.SetSubscriberChannelCapacityStub = stub
}
func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityArgsForCall(i int) int64 {
fake.setSubscriberChannelCapacityMutex.RLock()
defer fake.setSubscriberChannelCapacityMutex.RUnlock()
argsForCall := fake.setSubscriberChannelCapacityArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) {
fake.setTrackMutedMutex.Lock()
fake.setTrackMutedArgsForCall = append(fake.setTrackMutedArgsForCall, struct {
@@ -5368,6 +5442,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setResponseSinkMutex.RUnlock()
fake.setSignalSourceValidMutex.RLock()
defer fake.setSignalSourceValidMutex.RUnlock()
fake.setSubscriberAllowPauseMutex.RLock()
defer fake.setSubscriberAllowPauseMutex.RUnlock()
fake.setSubscriberChannelCapacityMutex.RLock()
defer fake.setSubscriberChannelCapacityMutex.RUnlock()
fake.setTrackMutedMutex.RLock()
defer fake.setTrackMutedMutex.RUnlock()
fake.startMutex.RLock()

View File

@@ -336,6 +336,7 @@ func (r *RoomManager) StartSession(
ReconnectOnSubscriptionError: reconnectOnSubscriptionError,
VersionGenerator: r.versionGenerator,
TrackResolver: room.ResolveMediaTrackForSubscriber,
SubscriberAllowPause: r.config.RTC.CongestionControl.AllowPause,
})
if err != nil {
return err

View File

@@ -48,6 +48,8 @@ const (
FlagAllowOvershootInCatchup = true
)
// ---------------------------------------------------------------------------
var (
ChannelObserverParamsProbe = ChannelObserverParams{
Name: "probe",
@@ -70,6 +72,8 @@ var (
}
)
// ---------------------------------------------------------------------------
type streamAllocatorState int
const (
@@ -88,6 +92,8 @@ func (s streamAllocatorState) String() string {
}
}
// ---------------------------------------------------------------------------
type streamAllocatorSignal int
const (
@@ -99,6 +105,8 @@ const (
streamAllocatorSignalSendProbe
streamAllocatorSignalProbeClusterDone
streamAllocatorSignalResume
streamAllocatorSignalSetAllowPause
streamAllocatorSignalSetChannelCapacity
)
func (s streamAllocatorSignal) String() string {
@@ -119,11 +127,17 @@ func (s streamAllocatorSignal) String() string {
return "PROBE_CLUSTER_DONE"
case streamAllocatorSignalResume:
return "RESUME"
case streamAllocatorSignalSetAllowPause:
return "SET_ALLOW_PAUSE"
case streamAllocatorSignalSetChannelCapacity:
return "SET_CHANNEL_CAPACITY"
default:
return fmt.Sprintf("%d", int(s))
}
}
// ---------------------------------------------------------------------------
type Event struct {
Signal streamAllocatorSignal
TrackID livekit.TrackID
@@ -134,6 +148,8 @@ func (e Event) String() string {
return fmt.Sprintf("StreamAllocator:Event{signal: %s, trackID: %s, data: %+v}", e.Signal, e.TrackID, e.Data)
}
// ---------------------------------------------------------------------------
type StreamAllocatorParams struct {
Config config.CongestionControlConfig
Logger logger.Logger
@@ -146,8 +162,11 @@ type StreamAllocator struct {
bwe cc.BandwidthEstimator
lastReceivedEstimate int64
committedChannelCapacity int64
allowPause bool
lastReceivedEstimate int64
committedChannelCapacity int64
overriddenChannelCapacity int64
probeInterval time.Duration
lastProbeStartTime time.Time
@@ -176,7 +195,8 @@ type StreamAllocator struct {
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
s := &StreamAllocator{
params: params,
params: params,
allowPause: params.Config.AllowPause,
prober: NewProber(ProberParams{
Logger: params.Logger,
}),
@@ -274,6 +294,20 @@ func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority ui
s.videoTracksMu.Unlock()
}
func (s *StreamAllocator) SetAllowPause(allowPause bool) {
s.postEvent(Event{
Signal: streamAllocatorSignalSetAllowPause,
Data: allowPause,
})
}
func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64) {
s.postEvent(Event{
Signal: streamAllocatorSignalSetChannelCapacity,
Data: channelCapacity,
})
}
func (s *StreamAllocator) resetState() {
s.channelObserver = s.newChannelObserverNonProbe()
s.resetProbe()
@@ -536,6 +570,10 @@ func (s *StreamAllocator) handleEvent(event *Event) {
s.handleSignalProbeClusterDone(event)
case streamAllocatorSignalResume:
s.handleSignalResume(event)
case streamAllocatorSignalSetAllowPause:
s.handleSignalSetAllowPause(event)
case streamAllocatorSignalSetChannelCapacity:
s.handleSignalSetChannelCapacity(event)
}
}
@@ -649,6 +687,20 @@ func (s *StreamAllocator) handleSignalResume(event *Event) {
}
}
func (s *StreamAllocator) handleSignalSetAllowPause(event *Event) {
s.allowPause = event.Data.(bool)
}
func (s *StreamAllocator) handleSignalSetChannelCapacity(event *Event) {
s.overriddenChannelCapacity = event.Data.(int64)
if s.overriddenChannelCapacity > 0 {
s.params.Logger.Infow("allocating on override channel capacity", "override", s.overriddenChannelCapacity)
s.allocateAllTracks()
} else {
s.params.Logger.Infow("clearing override channel capacity")
}
}
func (s *StreamAllocator) setState(state streamAllocatorState) {
if s.state == state {
return
@@ -846,7 +898,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
}
// commit the track that needs change if enough could be acquired or pause not allowed
if !s.params.Config.AllowPause || bandwidthAcquired >= transition.BandwidthDelta {
if !s.allowPause || bandwidthAcquired >= transition.BandwidthDelta {
allocation := track.ProvisionalAllocateCommit()
if allocation.PauseReason == sfu.VideoPauseReasonBandwidth && track.SetPaused(true) {
update.HandleStreamingChange(true, track)
@@ -958,6 +1010,14 @@ func (s *StreamAllocator) allocateAllTracks() {
availableChannelCapacity := s.committedChannelCapacity
if s.params.Config.MinChannelCapacity > availableChannelCapacity {
availableChannelCapacity = s.params.Config.MinChannelCapacity
s.params.Logger.Debugw(
"stream allocator: overriding channel capacity with min channel capacity",
"actual", s.committedChannelCapacity,
"override", availableChannelCapacity,
)
}
if s.overriddenChannelCapacity > 0 {
availableChannelCapacity = s.overriddenChannelCapacity
s.params.Logger.Debugw(
"stream allocator: overriding channel capacity",
"actual", s.committedChannelCapacity,
@@ -987,7 +1047,7 @@ func (s *StreamAllocator) allocateAllTracks() {
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
}
if availableChannelCapacity == 0 && s.params.Config.AllowPause {
if availableChannelCapacity == 0 && s.allowPause {
// nothing left for managed tracks, pause them all
for _, track := range videoTracks {
if !track.IsManaged() {
@@ -1013,7 +1073,7 @@ func (s *StreamAllocator) allocateAllTracks() {
}
for _, track := range sorted {
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.params.Config.AllowPause, FlagAllowOvershootWhileDeficient)
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
availableChannelCapacity -= usedChannelCapacity
if availableChannelCapacity < 0 {
availableChannelCapacity = 0
@@ -1155,7 +1215,8 @@ func (s *StreamAllocator) isInProbe() bool {
}
func (s *StreamAllocator) maybeProbe() {
if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid {
if time.Since(s.lastProbeStartTime) < s.probeInterval || s.probeClusterId != ProbeClusterIdInvalid || s.overriddenChannelCapacity > 0 {
// do not probe if channel capacity is overridden
return
}
@@ -1207,7 +1268,7 @@ func (s *StreamAllocator) maybeProbeWithPadding() {
func (s *StreamAllocator) getTracks() []*Track {
s.videoTracksMu.RLock()
var tracks []*Track
tracks := make([]*Track, 0, len(s.videoTracks))
for _, track := range s.videoTracks {
tracks = append(tracks, track)
}