Stream priority (#448)

This commit is contained in:
Raja Subramanian
2022-02-19 13:17:55 +05:30
committed by GitHub
parent c20fb237d5
commit 1e459e91cc

View File

@@ -38,6 +38,11 @@ const (
ProbeMinBps = 200 * 1000 // 200 kbps
ProbeMinDuration = 20 * time.Second
ProbeMaxDuration = 21 * time.Second
PriorityMin = uint8(1)
PriorityMax = uint8(255)
PriorityDefaultScreenshare = PriorityMax
PriorityDefaultVideo = PriorityMin
)
type State int
@@ -63,6 +68,7 @@ type Signal int
const (
SignalAddTrack Signal = iota
SignalRemoveTrack
SignalSetTrackPriority
SignalEstimate
SignalTargetBitrate
SignalAvailableLayersChange
@@ -79,6 +85,8 @@ func (s Signal) String() string {
return "ADD_TRACK"
case SignalRemoveTrack:
return "REMOVE_TRACK"
case SignalSetTrackPriority:
return "SET_TRACK_PRIORITY"
case SignalEstimate:
return "ESTIMATE"
case SignalTargetBitrate:
@@ -138,9 +146,7 @@ type StreamAllocator struct {
channelObserver *ChannelObserver
videoTracks map[livekit.TrackID]*Track
exemptVideoTracksSorted TrackSorter
managedVideoTracksSorted TrackSorter
videoTracks map[livekit.TrackID]*Track
state State
@@ -198,6 +204,7 @@ func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator) {
type AddTrackParams struct {
Source livekit.TrackSource
Priority uint8
IsSimulcast bool
PublisherID livekit.ParticipantID
}
@@ -226,6 +233,14 @@ func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) {
})
}
func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8) {
s.postEvent(Event{
Signal: SignalSetTrackPriority,
DownTrack: downTrack,
Data: priority,
})
}
func (s *StreamAllocator) resetState() {
s.resetProbe()
@@ -340,6 +355,8 @@ func (s *StreamAllocator) handleEvent(event *Event) {
s.handleSignalAddTrack(event)
case SignalRemoveTrack:
s.handleSignalRemoveTrack(event)
case SignalSetTrackPriority:
s.handleSignalSetTrackPriority(event)
case SignalEstimate:
s.handleSignalEstimate(event)
case SignalTargetBitrate:
@@ -365,20 +382,12 @@ func (s *StreamAllocator) handleSignalAddTrack(event *Event) {
}
params, _ := event.Data.(AddTrackParams)
isManaged := (params.Source != livekit.TrackSource_SCREEN_SHARE && params.Source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || params.IsSimulcast
track := newTrack(event.DownTrack, isManaged, params.PublisherID, s.params.Logger)
track := newTrack(event.DownTrack, params.Source, params.IsSimulcast, params.PublisherID, s.params.Logger)
track.SetPriority(params.Priority)
trackID := livekit.TrackID(event.DownTrack.ID())
s.videoTracks[trackID] = track
if isManaged {
s.managedVideoTracksSorted = append(s.managedVideoTracksSorted, track)
sort.Sort(s.managedVideoTracksSorted)
} else {
s.exemptVideoTracksSorted = append(s.exemptVideoTracksSorted, track)
sort.Sort(s.exemptVideoTracksSorted)
}
s.allocateTrack(track)
}
@@ -388,37 +397,10 @@ func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) {
}
trackID := livekit.TrackID(event.DownTrack.ID())
track, ok := s.videoTracks[trackID]
if !ok {
return
}
delete(s.videoTracks, trackID)
if track.IsManaged() {
n := len(s.managedVideoTracksSorted)
for idx, videoTrack := range s.managedVideoTracksSorted {
if videoTrack.DownTrack() == event.DownTrack {
s.managedVideoTracksSorted[idx] = s.managedVideoTracksSorted[n-1]
s.managedVideoTracksSorted = s.managedVideoTracksSorted[:n-1]
break
}
}
sort.Sort(s.managedVideoTracksSorted)
} else {
n := len(s.exemptVideoTracksSorted)
for idx, videoTrack := range s.exemptVideoTracksSorted {
if videoTrack.DownTrack() == event.DownTrack {
s.exemptVideoTracksSorted[idx] = s.exemptVideoTracksSorted[n-1]
s.exemptVideoTracksSorted = s.exemptVideoTracksSorted[:n-1]
break
}
}
sort.Sort(s.exemptVideoTracksSorted)
}
// re-initialize estimate if all managed tracks are removed, let it get a fresh start
if len(s.managedVideoTracksSorted) == 0 {
if len(s.getSorted()) == 0 {
s.resetState()
return
}
@@ -427,6 +409,27 @@ func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) {
s.adjustState()
}
func (s *StreamAllocator) handleSignalSetTrackPriority(event *Event) {
trackID := livekit.TrackID(event.DownTrack.ID())
track, ok := s.videoTracks[trackID]
if !ok {
return
}
priority, _ := event.Data.(uint8)
changed := track.SetPriority(priority)
if changed && s.state == StateDeficient {
// do a full allocation on a track priority change to keep it simple
// LK-TODO-START
// When in a large room, subscriber could be adjusting priority of
// a lot of tracks in quick succession. That could trigger allocation burst.
// Find ways to avoid it.
// LK-TODO-END
s.allocateAllTracks()
}
}
func (s *StreamAllocator) handleSignalEstimate(event *Event) {
//
// Channel capacity is estimated at a peer connection level. All down tracks
@@ -452,7 +455,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) {
//
// if there are no video tracks, ignore any straggler REMB
if len(s.managedVideoTracksSorted) == 0 {
if len(s.videoTracks) == 0 {
return
}
@@ -523,11 +526,6 @@ func (s *StreamAllocator) handleSignalSubscribedLayersChange(event *Event) {
layers := event.Data.(VideoLayers)
track.UpdateMaxLayers(layers)
if track.IsManaged() {
sort.Sort(s.managedVideoTracksSorted)
} else {
sort.Sort(s.exemptVideoTracksSorted)
}
s.allocateTrack(track)
}
@@ -605,8 +603,8 @@ func (s *StreamAllocator) setState(state State) {
}
func (s *StreamAllocator) adjustState() {
for _, videoTrack := range s.managedVideoTracksSorted {
if videoTrack.IsDeficient() {
for _, track := range s.videoTracks {
if track.IsDeficient() {
s.setState(StateDeficient)
return
}
@@ -745,17 +743,10 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
// This track is currently not streaming and needs bits to start.
// Try to redistribute starting with tracks that are closest to their desired.
//
var minDistanceSorted MinDistanceSorter
for _, t := range s.managedVideoTracksSorted {
if t != track {
minDistanceSorted = append(minDistanceSorted, t)
}
}
sort.Sort(minDistanceSorted)
bandwidthAcquired := int64(0)
var contributingTracks []*Track
minDistanceSorted := s.getMinDistanceSorted(track)
for _, t := range minDistanceSorted {
t.ProvisionalAllocatePrepare()
}
@@ -840,19 +831,9 @@ func (s *StreamAllocator) finalizeProbe() {
return
}
var maxDistanceSorted MaxDistanceSorter
for _, track := range s.managedVideoTracksSorted {
maxDistanceSorted = append(maxDistanceSorted, track)
}
sort.Sort(maxDistanceSorted)
update := NewStreamStateUpdate()
for _, track := range maxDistanceSorted {
if !track.IsDeficient() {
continue
}
for _, track := range s.getMaxDistanceSortedDeficient() {
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity)
if !boosted {
continue
@@ -894,10 +875,14 @@ func (s *StreamAllocator) allocateAllTracks() {
availableChannelCapacity := s.committedChannelCapacity
//
// This pass is just to find out if there is any leftover channel capacity.
// This pass is find out if there is any leftover channel capacity after allocating exempt tracks.
// Infinite channel capacity is given so that exempt tracks do not stall
//
for _, track := range s.exemptVideoTracksSorted {
for _, track := range s.videoTracks {
if track.IsManaged() {
continue
}
allocation := track.Allocate(ChannelCapacityInfinity, s.params.Config.AllowPause)
update.HandleStreamingChange(allocation.change, track)
@@ -910,12 +895,17 @@ func (s *StreamAllocator) allocateAllTracks() {
}
if availableChannelCapacity == 0 && s.params.Config.AllowPause {
// nothing left for managed tracks, pause them all
for _, track := range s.managedVideoTracksSorted {
for _, track := range s.videoTracks {
if !track.IsManaged() {
continue
}
allocation := track.Pause()
update.HandleStreamingChange(allocation.change, track)
}
} else {
for _, track := range s.managedVideoTracksSorted {
sorted := s.getSorted()
for _, track := range sorted {
track.ProvisionalAllocatePrepare()
}
@@ -926,7 +916,7 @@ func (s *StreamAllocator) allocateAllTracks() {
temporal: temporal,
}
for _, track := range s.managedVideoTracksSorted {
for _, track := range sorted {
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layers, s.params.Config.AllowPause)
availableChannelCapacity -= usedChannelCapacity
if availableChannelCapacity < 0 {
@@ -936,7 +926,7 @@ func (s *StreamAllocator) allocateAllTracks() {
}
}
for _, track := range s.managedVideoTracksSorted {
for _, track := range sorted {
allocation := track.ProvisionalAllocateCommit()
update.HandleStreamingChange(allocation.change, track)
}
@@ -962,11 +952,7 @@ func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) {
}
func (s *StreamAllocator) finalizeTracks() {
for _, t := range s.exemptVideoTracksSorted {
t.FinalizeAllocate()
}
for _, t := range s.managedVideoTracksSorted {
for _, t := range s.videoTracks {
t.FinalizeAllocate()
}
@@ -1067,18 +1053,8 @@ func (s *StreamAllocator) maybeProbe() {
}
func (s *StreamAllocator) maybeProbeWithMedia() {
var maxDistanceSorted MaxDistanceSorter
for _, track := range s.managedVideoTracksSorted {
maxDistanceSorted = append(maxDistanceSorted, track)
}
sort.Sort(maxDistanceSorted)
// boost deficient track farthest from desired layers
for _, track := range maxDistanceSorted {
if !track.IsDeficient() {
continue
}
for _, track := range s.getMaxDistanceSortedDeficient() {
allocation, boosted := track.AllocateNextHigher(ChannelCapacityInfinity)
if !boosted {
continue
@@ -1094,18 +1070,8 @@ func (s *StreamAllocator) maybeProbeWithMedia() {
}
func (s *StreamAllocator) maybeProbeWithPadding() {
var maxDistanceSorted MaxDistanceSorter
for _, track := range s.managedVideoTracksSorted {
maxDistanceSorted = append(maxDistanceSorted, track)
}
sort.Sort(maxDistanceSorted)
// use deficient track farthest from desired layers to find how much to probe
for _, track := range maxDistanceSorted {
if !track.IsDeficient() {
continue
}
for _, track := range s.getMaxDistanceSortedDeficient() {
transition, available := track.GetNextHigherTransition()
if !available || transition.bandwidthDelta < 0 {
continue
@@ -1127,6 +1093,51 @@ func (s *StreamAllocator) maybeProbeWithPadding() {
}
}
func (s *StreamAllocator) getSorted() TrackSorter {
var trackSorter TrackSorter
for _, track := range s.videoTracks {
if !track.IsManaged() {
continue
}
trackSorter = append(trackSorter, track)
}
sort.Sort(trackSorter)
return trackSorter
}
func (s *StreamAllocator) getMinDistanceSorted(exclude *Track) MinDistanceSorter {
var minDistanceSorter MinDistanceSorter
for _, track := range s.videoTracks {
if !track.IsManaged() || track == exclude {
continue
}
minDistanceSorter = append(minDistanceSorter, track)
}
sort.Sort(minDistanceSorter)
return minDistanceSorter
}
func (s *StreamAllocator) getMaxDistanceSortedDeficient() MaxDistanceSorter {
var maxDistanceSorter MaxDistanceSorter
for _, track := range s.videoTracks {
if !track.IsManaged() || !track.IsDeficient() {
continue
}
maxDistanceSorter = append(maxDistanceSorter, track)
}
sort.Sort(maxDistanceSorter)
return maxDistanceSorter
}
// ------------------------------------------------
type StreamState int
@@ -1175,7 +1186,9 @@ func (s *StreamStateUpdate) Empty() bool {
type Track struct {
downTrack *DownTrack
isManaged bool
source livekit.TrackSource
isSimulcast bool
priority uint8
publisherID livekit.ParticipantID
logger logger.Logger
@@ -1185,24 +1198,56 @@ type Track struct {
totalRepeatedNacks uint32
}
func newTrack(downTrack *DownTrack, isManaged bool, publisherID livekit.ParticipantID, logger logger.Logger) *Track {
func newTrack(
downTrack *DownTrack,
source livekit.TrackSource,
isSimulcast bool,
publisherID livekit.ParticipantID,
logger logger.Logger,
) *Track {
t := &Track{
downTrack: downTrack,
isManaged: isManaged,
source: source,
isSimulcast: isSimulcast,
publisherID: publisherID,
logger: logger,
}
t.SetPriority(0)
t.UpdateMaxLayers(downTrack.MaxLayers())
return t
}
func (t *Track) SetPriority(priority uint8) bool {
if priority == 0 {
switch t.source {
case livekit.TrackSource_SCREEN_SHARE:
priority = PriorityDefaultScreenshare
case livekit.TrackSource_SCREEN_SHARE_AUDIO:
priority = PriorityDefaultScreenshare
default:
priority = PriorityDefaultVideo
}
}
if t.priority == priority {
return false
}
t.priority = priority
return true
}
func (t *Track) Priority() uint8 {
return t.priority
}
func (t *Track) DownTrack() *DownTrack {
return t.downTrack
}
func (t *Track) IsManaged() bool {
return t.isManaged
return (t.source != livekit.TrackSource_SCREEN_SHARE && t.source != livekit.TrackSource_SCREEN_SHARE_AUDIO) || t.isSimulcast
}
func (t *Track) ID() livekit.TrackID {
@@ -1298,6 +1343,14 @@ func (t TrackSorter) Swap(i, j int) {
}
func (t TrackSorter) Less(i, j int) bool {
//
// TrackSorter is used to allocate layer-by-layer.
// So, higher priority track should come earlier so that it gets an earlier shot at each layer
//
if t[i].priority != t[j].priority {
return t[i].priority > t[j].priority
}
if t[i].maxLayers.spatial != t[j].maxLayers.spatial {
return t[i].maxLayers.spatial > t[j].maxLayers.spatial
}
@@ -1318,6 +1371,14 @@ func (m MaxDistanceSorter) Swap(i, j int) {
}
func (m MaxDistanceSorter) Less(i, j int) bool {
//
// MaxDistanceSorter is used to find a deficient track to use for probing during recovery from congestion.
// So, higher priority track should come earlier so that they have a chance to recover sooner.
//
if m[i].priority != m[j].priority {
return m[i].priority > m[j].priority
}
return m[i].DistanceToDesired() > m[j].DistanceToDesired()
}
@@ -1334,6 +1395,14 @@ func (m MinDistanceSorter) Swap(i, j int) {
}
func (m MinDistanceSorter) Less(i, j int) bool {
//
// MinDistanceSorter is used to find excess bandwidth in cooperative allocation.
// So, lower priority track should come earlier so that they contribute bandwidth to higher priority tracks.
//
if m[i].priority != m[j].priority {
return m[i].priority < m[j].priority
}
return m[i].DistanceToDesired() < m[j].DistanceToDesired()
}