Deleting unused code in stream allocator (#447)

This commit is contained in:
Raja Subramanian
2022-02-18 21:10:52 +05:30
committed by GitHub
parent 92d47ec421
commit dd4cec7724
+54 -206
View File
@@ -38,16 +38,6 @@ const (
ProbeMinBps = 200 * 1000 // 200 kbps
ProbeMinDuration = 20 * time.Second
ProbeMaxDuration = 21 * time.Second
GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom > 1 Mbps, don't probe
GratuitousProbePct = 10
GratuitousProbeMinBps = 100 * 1000 // 100 kbps
GratuitousProbeMaxBps = 300 * 1000 // 300 kbps
GratuitousProbeMinDuration = 500 * time.Millisecond
GratuitousProbeMaxDuration = 600 * time.Millisecond
AudioLossWeight = 0.75
VideoLossWeight = 0.25
)
type State int
@@ -75,7 +65,6 @@ const (
SignalRemoveTrack
SignalEstimate
SignalTargetBitrate
SignalReceiverReport
SignalAvailableLayersChange
SignalSubscriptionChange
SignalSubscribedLayersChange
@@ -94,8 +83,6 @@ func (s Signal) String() string {
return "ESTIMATE"
case SignalTargetBitrate:
return "TARGET_BITRATE"
case SignalReceiverReport:
return "RECEIVER_REPORT"
case SignalSubscriptionChange:
return "SUBSCRIPTION_CHANGE"
case SignalSubscribedLayersChange:
@@ -151,7 +138,6 @@ type StreamAllocator struct {
channelObserver *ChannelObserver
audioTracks map[livekit.TrackID]*Track
videoTracks map[livekit.TrackID]*Track
exemptVideoTracksSorted TrackSorter
managedVideoTracksSorted TrackSorter
@@ -171,7 +157,6 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
Logger: params.Logger,
}),
channelObserver: NewChannelObserver("non-probe", params.Logger, NumRequiredEstimatesNonProbe, NackRatioThresholdNonProbe),
audioTracks: make(map[livekit.TrackID]*Track),
videoTracks: make(map[livekit.TrackID]*Track),
eventCh: make(chan Event, 20),
}
@@ -232,7 +217,6 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)
downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged)
downTrack.OnPacketSent(s.onPacketSent)
}
downTrack.AddReceiverReportListener(s.onReceiverReport)
}
func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) {
@@ -272,15 +256,6 @@ func (s *StreamAllocator) onTargetBitrateChange(bitrate int) {
})
}
// called when a new RTCP Receiver Report is received
func (s *StreamAllocator) onReceiverReport(downTrack *DownTrack, rr *rtcp.ReceiverReport) {
s.postEvent(Event{
Signal: SignalReceiverReport,
DownTrack: downTrack,
Data: rr,
})
}
// called when feeding track's layer availability changes
func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack) {
s.postEvent(Event{
@@ -369,8 +344,6 @@ func (s *StreamAllocator) handleEvent(event *Event) {
s.handleSignalEstimate(event)
case SignalTargetBitrate:
s.handleSignalTargetBitrate(event)
case SignalReceiverReport:
s.handleSignalReceiverReport(event)
case SignalAvailableLayersChange:
s.handleSignalAvailableLayersChange(event)
case SignalSubscriptionChange:
@@ -387,77 +360,71 @@ func (s *StreamAllocator) handleEvent(event *Event) {
}
func (s *StreamAllocator) handleSignalAddTrack(event *Event) {
if event.DownTrack.Kind() != webrtc.RTPCodecTypeVideo {
return
}
params, _ := event.Data.(AddTrackParams)
isManaged := (params.Source != livekit.TrackSource_SCREEN_SHARE && params.Source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || params.IsSimulcast
track := newTrack(event.DownTrack, isManaged, params.PublisherID, s.params.Logger)
trackID := livekit.TrackID(event.DownTrack.ID())
switch event.DownTrack.Kind() {
case webrtc.RTPCodecTypeAudio:
s.audioTracks[trackID] = track
case webrtc.RTPCodecTypeVideo:
s.videoTracks[trackID] = track
s.videoTracks[trackID] = track
if isManaged {
s.managedVideoTracksSorted = append(s.managedVideoTracksSorted, track)
sort.Sort(s.managedVideoTracksSorted)
} else {
s.exemptVideoTracksSorted = append(s.exemptVideoTracksSorted, track)
sort.Sort(s.exemptVideoTracksSorted)
}
s.allocateTrack(track)
if isManaged {
s.managedVideoTracksSorted = append(s.managedVideoTracksSorted, track)
sort.Sort(s.managedVideoTracksSorted)
} else {
s.exemptVideoTracksSorted = append(s.exemptVideoTracksSorted, track)
sort.Sort(s.exemptVideoTracksSorted)
}
s.allocateTrack(track)
}
func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) {
trackID := livekit.TrackID(event.DownTrack.ID())
switch event.DownTrack.Kind() {
case webrtc.RTPCodecTypeAudio:
if _, ok := s.audioTracks[trackID]; !ok {
return
}
delete(s.audioTracks, trackID)
case webrtc.RTPCodecTypeVideo:
track, ok := s.videoTracks[trackID]
if !ok {
return
}
delete(s.videoTracks, trackID)
if track.IsManaged() {
n := len(s.managedVideoTracksSorted)
for idx, videoTrack := range s.managedVideoTracksSorted {
if videoTrack.DownTrack() == event.DownTrack {
s.managedVideoTracksSorted[idx] = s.managedVideoTracksSorted[n-1]
s.managedVideoTracksSorted = s.managedVideoTracksSorted[:n-1]
break
}
}
sort.Sort(s.managedVideoTracksSorted)
} else {
n := len(s.exemptVideoTracksSorted)
for idx, videoTrack := range s.exemptVideoTracksSorted {
if videoTrack.DownTrack() == event.DownTrack {
s.exemptVideoTracksSorted[idx] = s.exemptVideoTracksSorted[n-1]
s.exemptVideoTracksSorted = s.exemptVideoTracksSorted[:n-1]
break
}
}
sort.Sort(s.exemptVideoTracksSorted)
}
// re-initialize estimate if all managed tracks are removed, let it get a fresh start
if len(s.managedVideoTracksSorted) == 0 {
s.resetState()
return
}
// LK-TODO: use any saved bandwidth to re-distribute
s.adjustState()
if event.DownTrack.Kind() != webrtc.RTPCodecTypeVideo {
return
}
trackID := livekit.TrackID(event.DownTrack.ID())
track, ok := s.videoTracks[trackID]
if !ok {
return
}
delete(s.videoTracks, trackID)
if track.IsManaged() {
n := len(s.managedVideoTracksSorted)
for idx, videoTrack := range s.managedVideoTracksSorted {
if videoTrack.DownTrack() == event.DownTrack {
s.managedVideoTracksSorted[idx] = s.managedVideoTracksSorted[n-1]
s.managedVideoTracksSorted = s.managedVideoTracksSorted[:n-1]
break
}
}
sort.Sort(s.managedVideoTracksSorted)
} else {
n := len(s.exemptVideoTracksSorted)
for idx, videoTrack := range s.exemptVideoTracksSorted {
if videoTrack.DownTrack() == event.DownTrack {
s.exemptVideoTracksSorted[idx] = s.exemptVideoTracksSorted[n-1]
s.exemptVideoTracksSorted = s.exemptVideoTracksSorted[:n-1]
break
}
}
sort.Sort(s.exemptVideoTracksSorted)
}
// re-initialize estimate if all managed tracks are removed, let it get a fresh start
if len(s.managedVideoTracksSorted) == 0 {
s.resetState()
return
}
// LK-TODO: use any saved bandwidth to re-distribute
s.adjustState()
}
func (s *StreamAllocator) handleSignalEstimate(event *Event) {
@@ -530,35 +497,6 @@ func (s *StreamAllocator) handleSignalTargetBitrate(event *Event) {
s.handleNewEstimate(int64(receivedEstimate))
}
// LK-TODO-START
// Receiver report stats are not used in the current implementation.
//
// The idea is to use a loss/rtt based estimator and compare against REMB like outlined here
// https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-6
//
// But the implementation could get quite tricky. So, a separate PR dedicated effort for that
// is required. Something like from Chrome, but hopefully much less complicated :-)
// https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/modules/congestion_controller/goog_cc/loss_based_bandwidth_estimation.cc;bpv=0;bpt=1
// LK-TODO-END
func (s *StreamAllocator) handleSignalReceiverReport(event *Event) {
var track *Track
ok := false
trackID := livekit.TrackID(event.DownTrack.ID())
switch event.DownTrack.Kind() {
case webrtc.RTPCodecTypeAudio:
track, ok = s.audioTracks[trackID]
case webrtc.RTPCodecTypeVideo:
track, ok = s.videoTracks[trackID]
}
if !ok {
return
}
rr, _ := event.Data.(*rtcp.ReceiverReport)
track.UpdatePacketStats(rr)
}
func (s *StreamAllocator) handleSignalAvailableLayersChange(event *Event) {
track, ok := s.videoTracks[livekit.TrackID(event.DownTrack.ID())]
if !ok {
@@ -1056,39 +994,6 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
return aggPacketDelta, aggRepeatedNackDelta
}
// LK-TODO: unused till loss based estimation is done, but just a sample impl of weighting audio higher
func (s *StreamAllocator) calculateLoss() float32 {
packetsAudio := uint32(0)
packetsLostAudio := uint32(0)
for _, track := range s.audioTracks {
packets, packetsLost := track.GetPacketStats()
packetsAudio += packets
packetsLostAudio += packetsLost
}
audioLossPct := float32(0.0)
if packetsAudio != 0 {
audioLossPct = (float32(packetsLostAudio) * 100.0) / float32(packetsAudio)
}
packetsVideo := uint32(0)
packetsLostVideo := uint32(0)
for _, track := range s.videoTracks {
packets, packetsLost := track.GetPacketStats()
packetsVideo += packets
packetsLostVideo += packetsLost
}
videoLossPct := float32(0.0)
if packetsVideo != 0 {
videoLossPct = (float32(packetsLostVideo) * 100.0) / float32(packetsVideo)
}
return AudioLossWeight*audioLossPct + VideoLossWeight*videoLossPct
}
func (s *StreamAllocator) initProbe(goalBps int64) {
s.lastProbeStartTime = time.Now()
@@ -1222,39 +1127,6 @@ func (s *StreamAllocator) maybeProbeWithPadding() {
}
}
func (s *StreamAllocator) maybeGratuitousProbe() bool {
// don't gratuitously probe too often
if time.Since(s.lastProbeStartTime) < s.probeInterval || len(s.managedVideoTracksSorted) == 0 || s.probeClusterId != ProbeClusterIdInvalid {
return false
}
// use last received estimate for gratuitous probing base as
// more updates may have been received since the last commit
expectedRateBps := s.getExpectedBandwidthUsage()
headroomBps := s.lastReceivedEstimate - expectedRateBps
if headroomBps > GratuitousProbeHeadroomBps {
return false
}
probeRateBps := (s.lastReceivedEstimate * GratuitousProbePct) / 100
if probeRateBps < GratuitousProbeMinBps {
probeRateBps = GratuitousProbeMinBps
}
if probeRateBps > GratuitousProbeMaxBps {
probeRateBps = GratuitousProbeMaxBps
}
s.initProbe(expectedRateBps + probeRateBps)
s.probeClusterId = s.prober.AddCluster(
int(s.lastReceivedEstimate+probeRateBps),
int(expectedRateBps),
GratuitousProbeMinDuration,
GratuitousProbeMaxDuration,
)
return true
}
// ------------------------------------------------
type StreamState int
@@ -1307,11 +1179,6 @@ type Track struct {
publisherID livekit.ParticipantID
logger logger.Logger
highestSN uint32
packetsLost uint32
lastHighestSN uint32
lastPacketsLost uint32
maxLayers VideoLayers
totalPackets uint32
@@ -1346,29 +1213,10 @@ func (t *Track) PublisherID() livekit.ParticipantID {
return t.publisherID
}
// LK-TODO this should probably be maintained in downTrack and this module can query what it needs
func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport) {
t.lastHighestSN = t.highestSN
t.lastPacketsLost = t.packetsLost
for _, report := range rr.Reports {
if report.LastSequenceNumber > t.highestSN {
t.highestSN = report.LastSequenceNumber
}
if report.TotalLost > t.packetsLost {
t.packetsLost = report.TotalLost
}
}
}
func (t *Track) UpdateMaxLayers(layers VideoLayers) {
t.maxLayers = layers
}
func (t *Track) GetPacketStats() (uint32, uint32) {
return t.highestSN - t.lastHighestSN, t.packetsLost - t.lastPacketsLost
}
func (t *Track) WritePaddingRTP(bytesToSend int) int {
return t.downTrack.WritePaddingRTP(bytesToSend)
}