From a6338992e83e86ad3fe6860dbbd9402c4a582ddf Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 11 Feb 2022 09:17:53 +0530 Subject: [PATCH] Stop forwarding on congestion (#429) * WIP commit * comment out debug stuff --- pkg/config/config.go | 9 ++++----- pkg/rtc/config.go | 2 +- pkg/sfu/buffer/buffer.go | 5 +++++ pkg/sfu/downtrack.go | 2 ++ pkg/sfu/forwarder.go | 17 ++++++++++------- pkg/sfu/receiver.go | 1 + pkg/sfu/streamallocator.go | 20 ++++++++++++++------ 7 files changed, 37 insertions(+), 19 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 560960ea8..3170fffdc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -65,10 +65,8 @@ type RTCConfig struct { // Throttle periods for pli/fir rtcp packets PLIThrottle PLIThrottleConfig `yaml:"pli_throttle,omitempty"` - // Which side runs bandwidth estimation - UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` - CongestionControl CongestionControlConfig `yaml:"congestion_control,omitempty"` + // for testing, disable UDP ForceTCP bool `yaml:"force_tcp,omitempty"` } @@ -88,8 +86,9 @@ type PLIThrottleConfig struct { } type CongestionControlConfig struct { - Enabled bool `yaml:"enabled"` - AllowPause bool `yaml:"allow_pause"` + Enabled bool `yaml:"enabled"` + AllowPause bool `yaml:"allow_pause"` + UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"` } type AudioConfig struct { diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index c58ea7a94..188b13aa5 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -168,7 +168,7 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err }, }, } - if rtcConf.UseSendSideBWE { + if rtcConf.CongestionControl.UseSendSideBWE { subscriberConfig.RTPHeaderExtension.Video = append(subscriberConfig.RTPHeaderExtension.Video, sdp.TransportCCURI) subscriberConfig.RTCPFeedback.Video = append(subscriberConfig.RTCPFeedback.Video, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBTransportCC}) } else { diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 45c6f1c95..4bd866cdd 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -510,6 +510,11 @@ func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTi ep.Payload = vp8Packet ep.KeyFrame = vp8Packet.IsKeyFrame temporalLayer = int32(vp8Packet.TID) + /* + if ep.KeyFrame { + b.logger.Debugw("SA_DEBUG key frame", "ssrc", b.mediaSSRC) // REMOVE + } + */ case "video/h264": ep.KeyFrame = IsH264Keyframe(rtpPacket.Payload) } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e41ad9d30..7a47e027b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -277,6 +277,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { tp, err := d.forwarder.GetTranslationParams(extPkt, layer) if tp.shouldSendPLI { + //d.logger.Debugw("SA_DEBUG Forwarder SendPLI", "layer", layer) // REMOVE d.lastPli.set(time.Now().UnixNano()) d.receiver.SendPLI(layer) } @@ -851,6 +852,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { targetLayers := d.forwarder.TargetLayers() if targetLayers != InvalidLayers { d.lastPli.set(time.Now().UnixNano()) + //d.logger.Debugw("SA_DEBUG Subscriber RTCP SendPLI", "layer", targetLayers.spatial) // REMOVE d.receiver.SendPLI(targetLayers.spatial) pliOnce = false } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 9a423341d..4e8b7c54f 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -16,7 +16,7 @@ import ( // Forwarder // const ( - FlagPauseOnDowngrade = false + FlagPauseOnDowngrade = true ) type ForwardingStatus int @@ -1122,6 +1122,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in if f.targetLayers.spatial == layer { if extPkt.KeyFrame { // lock to target layer + //f.logger.Debugw("SA_DEBUG locking to target", "current", f.currentLayers, "target", f.targetLayers) // REMOVE f.currentLayers.spatial = f.targetLayers.spatial if f.currentLayers.spatial == f.maxLayers.spatial { tp.isSwitchingToMaxLayer = true @@ -1137,7 +1138,10 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, nil } - if FlagPauseOnDowngrade && f.targetLayers.spatial < f.currentLayers.spatial && f.targetLayers.spatial < f.maxLayers.spatial { + if FlagPauseOnDowngrade && + f.targetLayers.spatial < f.currentLayers.spatial && + f.targetLayers.spatial < f.maxLayers.spatial && + f.lastAllocation.state == VideoAllocationStateDeficient { // // If target layer is lower than both the current and // maximum subscribed layer, it is due to bandwidth @@ -1151,11 +1155,10 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in // // Note that in the case of client subscription layer restriction // coinciding with server restriction due to bandwidth limitation, - // this will take client subscription as the winning vote and - // continue to stream current spatial layer till switch point. - // That could lead to congesting the channel. - // LK-TODO: Improve the above case, i.e. distinguish server - // applied restriction from client requested restriction. + // In the case of subscription change, higher should continue streaming + // to ensure smooth transition. + // + // To differentiate, drop only when in DEFICIENT state. // tp.shouldDrop = true tp.isDroppingRelevant = true diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index f7c88e511..628a1eea3 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -387,6 +387,7 @@ func (w *WebRTCReceiver) SendPLI(layer int32) { return } + //w.logger.Debugw("SA_DEBUG, PLI request", "layer", layer) // REMOVE buff.SendPLI() } diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 3e123c5dd..36624d670 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -365,7 +365,7 @@ func (s *StreamAllocator) handleEvent(event *Event) { func (s *StreamAllocator) handleSignalAddTrack(event *Event) { params, _ := event.Data.(AddTrackParams) isManaged := (params.Source != livekit.TrackSource_SCREEN_SHARE && params.Source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || params.IsSimulcast - track := newTrack(event.DownTrack, isManaged) + track := newTrack(event.DownTrack, isManaged, s.params.Logger) trackID := livekit.TrackID(event.DownTrack.ID()) switch event.DownTrack.Kind() { @@ -518,7 +518,7 @@ func (s *StreamAllocator) handleSignalTargetBitrate(event *Event) { s.receivedEstimate = int64(receivedEstimate) /* if s.prevReceivedEstimate != s.receivedEstimate { - s.params.Logger.Debugw("received new estimate", + s.params.Logger.Debugw("received new send side estimate", "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate, ) @@ -1073,6 +1073,7 @@ func (s *StreamStateUpdate) Empty() bool { type Track struct { downTrack *DownTrack isManaged bool + logger logger.Logger highestSN uint32 packetsLost uint32 @@ -1082,10 +1083,11 @@ type Track struct { maxLayers VideoLayers } -func newTrack(downTrack *DownTrack, isManaged bool) *Track { +func newTrack(downTrack *DownTrack, isManaged bool, logger logger.Logger) *Track { t := &Track{ downTrack: downTrack, isManaged: isManaged, + logger: logger, } t.UpdateMaxLayers(downTrack.MaxLayers()) @@ -1136,7 +1138,9 @@ func (t *Track) WritePaddingRTP(bytesToSend int) int { } func (t *Track) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation { - return t.downTrack.Allocate(availableChannelCapacity, allowPause) + allocation := t.downTrack.Allocate(availableChannelCapacity, allowPause) + //t.logger.Debugw("SA_DEBUG Capacity allocation", "available", availableChannelCapacity, "alloc", allocation) // REMOVE + return allocation } func (t *Track) ProvisionalAllocatePrepare() { @@ -1156,11 +1160,15 @@ func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition { } func (t *Track) ProvisionalAllocateCommit() VideoAllocation { - return t.downTrack.ProvisionalAllocateCommit() + allocation := t.downTrack.ProvisionalAllocateCommit() + //t.logger.Debugw("SA_DEBUG Provisional commit", "alloc", allocation) // REMOVE + return allocation } func (t *Track) AllocateNextHigher() (VideoAllocation, bool) { - return t.downTrack.AllocateNextHigher() + allocation, boosted := t.downTrack.AllocateNextHigher() + //t.logger.Debugw("SA_DEBUG Probe next higher layer", "alloc", allocation, "boosted", boosted) // REMOVE + return allocation, boosted } func (t *Track) FinalizeAllocate() {