diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 4b8a89179..567e9fdbb 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -38,16 +38,6 @@ const ( ProbeMinBps = 200 * 1000 // 200 kbps ProbeMinDuration = 20 * time.Second ProbeMaxDuration = 21 * time.Second - - GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom > 1 Mbps, don't probe - GratuitousProbePct = 10 - GratuitousProbeMinBps = 100 * 1000 // 100 kbps - GratuitousProbeMaxBps = 300 * 1000 // 300 kbps - GratuitousProbeMinDuration = 500 * time.Millisecond - GratuitousProbeMaxDuration = 600 * time.Millisecond - - AudioLossWeight = 0.75 - VideoLossWeight = 0.25 ) type State int @@ -75,7 +65,6 @@ const ( SignalRemoveTrack SignalEstimate SignalTargetBitrate - SignalReceiverReport SignalAvailableLayersChange SignalSubscriptionChange SignalSubscribedLayersChange @@ -94,8 +83,6 @@ func (s Signal) String() string { return "ESTIMATE" case SignalTargetBitrate: return "TARGET_BITRATE" - case SignalReceiverReport: - return "RECEIVER_REPORT" case SignalSubscriptionChange: return "SUBSCRIPTION_CHANGE" case SignalSubscribedLayersChange: @@ -151,7 +138,6 @@ type StreamAllocator struct { channelObserver *ChannelObserver - audioTracks map[livekit.TrackID]*Track videoTracks map[livekit.TrackID]*Track exemptVideoTracksSorted TrackSorter managedVideoTracksSorted TrackSorter @@ -171,7 +157,6 @@ func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { Logger: params.Logger, }), channelObserver: NewChannelObserver("non-probe", params.Logger, NumRequiredEstimatesNonProbe, NackRatioThresholdNonProbe), - audioTracks: make(map[livekit.TrackID]*Track), videoTracks: make(map[livekit.TrackID]*Track), eventCh: make(chan Event, 20), } @@ -232,7 +217,6 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams) downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged) downTrack.OnPacketSent(s.onPacketSent) } - downTrack.AddReceiverReportListener(s.onReceiverReport) } func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) { @@ -272,15 +256,6 @@ func (s *StreamAllocator) onTargetBitrateChange(bitrate int) { }) } -// called when a new RTCP Receiver Report is received -func (s *StreamAllocator) onReceiverReport(downTrack *DownTrack, rr *rtcp.ReceiverReport) { - s.postEvent(Event{ - Signal: SignalReceiverReport, - DownTrack: downTrack, - Data: rr, - }) -} - // called when feeding track's layer availability changes func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack) { s.postEvent(Event{ @@ -369,8 +344,6 @@ func (s *StreamAllocator) handleEvent(event *Event) { s.handleSignalEstimate(event) case SignalTargetBitrate: s.handleSignalTargetBitrate(event) - case SignalReceiverReport: - s.handleSignalReceiverReport(event) case SignalAvailableLayersChange: s.handleSignalAvailableLayersChange(event) case SignalSubscriptionChange: @@ -387,77 +360,71 @@ func (s *StreamAllocator) handleEvent(event *Event) { } func (s *StreamAllocator) handleSignalAddTrack(event *Event) { + if event.DownTrack.Kind() != webrtc.RTPCodecTypeVideo { + return + } + params, _ := event.Data.(AddTrackParams) isManaged := (params.Source != livekit.TrackSource_SCREEN_SHARE && params.Source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || params.IsSimulcast track := newTrack(event.DownTrack, isManaged, params.PublisherID, s.params.Logger) trackID := livekit.TrackID(event.DownTrack.ID()) - switch event.DownTrack.Kind() { - case webrtc.RTPCodecTypeAudio: - s.audioTracks[trackID] = track - case webrtc.RTPCodecTypeVideo: - s.videoTracks[trackID] = track + s.videoTracks[trackID] = track - if isManaged { - s.managedVideoTracksSorted = append(s.managedVideoTracksSorted, track) - sort.Sort(s.managedVideoTracksSorted) - } else { - s.exemptVideoTracksSorted = append(s.exemptVideoTracksSorted, track) - sort.Sort(s.exemptVideoTracksSorted) - } - - s.allocateTrack(track) + if isManaged { + s.managedVideoTracksSorted = append(s.managedVideoTracksSorted, track) + sort.Sort(s.managedVideoTracksSorted) + } else { + s.exemptVideoTracksSorted = append(s.exemptVideoTracksSorted, track) + sort.Sort(s.exemptVideoTracksSorted) } + + s.allocateTrack(track) } func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) { - trackID := livekit.TrackID(event.DownTrack.ID()) - switch event.DownTrack.Kind() { - case webrtc.RTPCodecTypeAudio: - if _, ok := s.audioTracks[trackID]; !ok { - return - } - - delete(s.audioTracks, trackID) - case webrtc.RTPCodecTypeVideo: - track, ok := s.videoTracks[trackID] - if !ok { - return - } - - delete(s.videoTracks, trackID) - - if track.IsManaged() { - n := len(s.managedVideoTracksSorted) - for idx, videoTrack := range s.managedVideoTracksSorted { - if videoTrack.DownTrack() == event.DownTrack { - s.managedVideoTracksSorted[idx] = s.managedVideoTracksSorted[n-1] - s.managedVideoTracksSorted = s.managedVideoTracksSorted[:n-1] - break - } - } - sort.Sort(s.managedVideoTracksSorted) - } else { - n := len(s.exemptVideoTracksSorted) - for idx, videoTrack := range s.exemptVideoTracksSorted { - if videoTrack.DownTrack() == event.DownTrack { - s.exemptVideoTracksSorted[idx] = s.exemptVideoTracksSorted[n-1] - s.exemptVideoTracksSorted = s.exemptVideoTracksSorted[:n-1] - break - } - } - sort.Sort(s.exemptVideoTracksSorted) - } - - // re-initialize estimate if all managed tracks are removed, let it get a fresh start - if len(s.managedVideoTracksSorted) == 0 { - s.resetState() - return - } - - // LK-TODO: use any saved bandwidth to re-distribute - s.adjustState() + if event.DownTrack.Kind() != webrtc.RTPCodecTypeVideo { + return } + + trackID := livekit.TrackID(event.DownTrack.ID()) + track, ok := s.videoTracks[trackID] + if !ok { + return + } + + delete(s.videoTracks, trackID) + + if track.IsManaged() { + n := len(s.managedVideoTracksSorted) + for idx, videoTrack := range s.managedVideoTracksSorted { + if videoTrack.DownTrack() == event.DownTrack { + s.managedVideoTracksSorted[idx] = s.managedVideoTracksSorted[n-1] + s.managedVideoTracksSorted = s.managedVideoTracksSorted[:n-1] + break + } + } + sort.Sort(s.managedVideoTracksSorted) + } else { + n := len(s.exemptVideoTracksSorted) + for idx, videoTrack := range s.exemptVideoTracksSorted { + if videoTrack.DownTrack() == event.DownTrack { + s.exemptVideoTracksSorted[idx] = s.exemptVideoTracksSorted[n-1] + s.exemptVideoTracksSorted = s.exemptVideoTracksSorted[:n-1] + break + } + } + sort.Sort(s.exemptVideoTracksSorted) + } + + // re-initialize estimate if all managed tracks are removed, let it get a fresh start + if len(s.managedVideoTracksSorted) == 0 { + s.resetState() + return + } + + // LK-TODO: use any saved bandwidth to re-distribute + s.adjustState() } func (s *StreamAllocator) handleSignalEstimate(event *Event) { @@ -530,35 +497,6 @@ func (s *StreamAllocator) handleSignalTargetBitrate(event *Event) { s.handleNewEstimate(int64(receivedEstimate)) } -// LK-TODO-START -// Receiver report stats are not used in the current implementation. -// -// The idea is to use a loss/rtt based estimator and compare against REMB like outlined here -// https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-6 -// -// But the implementation could get quite tricky. So, a separate PR dedicated effort for that -// is required. Something like from Chrome, but hopefully much less complicated :-) -// https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/modules/congestion_controller/goog_cc/loss_based_bandwidth_estimation.cc;bpv=0;bpt=1 -// LK-TODO-END -func (s *StreamAllocator) handleSignalReceiverReport(event *Event) { - var track *Track - ok := false - - trackID := livekit.TrackID(event.DownTrack.ID()) - switch event.DownTrack.Kind() { - case webrtc.RTPCodecTypeAudio: - track, ok = s.audioTracks[trackID] - case webrtc.RTPCodecTypeVideo: - track, ok = s.videoTracks[trackID] - } - if !ok { - return - } - - rr, _ := event.Data.(*rtcp.ReceiverReport) - track.UpdatePacketStats(rr) -} - func (s *StreamAllocator) handleSignalAvailableLayersChange(event *Event) { track, ok := s.videoTracks[livekit.TrackID(event.DownTrack.ID())] if !ok { @@ -1056,39 +994,6 @@ func (s *StreamAllocator) getNackDelta() (uint32, uint32) { return aggPacketDelta, aggRepeatedNackDelta } -// LK-TODO: unused till loss based estimation is done, but just a sample impl of weighting audio higher -func (s *StreamAllocator) calculateLoss() float32 { - packetsAudio := uint32(0) - packetsLostAudio := uint32(0) - for _, track := range s.audioTracks { - packets, packetsLost := track.GetPacketStats() - - packetsAudio += packets - packetsLostAudio += packetsLost - } - - audioLossPct := float32(0.0) - if packetsAudio != 0 { - audioLossPct = (float32(packetsLostAudio) * 100.0) / float32(packetsAudio) - } - - packetsVideo := uint32(0) - packetsLostVideo := uint32(0) - for _, track := range s.videoTracks { - packets, packetsLost := track.GetPacketStats() - - packetsVideo += packets - packetsLostVideo += packetsLost - } - - videoLossPct := float32(0.0) - if packetsVideo != 0 { - videoLossPct = (float32(packetsLostVideo) * 100.0) / float32(packetsVideo) - } - - return AudioLossWeight*audioLossPct + VideoLossWeight*videoLossPct -} - func (s *StreamAllocator) initProbe(goalBps int64) { s.lastProbeStartTime = time.Now() @@ -1222,39 +1127,6 @@ func (s *StreamAllocator) maybeProbeWithPadding() { } } -func (s *StreamAllocator) maybeGratuitousProbe() bool { - // don't gratuitously probe too often - if time.Since(s.lastProbeStartTime) < s.probeInterval || len(s.managedVideoTracksSorted) == 0 || s.probeClusterId != ProbeClusterIdInvalid { - return false - } - - // use last received estimate for gratuitous probing base as - // more updates may have been received since the last commit - expectedRateBps := s.getExpectedBandwidthUsage() - headroomBps := s.lastReceivedEstimate - expectedRateBps - if headroomBps > GratuitousProbeHeadroomBps { - return false - } - - probeRateBps := (s.lastReceivedEstimate * GratuitousProbePct) / 100 - if probeRateBps < GratuitousProbeMinBps { - probeRateBps = GratuitousProbeMinBps - } - if probeRateBps > GratuitousProbeMaxBps { - probeRateBps = GratuitousProbeMaxBps - } - - s.initProbe(expectedRateBps + probeRateBps) - s.probeClusterId = s.prober.AddCluster( - int(s.lastReceivedEstimate+probeRateBps), - int(expectedRateBps), - GratuitousProbeMinDuration, - GratuitousProbeMaxDuration, - ) - - return true -} - // ------------------------------------------------ type StreamState int @@ -1307,11 +1179,6 @@ type Track struct { publisherID livekit.ParticipantID logger logger.Logger - highestSN uint32 - packetsLost uint32 - lastHighestSN uint32 - lastPacketsLost uint32 - maxLayers VideoLayers totalPackets uint32 @@ -1346,29 +1213,10 @@ func (t *Track) PublisherID() livekit.ParticipantID { return t.publisherID } -// LK-TODO this should probably be maintained in downTrack and this module can query what it needs -func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport) { - t.lastHighestSN = t.highestSN - t.lastPacketsLost = t.packetsLost - - for _, report := range rr.Reports { - if report.LastSequenceNumber > t.highestSN { - t.highestSN = report.LastSequenceNumber - } - if report.TotalLost > t.packetsLost { - t.packetsLost = report.TotalLost - } - } -} - func (t *Track) UpdateMaxLayers(layers VideoLayers) { t.maxLayers = layers } -func (t *Track) GetPacketStats() (uint32, uint32) { - return t.highestSN - t.lastHighestSN, t.packetsLost - t.lastPacketsLost -} - func (t *Track) WritePaddingRTP(bytesToSend int) int { return t.downTrack.WritePaddingRTP(bytesToSend) }