From 1e459e91ccc0c986b5fb38e9e31663b85c48bfbc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 19 Feb 2022 13:17:55 +0530 Subject: [PATCH] Stream priority (#448) --- pkg/sfu/streamallocator.go | 279 +++++++++++++++++++++++-------------- 1 file changed, 174 insertions(+), 105 deletions(-) diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 567e9fdbb..fa5fc7306 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -38,6 +38,11 @@ const ( ProbeMinBps = 200 * 1000 // 200 kbps ProbeMinDuration = 20 * time.Second ProbeMaxDuration = 21 * time.Second + + PriorityMin = uint8(1) + PriorityMax = uint8(255) + PriorityDefaultScreenshare = PriorityMax + PriorityDefaultVideo = PriorityMin ) type State int @@ -63,6 +68,7 @@ type Signal int const ( SignalAddTrack Signal = iota SignalRemoveTrack + SignalSetTrackPriority SignalEstimate SignalTargetBitrate SignalAvailableLayersChange @@ -79,6 +85,8 @@ func (s Signal) String() string { return "ADD_TRACK" case SignalRemoveTrack: return "REMOVE_TRACK" + case SignalSetTrackPriority: + return "SET_TRACK_PRIORITY" case SignalEstimate: return "ESTIMATE" case SignalTargetBitrate: @@ -138,9 +146,7 @@ type StreamAllocator struct { channelObserver *ChannelObserver - videoTracks map[livekit.TrackID]*Track - exemptVideoTracksSorted TrackSorter - managedVideoTracksSorted TrackSorter + videoTracks map[livekit.TrackID]*Track state State @@ -198,6 +204,7 @@ func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator) { type AddTrackParams struct { Source livekit.TrackSource + Priority uint8 IsSimulcast bool PublisherID livekit.ParticipantID } @@ -226,6 +233,14 @@ func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) { }) } +func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8) { + s.postEvent(Event{ + Signal: SignalSetTrackPriority, + DownTrack: downTrack, + Data: priority, + }) +} + func (s *StreamAllocator) resetState() { s.resetProbe() @@ -340,6 +355,8 @@ func (s *StreamAllocator) handleEvent(event *Event) { s.handleSignalAddTrack(event) case SignalRemoveTrack: s.handleSignalRemoveTrack(event) + case SignalSetTrackPriority: + s.handleSignalSetTrackPriority(event) case SignalEstimate: s.handleSignalEstimate(event) case SignalTargetBitrate: @@ -365,20 +382,12 @@ func (s *StreamAllocator) handleSignalAddTrack(event *Event) { } params, _ := event.Data.(AddTrackParams) - isManaged := (params.Source != livekit.TrackSource_SCREEN_SHARE && params.Source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || params.IsSimulcast - track := newTrack(event.DownTrack, isManaged, params.PublisherID, s.params.Logger) + track := newTrack(event.DownTrack, params.Source, params.IsSimulcast, params.PublisherID, s.params.Logger) + track.SetPriority(params.Priority) trackID := livekit.TrackID(event.DownTrack.ID()) s.videoTracks[trackID] = track - if isManaged { - s.managedVideoTracksSorted = append(s.managedVideoTracksSorted, track) - sort.Sort(s.managedVideoTracksSorted) - } else { - s.exemptVideoTracksSorted = append(s.exemptVideoTracksSorted, track) - sort.Sort(s.exemptVideoTracksSorted) - } - s.allocateTrack(track) } @@ -388,37 +397,10 @@ func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) { } trackID := livekit.TrackID(event.DownTrack.ID()) - track, ok := s.videoTracks[trackID] - if !ok { - return - } - delete(s.videoTracks, trackID) - if track.IsManaged() { - n := len(s.managedVideoTracksSorted) - for idx, videoTrack := range s.managedVideoTracksSorted { - if videoTrack.DownTrack() == event.DownTrack { - s.managedVideoTracksSorted[idx] = s.managedVideoTracksSorted[n-1] - s.managedVideoTracksSorted = s.managedVideoTracksSorted[:n-1] - break - } - } - sort.Sort(s.managedVideoTracksSorted) - } else { - n := len(s.exemptVideoTracksSorted) - for idx, videoTrack := range s.exemptVideoTracksSorted { - if videoTrack.DownTrack() == event.DownTrack { - s.exemptVideoTracksSorted[idx] = s.exemptVideoTracksSorted[n-1] - s.exemptVideoTracksSorted = s.exemptVideoTracksSorted[:n-1] - break - } - } - sort.Sort(s.exemptVideoTracksSorted) - } - // re-initialize estimate if all managed tracks are removed, let it get a fresh start - if len(s.managedVideoTracksSorted) == 0 { + if len(s.getSorted()) == 0 { s.resetState() return } @@ -427,6 +409,27 @@ func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) { s.adjustState() } +func (s *StreamAllocator) handleSignalSetTrackPriority(event *Event) { + trackID := livekit.TrackID(event.DownTrack.ID()) + track, ok := s.videoTracks[trackID] + if !ok { + return + } + + priority, _ := event.Data.(uint8) + changed := track.SetPriority(priority) + if changed && s.state == StateDeficient { + // do a full allocation on a track priority change to keep it simple + // LK-TODO-START + // When in a large room, subscriber could be adjusting priority of + // a lot of tracks in quick succession. That could trigger allocation burst. + // Find ways to avoid it. + // LK-TODO-END + s.allocateAllTracks() + } + +} + func (s *StreamAllocator) handleSignalEstimate(event *Event) { // // Channel capacity is estimated at a peer connection level. All down tracks @@ -452,7 +455,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { // // if there are no video tracks, ignore any straggler REMB - if len(s.managedVideoTracksSorted) == 0 { + if len(s.videoTracks) == 0 { return } @@ -523,11 +526,6 @@ func (s *StreamAllocator) handleSignalSubscribedLayersChange(event *Event) { layers := event.Data.(VideoLayers) track.UpdateMaxLayers(layers) - if track.IsManaged() { - sort.Sort(s.managedVideoTracksSorted) - } else { - sort.Sort(s.exemptVideoTracksSorted) - } s.allocateTrack(track) } @@ -605,8 +603,8 @@ func (s *StreamAllocator) setState(state State) { } func (s *StreamAllocator) adjustState() { - for _, videoTrack := range s.managedVideoTracksSorted { - if videoTrack.IsDeficient() { + for _, track := range s.videoTracks { + if track.IsDeficient() { s.setState(StateDeficient) return } @@ -745,17 +743,10 @@ func (s *StreamAllocator) allocateTrack(track *Track) { // This track is currently not streaming and needs bits to start. // Try to redistribute starting with tracks that are closest to their desired. // - var minDistanceSorted MinDistanceSorter - for _, t := range s.managedVideoTracksSorted { - if t != track { - minDistanceSorted = append(minDistanceSorted, t) - } - } - sort.Sort(minDistanceSorted) - bandwidthAcquired := int64(0) var contributingTracks []*Track + minDistanceSorted := s.getMinDistanceSorted(track) for _, t := range minDistanceSorted { t.ProvisionalAllocatePrepare() } @@ -840,19 +831,9 @@ func (s *StreamAllocator) finalizeProbe() { return } - var maxDistanceSorted MaxDistanceSorter - for _, track := range s.managedVideoTracksSorted { - maxDistanceSorted = append(maxDistanceSorted, track) - } - sort.Sort(maxDistanceSorted) - update := NewStreamStateUpdate() - for _, track := range maxDistanceSorted { - if !track.IsDeficient() { - continue - } - + for _, track := range s.getMaxDistanceSortedDeficient() { allocation, boosted := track.AllocateNextHigher(availableChannelCapacity) if !boosted { continue @@ -894,10 +875,14 @@ func (s *StreamAllocator) allocateAllTracks() { availableChannelCapacity := s.committedChannelCapacity // - // This pass is just to find out if there is any leftover channel capacity. + // This pass is find out if there is any leftover channel capacity after allocating exempt tracks. // Infinite channel capacity is given so that exempt tracks do not stall // - for _, track := range s.exemptVideoTracksSorted { + for _, track := range s.videoTracks { + if track.IsManaged() { + continue + } + allocation := track.Allocate(ChannelCapacityInfinity, s.params.Config.AllowPause) update.HandleStreamingChange(allocation.change, track) @@ -910,12 +895,17 @@ func (s *StreamAllocator) allocateAllTracks() { } if availableChannelCapacity == 0 && s.params.Config.AllowPause { // nothing left for managed tracks, pause them all - for _, track := range s.managedVideoTracksSorted { + for _, track := range s.videoTracks { + if !track.IsManaged() { + continue + } + allocation := track.Pause() update.HandleStreamingChange(allocation.change, track) } } else { - for _, track := range s.managedVideoTracksSorted { + sorted := s.getSorted() + for _, track := range sorted { track.ProvisionalAllocatePrepare() } @@ -926,7 +916,7 @@ func (s *StreamAllocator) allocateAllTracks() { temporal: temporal, } - for _, track := range s.managedVideoTracksSorted { + for _, track := range sorted { usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layers, s.params.Config.AllowPause) availableChannelCapacity -= usedChannelCapacity if availableChannelCapacity < 0 { @@ -936,7 +926,7 @@ func (s *StreamAllocator) allocateAllTracks() { } } - for _, track := range s.managedVideoTracksSorted { + for _, track := range sorted { allocation := track.ProvisionalAllocateCommit() update.HandleStreamingChange(allocation.change, track) } @@ -962,11 +952,7 @@ func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) { } func (s *StreamAllocator) finalizeTracks() { - for _, t := range s.exemptVideoTracksSorted { - t.FinalizeAllocate() - } - - for _, t := range s.managedVideoTracksSorted { + for _, t := range s.videoTracks { t.FinalizeAllocate() } @@ -1067,18 +1053,8 @@ func (s *StreamAllocator) maybeProbe() { } func (s *StreamAllocator) maybeProbeWithMedia() { - var maxDistanceSorted MaxDistanceSorter - for _, track := range s.managedVideoTracksSorted { - maxDistanceSorted = append(maxDistanceSorted, track) - } - sort.Sort(maxDistanceSorted) - // boost deficient track farthest from desired layers - for _, track := range maxDistanceSorted { - if !track.IsDeficient() { - continue - } - + for _, track := range s.getMaxDistanceSortedDeficient() { allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity) if !boosted { continue @@ -1094,18 +1070,8 @@ func (s *StreamAllocator) maybeProbeWithMedia() { } func (s *StreamAllocator) maybeProbeWithPadding() { - var maxDistanceSorted MaxDistanceSorter - for _, track := range s.managedVideoTracksSorted { - maxDistanceSorted = append(maxDistanceSorted, track) - } - sort.Sort(maxDistanceSorted) - // use deficient track farthest from desired layers to find how much to probe - for _, track := range maxDistanceSorted { - if !track.IsDeficient() { - continue - } - + for _, track := range s.getMaxDistanceSortedDeficient() { transition, available := track.GetNextHigherTransition() if !available || transition.bandwidthDelta < 0 { continue @@ -1127,6 +1093,51 @@ func (s *StreamAllocator) maybeProbeWithPadding() { } } +func (s *StreamAllocator) getSorted() TrackSorter { + var trackSorter TrackSorter + for _, track := range s.videoTracks { + if !track.IsManaged() { + continue + } + + trackSorter = append(trackSorter, track) + } + + sort.Sort(trackSorter) + + return trackSorter +} + +func (s *StreamAllocator) getMinDistanceSorted(exclude *Track) MinDistanceSorter { + var minDistanceSorter MinDistanceSorter + for _, track := range s.videoTracks { + if !track.IsManaged() || track == exclude { + continue + } + + minDistanceSorter = append(minDistanceSorter, track) + } + + sort.Sort(minDistanceSorter) + + return minDistanceSorter +} + +func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter { + var maxDistanceSorter MaxDistanceSorter + for _, track := range s.videoTracks { + if !track.IsManaged() || !track.IsDeficient() { + continue + } + + maxDistanceSorter = append(maxDistanceSorter, track) + } + + sort.Sort(maxDistanceSorter) + + return maxDistanceSorter +} + // ------------------------------------------------ type StreamState int @@ -1175,7 +1186,9 @@ func (s *StreamStateUpdate) Empty() bool { type Track struct { downTrack *DownTrack - isManaged bool + source livekit.TrackSource + isSimulcast bool + priority uint8 publisherID livekit.ParticipantID logger logger.Logger @@ -1185,24 +1198,56 @@ type Track struct { totalRepeatedNacks uint32 } -func newTrack(downTrack *DownTrack, isManaged bool, publisherID livekit.ParticipantID, logger logger.Logger) *Track { +func newTrack( + downTrack *DownTrack, + source livekit.TrackSource, + isSimulcast bool, + publisherID livekit.ParticipantID, + logger logger.Logger, +) *Track { t := &Track{ downTrack: downTrack, - isManaged: isManaged, + source: source, + isSimulcast: isSimulcast, publisherID: publisherID, logger: logger, } + t.SetPriority(0) t.UpdateMaxLayers(downTrack.MaxLayers()) return t } +func (t *Track) SetPriority(priority uint8) bool { + if priority == 0 { + switch t.source { + case livekit.TrackSource_SCREEN_SHARE: + priority = PriorityDefaultScreenshare + case livekit.TrackSource_SCREEN_SHARE_AUDIO: + priority = PriorityDefaultScreenshare + default: + priority = PriorityDefaultVideo + } + } + + if t.priority == priority { + return false + } + + t.priority = priority + return true +} + +func (t *Track) Priority() uint8 { + return t.priority +} + func (t *Track) DownTrack() *DownTrack { return t.downTrack } func (t *Track) IsManaged() bool { - return t.isManaged + return (t.source != livekit.TrackSource_SCREEN_SHARE && t.source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || t.isSimulcast } func (t *Track) ID() livekit.TrackID { @@ -1298,6 +1343,14 @@ func (t TrackSorter) Swap(i, j int) { } func (t TrackSorter) Less(i, j int) bool { + // + // TrackSorter is used to allocate layer-by-layer. + // So, higher priority track should come earlier so that it gets an earlier shot at each layer + // + if t[i].priority != t[j].priority { + return t[i].priority > t[j].priority + } + if t[i].maxLayers.spatial != t[j].maxLayers.spatial { return t[i].maxLayers.spatial > t[j].maxLayers.spatial } @@ -1318,6 +1371,14 @@ func (m MaxDistanceSorter) Swap(i, j int) { } func (m MaxDistanceSorter) Less(i, j int) bool { + // + // MaxDistanceSorter is used to find a deficient track to use for probing during recovery from congestion. + // So, higher priority track should come earlier so that they have a chance to recover sooner. + // + if m[i].priority != m[j].priority { + return m[i].priority > m[j].priority + } + return m[i].DistanceToDesired() > m[j].DistanceToDesired() } @@ -1334,6 +1395,14 @@ func (m MinDistanceSorter) Swap(i, j int) { } func (m MinDistanceSorter) Less(i, j int) bool { + // + // MinDistanceSorter is used to find excess bandwidth in cooperative allocation. + // So, lower priority track should come earlier so that they contribute bandwidth to higher priority tracks. + // + if m[i].priority != m[j].priority { + return m[i].priority < m[j].priority + } + return m[i].DistanceToDesired() < m[j].DistanceToDesired() }