diff --git a/pkg/routing/selector/utils.go b/pkg/routing/selector/utils.go index 88b6f29bd..cf6738b59 100644 --- a/pkg/routing/selector/utils.go +++ b/pkg/routing/selector/utils.go @@ -16,7 +16,7 @@ package selector import ( "math/rand/v2" - "sort" + "slices" "time" "github.com/thoas/go-funk" @@ -24,6 +24,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/utils" ) const AvailableSeconds = 5 @@ -124,42 +125,42 @@ func selectLowestSortedNode(nodes []*livekit.Node, sortBy string) (*livekit.Node idx := funk.RandomInt(0, len(nodes)) return nodes[idx], nil case "sysload": - sort.Slice(nodes, func(i, j int) bool { - return GetNodeSysload(nodes[i]) < GetNodeSysload(nodes[j]) + slices.SortFunc(nodes, func(a, b *livekit.Node) int { + return utils.Signum(GetNodeSysload(a) - GetNodeSysload(b)) }) return nodes[0], nil case "cpuload": - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Stats.CpuLoad < nodes[j].Stats.CpuLoad + slices.SortFunc(nodes, func(a, b *livekit.Node) int { + return utils.Signum(a.Stats.CpuLoad - b.Stats.CpuLoad) }) return nodes[0], nil case "rooms": - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Stats.NumRooms < nodes[j].Stats.NumRooms + slices.SortFunc(nodes, func(a, b *livekit.Node) int { + return utils.Signum(a.Stats.NumRooms - b.Stats.NumRooms) }) return nodes[0], nil case "clients": - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Stats.NumClients < nodes[j].Stats.NumClients + slices.SortFunc(nodes, func(a, b *livekit.Node) int { + return utils.Signum(a.Stats.NumClients - b.Stats.NumClients) }) return nodes[0], nil case "tracks": - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Stats.NumTracksIn+nodes[i].Stats.NumTracksOut < nodes[j].Stats.NumTracksIn+nodes[j].Stats.NumTracksOut + slices.SortFunc(nodes, func(a, b *livekit.Node) int { + return utils.Signum((a.Stats.NumTracksIn + a.Stats.NumTracksOut) - (b.Stats.NumTracksIn + b.Stats.NumTracksOut)) }) return nodes[0], nil case "bytespersec": - sort.Slice(nodes, func(i, j int) bool { - ratei := &livekit.NodeStatsRate{} - if len(nodes[i].Stats.Rates) > 0 { - ratei = nodes[i].Stats.Rates[0] + slices.SortFunc(nodes, func(a, b *livekit.Node) int { + ratea := &livekit.NodeStatsRate{} + if len(a.Stats.Rates) > 0 { + ratea = a.Stats.Rates[0] } - ratej := &livekit.NodeStatsRate{} - if len(nodes[j].Stats.Rates) > 0 { - ratej = nodes[j].Stats.Rates[0] + rateb := &livekit.NodeStatsRate{} + if len(b.Stats.Rates) > 0 { + rateb = b.Stats.Rates[0] } - return ratei.BytesIn+ratei.BytesOut < ratej.BytesIn+ratej.BytesOut + return utils.Signum((ratea.BytesIn + ratea.BytesOut) - (rateb.BytesIn + rateb.BytesOut)) }) return nodes[0], nil default: diff --git a/pkg/rtc/dynacast/dynacastmanager_test.go b/pkg/rtc/dynacast/dynacastmanager_test.go index 6fffcbe31..6d48e67ea 100644 --- a/pkg/rtc/dynacast/dynacastmanager_test.go +++ b/pkg/rtc/dynacast/dynacastmanager_test.go @@ -15,7 +15,8 @@ package dynacast import ( - "sort" + "slices" + "strings" "sync" "testing" "time" @@ -498,19 +499,25 @@ func TestCodecRegression(t *testing.T) { } func subscribedCodecsAsString(c1 []*livekit.SubscribedCodec) string { - sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec }) - var s1 string + slices.SortFunc(c1, func(a, b *livekit.SubscribedCodec) int { + return strings.Compare(a.Codec, b.Codec) + }) + + var s1 strings.Builder for _, c := range c1 { - s1 += c.String() + s1.WriteString(c.String()) } - return s1 + return s1.String() } func subscribedAudioCodecsAsString(c1 []*livekit.SubscribedAudioCodec) string { - sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec }) - var s1 string + slices.SortFunc(c1, func(a, b *livekit.SubscribedAudioCodec) int { + return strings.Compare(a.Codec, b.Codec) + }) + + var s1 strings.Builder for _, c := range c1 { - s1 += c.String() + s1.WriteString(c.String()) } - return s1 + return s1.String() } diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 9c896b644..2fa2650bd 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -18,7 +18,6 @@ import ( "errors" "fmt" "slices" - "sort" "strings" "sync" @@ -207,8 +206,8 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority receivers = append(receivers, &simulcastReceiver{TrackReceiver: receiver, priority: priority}) } - sort.Slice(receivers, func(i, j int) bool { - return receivers[i].Priority() < receivers[j].Priority() + slices.SortFunc(receivers, func(a, b *simulcastReceiver) int { + return sutils.Signum(a.Priority() - b.Priority()) }) if mid != "" { @@ -349,8 +348,8 @@ func (t *MediaTrackReceiver) SetPotentialCodecs(codecs []webrtc.RTPCodecParamete }) } } - sort.Slice(receivers, func(i, j int) bool { - return receivers[i].Priority() < receivers[j].Priority() + slices.SortFunc(receivers, func(a, b *simulcastReceiver) int { + return sutils.Signum(a.Priority() - b.Priority()) }) t.receivers = receivers t.lock.Unlock() @@ -1090,9 +1089,7 @@ func (t *MediaTrackReceiver) GetQualityForDimension(mimeType mime.MimeType, widt layerSizes = providedSizes // comparing height always requestedSize = height - sort.Slice(layerSizes, func(i, j int) bool { - return layerSizes[i] < layerSizes[j] - }) + slices.Sort(layerSizes) } // finds the highest layer with smallest dimensions that still satisfy client demands diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index c2007ec3e..7389eb327 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -20,7 +20,6 @@ import ( "maps" "math" "slices" - "sort" "strings" "sync" "time" @@ -393,8 +392,8 @@ func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo { }) } - sort.Slice(speakers, func(i, j int) bool { - return speakers[i].Level > speakers[j].Level + slices.SortFunc(speakers, func(a, b *livekit.SpeakerInfo) int { + return sutils.Signum(b.Level - a.Level) }) // quantize to smooth out small changes diff --git a/pkg/sfu/buffer/buffer_base.go b/pkg/sfu/buffer/buffer_base.go index 9fae0672d..20e3bc7c9 100644 --- a/pkg/sfu/buffer/buffer_base.go +++ b/pkg/sfu/buffer/buffer_base.go @@ -664,7 +664,7 @@ func (b *BufferBase) SendPLI(force bool) { pliThrottle := b.pliThrottle b.RUnlock() - if (rtpStats == nil && !force) || !rtpStats.CheckAndUpdatePli(pliThrottle, force) { + if rtpStats == nil || !rtpStats.CheckAndUpdatePli(pliThrottle, force) { return } diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index e4934e1f5..842aae777 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -16,7 +16,7 @@ package buffer import ( "fmt" - "sort" + "slices" "sync" "time" @@ -288,8 +288,14 @@ func ProcessFrameDependencyStructure(structure *dd.FrameDependencyStructure) []D } // sort decode target layer by spatial and temporal from high to low - sort.Slice(decodeTargets, func(i, j int) bool { - return decodeTargets[i].Layer.GreaterThan(decodeTargets[j].Layer) + slices.SortFunc(decodeTargets, func(a, b DependencyDescriptorDecodeTarget) int { + if a.Layer.GreaterThan(b.Layer) { + return -1 + } + if b.Layer.GreaterThan(a.Layer) { + return 1 + } + return 0 }) return decodeTargets diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d2cae63a1..a134e16e7 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -159,6 +159,9 @@ func (w *WebRTCReceiver) GetConnectionScoreAndQuality() (float32, livekit.Connec } func (w *WebRTCReceiver) ssrc(layer int) uint32 { + w.upTracksMu.Lock() + defer w.upTracksMu.Unlock() + if track := w.upTracks[layer]; track != nil { return uint32(track.SSRC()) } diff --git a/pkg/sfu/videolayerselector/dependencydescriptor_test.go b/pkg/sfu/videolayerselector/dependencydescriptor_test.go index 91d78d882..925c017dd 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor_test.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor_test.go @@ -15,7 +15,7 @@ package videolayerselector import ( - "sort" + "slices" "testing" "github.com/pion/rtp" @@ -287,8 +287,14 @@ func createDDFrames(maxLayer buffer.VideoLayer, startFrameNumber uint16) []*buff activeBitMask |= 1 << uint(i*int(maxLayer.Temporal+1)+j) } } - sort.Slice(decodeTargets, func(i, j int) bool { - return decodeTargets[i].Layer.GreaterThan(decodeTargets[j].Layer) + slices.SortFunc(decodeTargets, func(a, b buffer.DependencyDescriptorDecodeTarget) int { + if a.Layer.GreaterThan(b.Layer) { + return -1 + } + if b.Layer.GreaterThan(a.Layer) { + return 1 + } + return 0 }) chainDiffs := make([]int, int(maxLayer.Spatial)+1) diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 475819776..59429a0d9 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -310,7 +310,7 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { stat := &livekit.AnalyticsStat{ MinScore: minScore, - MedianScore: utils.MedianFloat32(scores), + MedianScore: utils.Median(scores), Streams: []*livekit.AnalyticsStream{coalescedStream}, Mime: stats[len(stats)-1].Mime, // use the latest Mime } diff --git a/pkg/utils/math.go b/pkg/utils/math.go index 2bfb5a752..b2e116b8c 100644 --- a/pkg/utils/math.go +++ b/pkg/utils/math.go @@ -14,10 +14,10 @@ package utils -import "sort" +import "slices" -// MedianFloat32 gets median value for an array of float32 -func MedianFloat32(input []float32) float32 { +// Median gets median value for an array +func Median[T float32](input []T) T { num := len(input) switch num { case 0: @@ -25,9 +25,7 @@ func MedianFloat32(input []float32) float32 { case 1: return input[0] } - sort.Slice(input, func(i, j int) bool { - return input[i] < input[j] - }) + slices.Sort(input) if num%2 != 0 { return input[num/2] } @@ -35,3 +33,16 @@ func MedianFloat32(input []float32) float32 { right := input[num/2] return (left + right) / 2 } + +func Signum[T int | int32 | float32](val T) int { + switch { + case val < 0: + return -1 + + case val > 0: + return 1 + + default: + return 0 + } +}