Stop forwarding on congestion (#429)

* WIP commit

* comment out debug stuff
This commit is contained in:
Raja Subramanian
2022-02-11 09:17:53 +05:30
committed by GitHub
parent 8680f6fd23
commit a6338992e8
7 changed files with 37 additions and 19 deletions
+4 -5
View File
@@ -65,10 +65,8 @@ type RTCConfig struct {
// Throttle periods for pli/fir rtcp packets
PLIThrottle PLIThrottleConfig `yaml:"pli_throttle,omitempty"`
// Which side runs bandwidth estimation
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
CongestionControl CongestionControlConfig `yaml:"congestion_control,omitempty"`
// for testing, disable UDP
ForceTCP bool `yaml:"force_tcp,omitempty"`
}
@@ -88,8 +86,9 @@ type PLIThrottleConfig struct {
}
type CongestionControlConfig struct {
Enabled bool `yaml:"enabled"`
AllowPause bool `yaml:"allow_pause"`
Enabled bool `yaml:"enabled"`
AllowPause bool `yaml:"allow_pause"`
UseSendSideBWE bool `yaml:"send_side_bandwidth_estimation,omitempty"`
}
type AudioConfig struct {
+1 -1
View File
@@ -168,7 +168,7 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err
},
},
}
if rtcConf.UseSendSideBWE {
if rtcConf.CongestionControl.UseSendSideBWE {
subscriberConfig.RTPHeaderExtension.Video = append(subscriberConfig.RTPHeaderExtension.Video, sdp.TransportCCURI)
subscriberConfig.RTCPFeedback.Video = append(subscriberConfig.RTCPFeedback.Video, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBTransportCC})
} else {
+5
View File
@@ -510,6 +510,11 @@ func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTi
ep.Payload = vp8Packet
ep.KeyFrame = vp8Packet.IsKeyFrame
temporalLayer = int32(vp8Packet.TID)
/*
if ep.KeyFrame {
b.logger.Debugw("SA_DEBUG key frame", "ssrc", b.mediaSSRC) // REMOVE
}
*/
case "video/h264":
ep.KeyFrame = IsH264Keyframe(rtpPacket.Payload)
}
+2
View File
@@ -277,6 +277,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
tp, err := d.forwarder.GetTranslationParams(extPkt, layer)
if tp.shouldSendPLI {
//d.logger.Debugw("SA_DEBUG Forwarder SendPLI", "layer", layer) // REMOVE
d.lastPli.set(time.Now().UnixNano())
d.receiver.SendPLI(layer)
}
@@ -851,6 +852,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
targetLayers := d.forwarder.TargetLayers()
if targetLayers != InvalidLayers {
d.lastPli.set(time.Now().UnixNano())
//d.logger.Debugw("SA_DEBUG Subscriber RTCP SendPLI", "layer", targetLayers.spatial) // REMOVE
d.receiver.SendPLI(targetLayers.spatial)
pliOnce = false
}
+10 -7
View File
@@ -16,7 +16,7 @@ import (
// Forwarder
//
const (
FlagPauseOnDowngrade = false
FlagPauseOnDowngrade = true
)
type ForwardingStatus int
@@ -1122,6 +1122,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
if f.targetLayers.spatial == layer {
if extPkt.KeyFrame {
// lock to target layer
//f.logger.Debugw("SA_DEBUG locking to target", "current", f.currentLayers, "target", f.targetLayers) // REMOVE
f.currentLayers.spatial = f.targetLayers.spatial
if f.currentLayers.spatial == f.maxLayers.spatial {
tp.isSwitchingToMaxLayer = true
@@ -1137,7 +1138,10 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
return tp, nil
}
if FlagPauseOnDowngrade && f.targetLayers.spatial < f.currentLayers.spatial && f.targetLayers.spatial < f.maxLayers.spatial {
if FlagPauseOnDowngrade &&
f.targetLayers.spatial < f.currentLayers.spatial &&
f.targetLayers.spatial < f.maxLayers.spatial &&
f.lastAllocation.state == VideoAllocationStateDeficient {
//
// If target layer is lower than both the current and
// maximum subscribed layer, it is due to bandwidth
@@ -1151,11 +1155,10 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
//
// Note that in the case of client subscription layer restriction
// coinciding with server restriction due to bandwidth limitation,
// this will take client subscription as the winning vote and
// continue to stream current spatial layer till switch point.
// That could lead to congesting the channel.
// LK-TODO: Improve the above case, i.e. distinguish server
// applied restriction from client requested restriction.
// In the case of subscription change, higher should continue streaming
// to ensure smooth transition.
//
// To differentiate, drop only when in DEFICIENT state.
//
tp.shouldDrop = true
tp.isDroppingRelevant = true
+1
View File
@@ -387,6 +387,7 @@ func (w *WebRTCReceiver) SendPLI(layer int32) {
return
}
//w.logger.Debugw("SA_DEBUG, PLI request", "layer", layer) // REMOVE
buff.SendPLI()
}
+14 -6
View File
@@ -365,7 +365,7 @@ func (s *StreamAllocator) handleEvent(event *Event) {
func (s *StreamAllocator) handleSignalAddTrack(event *Event) {
params, _ := event.Data.(AddTrackParams)
isManaged := (params.Source != livekit.TrackSource_SCREEN_SHARE && params.Source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || params.IsSimulcast
track := newTrack(event.DownTrack, isManaged)
track := newTrack(event.DownTrack, isManaged, s.params.Logger)
trackID := livekit.TrackID(event.DownTrack.ID())
switch event.DownTrack.Kind() {
@@ -518,7 +518,7 @@ func (s *StreamAllocator) handleSignalTargetBitrate(event *Event) {
s.receivedEstimate = int64(receivedEstimate)
/*
if s.prevReceivedEstimate != s.receivedEstimate {
s.params.Logger.Debugw("received new estimate",
s.params.Logger.Debugw("received new send side estimate",
"old(bps)", s.prevReceivedEstimate,
"new(bps)", s.receivedEstimate,
)
@@ -1073,6 +1073,7 @@ func (s *StreamStateUpdate) Empty() bool {
type Track struct {
downTrack *DownTrack
isManaged bool
logger logger.Logger
highestSN uint32
packetsLost uint32
@@ -1082,10 +1083,11 @@ type Track struct {
maxLayers VideoLayers
}
func newTrack(downTrack *DownTrack, isManaged bool) *Track {
func newTrack(downTrack *DownTrack, isManaged bool, logger logger.Logger) *Track {
t := &Track{
downTrack: downTrack,
isManaged: isManaged,
logger: logger,
}
t.UpdateMaxLayers(downTrack.MaxLayers())
@@ -1136,7 +1138,9 @@ func (t *Track) WritePaddingRTP(bytesToSend int) int {
}
func (t *Track) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation {
return t.downTrack.Allocate(availableChannelCapacity, allowPause)
allocation := t.downTrack.Allocate(availableChannelCapacity, allowPause)
//t.logger.Debugw("SA_DEBUG Capacity allocation", "available", availableChannelCapacity, "alloc", allocation) // REMOVE
return allocation
}
func (t *Track) ProvisionalAllocatePrepare() {
@@ -1156,11 +1160,15 @@ func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
}
func (t *Track) ProvisionalAllocateCommit() VideoAllocation {
return t.downTrack.ProvisionalAllocateCommit()
allocation := t.downTrack.ProvisionalAllocateCommit()
//t.logger.Debugw("SA_DEBUG Provisional commit", "alloc", allocation) // REMOVE
return allocation
}
func (t *Track) AllocateNextHigher() (VideoAllocation, bool) {
return t.downTrack.AllocateNextHigher()
allocation, boosted := t.downTrack.AllocateNextHigher()
//t.logger.Debugw("SA_DEBUG Probe next higher layer", "alloc", allocation, "boosted", boosted) // REMOVE
return allocation, boosted
}
func (t *Track) FinalizeAllocate() {