From 10893b9b33e4d549a01e4864030e980013fd68b2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 22 Aug 2023 00:03:37 +0530 Subject: [PATCH 1/7] Store referenceLayerSpatial in Forwarder state. (#1986) When restoring state, reference layer could change before this change. That meant the time stamp base would change and cause jumps. But, the solution in this change to store the reference layer state and restoring it has a different issue. It is possible that the reference is layer 2 (HIGH) for example. On a migration when the down track has to re-attach and resume to a moved up stream track, it is possible that layer 2 is not published due to bandwidth constraint after publisher migrates to new node. In that case, the stream cannot be resumed as time stamp adjustment cannot be calculated. An option is to set referenceSpatialLayer always at layer 0 (LOW). But, that also has a couple of issues - Browsers like FF have shown issues with layer mapping. - Layer 0 is lowest bit rate. So, it will have RTCP at lower frequency. That could introduce a slight latency in stream start as we need RTCP sender report to calculate the time stamp. Open to ideas on how to handle this better. --- pkg/sfu/forwarder.go | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 1d5c5ac7f..88876d307 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -154,12 +154,13 @@ type TranslationParams struct { // ------------------------------------------------------------------- type ForwarderState struct { - Started bool - PreStartTime time.Time - FirstTS uint32 - RefTSOffset uint32 - RTP RTPMungerState - Codec interface{} + Started bool + ReferenceLayerSpatial int32 + PreStartTime time.Time + FirstTS uint32 + RefTSOffset uint32 + RTP RTPMungerState + Codec interface{} } func (f ForwarderState) String() string { @@ -168,8 +169,9 @@ func (f ForwarderState) String() string { case codecmunger.VP8State: codecString = codecState.String() } - return fmt.Sprintf("ForwarderState{started: %v, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", + return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", f.Started, + f.ReferenceLayerSpatial, f.PreStartTime.String(), f.FirstTS, f.RefTSOffset, @@ -330,12 +332,13 @@ func (f *Forwarder) GetState() ForwarderState { } return ForwarderState{ - Started: f.started, - PreStartTime: f.preStartTime, - FirstTS: f.firstTS, - RefTSOffset: f.refTSOffset, - RTP: f.rtpMunger.GetLast(), - Codec: f.codecMunger.GetState(), + Started: f.started, + ReferenceLayerSpatial: f.referenceLayerSpatial, + PreStartTime: f.preStartTime, + FirstTS: f.firstTS, + RefTSOffset: f.refTSOffset, + RTP: f.rtpMunger.GetLast(), + Codec: f.codecMunger.GetState(), } } @@ -351,6 +354,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.codecMunger.SeedState(state.Codec) f.started = true + f.referenceLayerSpatial = state.ReferenceLayerSpatial f.preStartTime = state.PreStartTime f.firstTS = state.FirstTS f.refTSOffset = state.RefTSOffset @@ -1451,11 +1455,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e ) } - if f.referenceLayerSpatial == buffer.InvalidLayerSpatial { - // on a resume, reference layer may not be set, so only set when it is invalid - f.referenceLayerSpatial = layer - } - // Compute how much time passed between the previous forwarded packet // and the current incoming (to be forwarded) packet and calculate // timestamp offset on source change. From adead8bb2b99614d3fde91c4e508ba89ec2f8efa Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 22 Aug 2023 17:40:51 -0700 Subject: [PATCH 2/7] v1.4.5 (#1988) --- CHANGELOG | 39 +++++++++++++++++++++++++++++++++++++++ version/version.go | 2 +- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/CHANGELOG b/CHANGELOG index f4893fd8c..87f096580 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,45 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.4.5] - 2023-08-22 + +### Added +- Add ability to roll back video layer selection. (#1871) +- Allow listing ingress by id (#1874) +- E2EE trailer for server injected packets. (#1908) +- Add support for ingress URL pull (#1938 #1939) +- (experimental) Add control of playout delay (#1838 #1930) +- Add option to advertise external ip only (#1962) +- Allow data packet to be sent to participants by identity (#1982) + +### Fixed +- Fix RTC IP when binding to 0.0.0.0 (#1862) +- Prevent anachronous sample reading in connection stats (#1863) +- Fixed resubscribe race due to desire changed before cleaning up (#1865) +- Fixed numPublisher computation by marking dirty after track published changes (#1878) +- Attempt to avoid out-of-order max subscribed layer notifications. (#1882) +- Improved packet loss handling for SVC codecs (#1912 ) +- Frame integrity check for SVC codecs (#1914) +- Issue full reconnect if subscriber PC is closed on ICERestart (#1919) +- Do not post max layer event for audio. (#1932) +- Never use dd tracker for non-svc codec (#1952) +- Fix race condition causing new participants to have stale room metadata (#1969) +- Fixed VP9 handling for non-SVC content. (#1973) +- Ensure older session does not clobber newer session. (#1974) +- Do not start RTPStats on a padding packet. (#1984) + +### Changed +- Push track quality to poor on a bandwidth constrained pause (#1867) +- AV sync improvements (#1875 #1892 #1944 #1951 #1955 #1956 #1968 #1971 #1986) +- Do not send unnecessary room updates when content isn't changed (#1881) +- start reading signal messages before session handler finishes (#1883) +- changing key file permissions control to allow group readable (#1893) +- close disconnected participants when signal channel fails (#1895) +- Improved stream allocator handling during transitions and reallocation. (#1905 #1906) +- Stream allocator tweaks to reduce re-allocation (#1936) +- Reduce NACK traffic by delaying retransmission after first send. (#1918) +- Temper stream allocator more to avoid false negative downgrades (#1920) + ## [1.4.4] - 2023-07-08 ### Added diff --git a/version/version.go b/version/version.go index 1df2fdae5..a4b4ba000 100644 --- a/version/version.go +++ b/version/version.go @@ -14,4 +14,4 @@ package version -const Version = "1.4.4" +const Version = "1.4.5" From 3733c4af6c66fc6df9102a2f6c085aa386c37a7c Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Wed, 23 Aug 2023 13:05:33 -0700 Subject: [PATCH 3/7] Update github.com/ua-parser/uap-go digest to f8d2018 (#1972) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index aa8c09fd7..49d11d08b 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thoas/go-funk v0.9.3 github.com/twitchtv/twirp v8.1.3+incompatible - github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f + github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 diff --git a/go.sum b/go.sum index 900588264..35686219f 100644 --- a/go.sum +++ b/go.sum @@ -260,8 +260,8 @@ github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= -github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f h1:A+MmlgpvrHLeUP8dkBVn4Pnf5Bp5Yk2OALm7SEJLLE8= -github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E= +github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 h1:YsXCA7ZdgFMgwDpNpYj4y2WPRVrOVVDAkQlFc477T54= +github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/urfave/negroni/v3 v3.0.0 h1:Vo8CeZfu1lFR9gW8GnAb6dOGCJyijfil9j/jKKc/JhU= From 36dadbacb269a4405068a0da2d6019ef060e0035 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 24 Aug 2023 09:09:42 +0530 Subject: [PATCH 4/7] Drop padding only packets on publisher side. (#1990) * Drop padding only packet on publisher side. * add UT * update deps * remove debug * add fast path short cut * correct comment * fix test * fix for Linux --- go.mod | 2 +- go.sum | 4 +- pkg/sfu/buffer/buffer.go | 59 ++++++++------- pkg/sfu/buffer/rtpstats.go | 41 ++++++++--- pkg/sfu/buffer/rtpstats_test.go | 4 +- pkg/sfu/utils/rangemap.go | 123 ++++++++++++++++++++++++++++++++ pkg/sfu/utils/rangemap_test.go | 122 +++++++++++++++++++++++++++++++ 7 files changed, 315 insertions(+), 40 deletions(-) create mode 100644 pkg/sfu/utils/rangemap.go create mode 100644 pkg/sfu/utils/rangemap_test.go diff --git a/go.mod b/go.mod index 49d11d08b..24d5bf8fb 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c + github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 github.com/livekit/protocol v1.6.1 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 diff --git a/go.sum b/go.sum index 35686219f..31050de5f 100644 --- a/go.sum +++ b/go.sum @@ -122,8 +122,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c h1:4udPqCusH93MK/7q8ZfDqcLJHGoQeKKsMi5b+/BpQvk= -github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= +github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I= +github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk= github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index baae76b46..957aaa452 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -30,6 +30,7 @@ import ( "go.uber.org/atomic" "github.com/livekit/livekit-server/pkg/sfu/audio" + "github.com/livekit/livekit-server/pkg/sfu/utils" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil" "github.com/livekit/mediatransportutil/pkg/bucket" @@ -80,6 +81,8 @@ type Buffer struct { closed atomic.Bool mime string + snRangeMap *utils.RangeMap[uint32, uint32] + // supported feedbacks latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -124,6 +127,7 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { mediaSSRC: ssrc, videoPool: vp, audioPool: ap, + snRangeMap: utils.NewRangeMap[uint32, uint32](100), pliThrottle: int64(500 * time.Millisecond), logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU), } @@ -404,42 +408,40 @@ func (b *Buffer) SetRTT(rtt uint32) { } func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { - pktBuf, err := b.bucket.AddPacket(pkt) - if err != nil { - // - // Even when erroring, do - // 1. state update - // 2. TWCC just in case remote side is retransmitting an old packet for probing - // - // But, do not forward those packets - // - var rtpPacket rtp.Packet - if uerr := rtpPacket.Unmarshal(pkt); uerr == nil { - b.updateStreamState(&rtpPacket, arrivalTime) - b.processHeaderExtensions(&rtpPacket, arrivalTime) - } + var rtpPacket rtp.Packet + if err := rtpPacket.Unmarshal(pkt); err != nil { + b.logger.Errorw("could not unmarshal RTP packet", err) + return + } + extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime) + b.processHeaderExtensions(&rtpPacket, arrivalTime) + if !isOutOfOrder && len(rtpPacket.Payload) == 0 { + // drop padding only in-order packet + b.snRangeMap.IncValue(1) + return + } + + // add to RTX buffer using sequence number after accounting for dropped padding only packets + snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber) + if err != nil { + b.logger.Errorw("could not get sequence number adjustment", err) + return + } + rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment) + _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) + if err != nil { if err != bucket.ErrRTXPacket { b.logger.Warnw("could not add RTP packet to bucket", err) } return } - var p rtp.Packet - err = p.Unmarshal(pktBuf) - if err != nil { - b.logger.Warnw("error unmarshaling RTP packet", err) - return - } - - b.updateStreamState(&p, arrivalTime) - b.processHeaderExtensions(&p, arrivalTime) - b.doNACKs() b.doReports(arrivalTime) - ep := b.getExtPacket(&p, arrivalTime) + ep := b.getExtPacket(&rtpPacket, arrivalTime) if ep == nil { return } @@ -497,18 +499,21 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) { flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime) if b.nacker != nil { b.nacker.Remove(p.SequenceNumber) if flowState.HasLoss { + b.snRangeMap.AddRange(flowState.LossStartInclusive, flowState.LossEndExclusive) for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ { - b.nacker.Push(lost) + b.nacker.Push(uint16(lost)) } } } + + return flowState.ExtSeqNumber, flowState.IsOutOfOrder } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 24a0c34a7..91069d377 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -65,8 +65,12 @@ func (d driftResult) String() string { type RTPFlowState struct { HasLoss bool - LossStartInclusive uint16 - LossEndExclusive uint16 + LossStartInclusive uint32 + LossEndExclusive uint32 + + IsOutOfOrder bool + + ExtSeqNumber uint32 } type IntervalStats struct { @@ -449,14 +453,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } } + flowState.IsOutOfOrder = true + + cycles := r.cycles + if rtph.SequenceNumber > r.highestSN { + cycles-- + } + flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, cycles) + // in-order default: - if diff > 1 { - flowState.HasLoss = true - flowState.LossStartInclusive = r.highestSN + 1 - flowState.LossEndExclusive = rtph.SequenceNumber - } - // update gap histogram r.updateGapHistogram(int(diff)) @@ -466,6 +472,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false) + if diff > 1 { + flowState.HasLoss = true + + cycles := r.cycles + if r.highestSN+1 < r.highestSN { + cycles++ + } + flowState.LossStartInclusive = getExtSN(r.highestSN+1, cycles) + } + if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ } @@ -481,6 +497,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa // NOTE: this may not be the first packet with this time stamp if there is packet loss. r.highestTime = packetTime } + + if flowState.HasLoss { + flowState.LossEndExclusive = getExtSN(rtph.SequenceNumber, r.cycles) + } + flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, r.cycles) } if !isDuplicate { @@ -1733,6 +1754,10 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps // ---------------------------------- +func getExtSN(sn uint16, cycles uint16) uint32 { + return (uint32(cycles) << 16) | uint32(sn) +} + func getExtTS(ts uint32, cycles uint32) uint64 { return (uint64(cycles) << 32) | uint64(ts) } diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index 389d330c3..74e4774f4 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -126,8 +126,8 @@ func TestRTPStats_Update(t *testing.T) { packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now()) require.True(t, flowState.HasLoss) - require.Equal(t, sequenceNumber-9, flowState.LossStartInclusive) - require.Equal(t, sequenceNumber, flowState.LossEndExclusive) + require.Equal(t, uint32(sequenceNumber-9), flowState.LossStartInclusive) + require.Equal(t, uint32(sequenceNumber), flowState.LossEndExclusive) require.Equal(t, uint32(17), r.packetsLost) // out-of-order should decrement number of lost packets diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go new file mode 100644 index 000000000..75ffd1646 --- /dev/null +++ b/pkg/sfu/utils/rangemap.go @@ -0,0 +1,123 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "errors" + "math" + "unsafe" +) + +const ( + minRanges = 1 +) + +var ( + errReversedOrder = errors.New("end is before start") + errKeyNotFound = errors.New("key not found") +) + +type rangeType interface { + uint32 +} + +type valueType interface { + uint32 +} + +type rangeVal[RT rangeType, VT valueType] struct { + start RT + end RT + value VT +} + +type RangeMap[RT rangeType, VT valueType] struct { + halfRange RT + + size int + ranges []rangeVal[RT, VT] + runningValue VT +} + +func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] { + var t RT + return &RangeMap[RT, VT]{ + halfRange: 1 << ((unsafe.Sizeof(t) * 8) - 1), + size: int(math.Max(float64(size), float64(minRanges))), + } +} + +func (r *RangeMap[RT, VT]) IncValue(inc VT) { + r.runningValue += inc +} + +func (r *RangeMap[RT, VT]) AddRange(startInclusive RT, endExclusive RT) error { + if endExclusive-startInclusive > r.halfRange { + return errReversedOrder + } + + isNewRange := true + // check if last range can be extended + if len(r.ranges) != 0 { + lr := &r.ranges[len(r.ranges)-1] + if startInclusive <= lr.end { + return errReversedOrder + } + if lr.value == r.runningValue { + lr.end = endExclusive - 1 + isNewRange = false + } else { + // end last range before start and start a new range + lr.end = startInclusive - 1 + } + } + + if isNewRange { + r.ranges = append(r.ranges, rangeVal[RT, VT]{ + start: startInclusive, + end: endExclusive - 1, + value: r.runningValue, + }) + } + r.prune() + return nil +} + +func (r *RangeMap[RT, VT]) GetValue(key RT) (VT, error) { + numRanges := len(r.ranges) + if numRanges != 0 { + if key > r.ranges[numRanges-1].end { + return r.runningValue, nil + } + + if key < r.ranges[0].start { + return 0, errKeyNotFound + } + } + + for _, rv := range r.ranges { + if key-rv.start < r.halfRange && rv.end-key < r.halfRange { + return rv.value, nil + } + } + + return r.runningValue, nil +} + +func (r *RangeMap[RT, VT]) prune() { + if len(r.ranges) > r.size { + r.ranges = r.ranges[len(r.ranges)-r.size:] + } +} diff --git a/pkg/sfu/utils/rangemap_test.go b/pkg/sfu/utils/rangemap_test.go new file mode 100644 index 000000000..dcc6a745d --- /dev/null +++ b/pkg/sfu/utils/rangemap_test.go @@ -0,0 +1,122 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRangeMapUint32(t *testing.T) { + r := NewRangeMap[uint32, uint32](2) + + // getting value for any key should be 0 default + value, err := r.GetValue(33333) + require.NoError(t, err) + require.Equal(t, uint32(0), value) + value, err = r.GetValue(0xffffffff) + require.NoError(t, err) + require.Equal(t, uint32(0), value) + + // getting value for any key should be incremented value + r.IncValue(2) + value, err = r.GetValue(66666666) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + value, err = r.GetValue(0) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + + // add a couple of ranges, as the value is same should just extend + err = r.AddRange(10, 20) + require.NoError(t, err) + err = r.AddRange(30, 40) + require.NoError(t, err) + require.Equal(t, 1, len(r.ranges)) + require.Equal(t, uint32(10), r.ranges[0].start) + require.Equal(t, uint32(39), r.ranges[0].end) + require.Equal(t, uint32(2), r.ranges[0].value) + + // bump value + r.IncValue(1) + // getting value in previously added range should return 2 + value, err = r.GetValue(22) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + // outside range should return 3 + value, err = r.GetValue(662) + require.NoError(t, err) + require.Equal(t, uint32(3), value) + + // adding out-of-order range should return error + err = r.AddRange(60, 50) + require.Error(t, err, errReversedOrder) + + // adding overlapping should return error + err = r.AddRange(30, 50) + require.Error(t, err, errReversedOrder) + + // adding a non-overlapping range should extend previous range and add new one + err = r.AddRange(50, 60) + require.NoError(t, err) + require.Equal(t, 2, len(r.ranges)) + + require.Equal(t, uint32(10), r.ranges[0].start) + require.Equal(t, uint32(49), r.ranges[0].end) + require.Equal(t, uint32(2), r.ranges[0].value) + + require.Equal(t, uint32(50), r.ranges[1].start) + require.Equal(t, uint32(59), r.ranges[1].end) + require.Equal(t, uint32(3), r.ranges[1].value) + + // getting an old value should not succeed, but start of first range should return no error + value, err = r.GetValue(9) + require.Error(t, err, errKeyNotFound) + value, err = r.GetValue(10) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + + // adding another range should prune the first one as size if set to 2 + r.IncValue(10) + err = r.AddRange(1000, 1233) + require.NoError(t, err) + require.Equal(t, 2, len(r.ranges)) + + require.Equal(t, uint32(50), r.ranges[0].start) + require.Equal(t, uint32(999), r.ranges[0].end) + require.Equal(t, uint32(3), r.ranges[0].value) + + require.Equal(t, uint32(1000), r.ranges[1].start) + require.Equal(t, uint32(1232), r.ranges[1].end) + require.Equal(t, uint32(13), r.ranges[1].value) + + // previously valid range should return key not found after pruning + value, err = r.GetValue(10) + require.Error(t, err, errKeyNotFound) + + value, err = r.GetValue(999) + require.NoError(t, err) + require.Equal(t, uint32(3), value) + + value, err = r.GetValue(1200) + require.NoError(t, err) + require.Equal(t, uint32(13), value) + + // something newer than what is in ranges should return running value + value, err = r.GetValue(3000) + require.NoError(t, err) + require.Equal(t, uint32(13), value) +} From 8c99a9e307e583f5ca0ef74e6694d195a78aefaa Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 24 Aug 2023 13:25:49 +0530 Subject: [PATCH 5/7] Move `GetAudioLevel` interface. (#1992) To allow use with RemoteParticipant/RemoteMediaTrack too. --- pkg/rtc/participant.go | 17 ----- pkg/rtc/types/interfaces.go | 6 +- pkg/rtc/types/typesfakes/fake_media_track.go | 70 ++++++++++++++++++++ pkg/rtc/types/typesfakes/fake_participant.go | 70 ++++++++++++++++++++ pkg/rtc/uptrackmanager.go | 16 +++++ 5 files changed, 160 insertions(+), 19 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 359aec5a0..7ba7e50ea 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -886,23 +886,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti // signal connection methods // -func (p *ParticipantImpl) GetAudioLevel() (level float64, active bool) { - level = 0 - for _, pt := range p.GetPublishedTracks() { - mediaTrack := pt.(types.LocalMediaTrack) - if mediaTrack.Source() == livekit.TrackSource_MICROPHONE { - tl, ta := mediaTrack.GetAudioLevel() - if ta { - active = true - if tl > level { - level = tl - } - } - } - } - return -} - func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo { numTracks := 0 minQuality := livekit.ConnectionQuality_EXCELLENT diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index baacff5dd..5275b5ad3 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -243,6 +243,8 @@ type Participant interface { GetPublishedTracks() []MediaTrack RemovePublishedTrack(track MediaTrack, willBeResumed bool, shouldClose bool) + GetAudioLevel() (smoothedLevel float64, active bool) + // HasPermission checks permission of the subscriber by identity. Returns true if subscriber is allowed to subscribe // to the track with trackID HasPermission(trackID livekit.TrackID, subIdentity livekit.ParticipantIdentity) bool @@ -348,7 +350,6 @@ type LocalParticipant interface { GetSubscribedParticipants() []livekit.ParticipantID IsSubscribedTo(sid livekit.ParticipantID) bool - GetAudioLevel() (smoothedLevel float64, active bool) GetConnectionQuality() *livekit.ConnectionQualityInfo // server sent messages @@ -445,6 +446,8 @@ type MediaTrack interface { UpdateVideoLayers(layers []*livekit.VideoLayer) IsSimulcast() bool + GetAudioLevel() (level float64, active bool) + Close(willBeResumed bool) IsOpen() bool @@ -480,7 +483,6 @@ type LocalMediaTrack interface { SignalCid() string HasSdpCid(cid string) bool - GetAudioLevel() (level float64, active bool) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) SetRTT(rtt uint32) diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index af0c8c1b0..fd472c52e 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -48,6 +48,18 @@ type FakeMediaTrack struct { getAllSubscribersReturnsOnCall map[int]struct { result1 []livekit.ParticipantID } + GetAudioLevelStub func() (float64, bool) + getAudioLevelMutex sync.RWMutex + getAudioLevelArgsForCall []struct { + } + getAudioLevelReturns struct { + result1 float64 + result2 bool + } + getAudioLevelReturnsOnCall map[int]struct { + result1 float64 + result2 bool + } GetNumSubscribersStub func() int getNumSubscribersMutex sync.RWMutex getNumSubscribersArgsForCall []struct { @@ -478,6 +490,62 @@ func (fake *FakeMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []live }{result1} } +func (fake *FakeMediaTrack) GetAudioLevel() (float64, bool) { + fake.getAudioLevelMutex.Lock() + ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] + fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct { + }{}) + stub := fake.GetAudioLevelStub + fakeReturns := fake.getAudioLevelReturns + fake.recordInvocation("GetAudioLevel", []interface{}{}) + fake.getAudioLevelMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeMediaTrack) GetAudioLevelCallCount() int { + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() + return len(fake.getAudioLevelArgsForCall) +} + +func (fake *FakeMediaTrack) GetAudioLevelCalls(stub func() (float64, bool)) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = stub +} + +func (fake *FakeMediaTrack) GetAudioLevelReturns(result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + fake.getAudioLevelReturns = struct { + result1 float64 + result2 bool + }{result1, result2} +} + +func (fake *FakeMediaTrack) GetAudioLevelReturnsOnCall(i int, result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + if fake.getAudioLevelReturnsOnCall == nil { + fake.getAudioLevelReturnsOnCall = make(map[int]struct { + result1 float64 + result2 bool + }) + } + fake.getAudioLevelReturnsOnCall[i] = struct { + result1 float64 + result2 bool + }{result1, result2} +} + func (fake *FakeMediaTrack) GetNumSubscribers() int { fake.getNumSubscribersMutex.Lock() ret, specificReturn := fake.getNumSubscribersReturnsOnCall[len(fake.getNumSubscribersArgsForCall)] @@ -1640,6 +1708,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} { defer fake.closeMutex.RUnlock() fake.getAllSubscribersMutex.RLock() defer fake.getAllSubscribersMutex.RUnlock() + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() fake.getNumSubscribersMutex.RLock() defer fake.getNumSubscribersMutex.RUnlock() fake.getQualityForDimensionMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index a660644cb..fa92204fe 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -43,6 +43,18 @@ type FakeParticipant struct { debugInfoReturnsOnCall map[int]struct { result1 map[string]interface{} } + GetAudioLevelStub func() (float64, bool) + getAudioLevelMutex sync.RWMutex + getAudioLevelArgsForCall []struct { + } + getAudioLevelReturns struct { + result1 float64 + result2 bool + } + getAudioLevelReturnsOnCall map[int]struct { + result1 float64 + result2 bool + } GetPublishedTrackStub func(livekit.TrackID) types.MediaTrack getPublishedTrackMutex sync.RWMutex getPublishedTrackArgsForCall []struct { @@ -377,6 +389,62 @@ func (fake *FakeParticipant) DebugInfoReturnsOnCall(i int, result1 map[string]in }{result1} } +func (fake *FakeParticipant) GetAudioLevel() (float64, bool) { + fake.getAudioLevelMutex.Lock() + ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] + fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct { + }{}) + stub := fake.GetAudioLevelStub + fakeReturns := fake.getAudioLevelReturns + fake.recordInvocation("GetAudioLevel", []interface{}{}) + fake.getAudioLevelMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeParticipant) GetAudioLevelCallCount() int { + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() + return len(fake.getAudioLevelArgsForCall) +} + +func (fake *FakeParticipant) GetAudioLevelCalls(stub func() (float64, bool)) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = stub +} + +func (fake *FakeParticipant) GetAudioLevelReturns(result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + fake.getAudioLevelReturns = struct { + result1 float64 + result2 bool + }{result1, result2} +} + +func (fake *FakeParticipant) GetAudioLevelReturnsOnCall(i int, result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + if fake.getAudioLevelReturnsOnCall == nil { + fake.getAudioLevelReturnsOnCall = make(map[int]struct { + result1 float64 + result2 bool + }) + } + fake.getAudioLevelReturnsOnCall[i] = struct { + result1 float64 + result2 bool + }{result1, result2} +} + func (fake *FakeParticipant) GetPublishedTrack(arg1 livekit.TrackID) types.MediaTrack { fake.getPublishedTrackMutex.Lock() ret, specificReturn := fake.getPublishedTrackReturnsOnCall[len(fake.getPublishedTrackArgsForCall)] @@ -1236,6 +1304,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.closeMutex.RUnlock() fake.debugInfoMutex.RLock() defer fake.debugInfoMutex.RUnlock() + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() fake.getPublishedTrackMutex.RLock() defer fake.getPublishedTrackMutex.RUnlock() fake.getPublishedTracksMutex.RLock() diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 961134c83..1d1dfa6e5 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -418,3 +418,19 @@ func (u *UpTrackManager) DebugInfo() map[string]interface{} { return info } + +func (u *UpTrackManager) GetAudioLevel() (level float64, active bool) { + level = 0 + for _, pt := range u.GetPublishedTracks() { + if pt.Source() == livekit.TrackSource_MICROPHONE { + tl, ta := pt.GetAudioLevel() + if ta { + active = true + if tl > level { + level = tl + } + } + } + } + return +} From ce418dc6b3b56192dbef9523b284bf2c4d64acbd Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 24 Aug 2023 12:50:05 -0700 Subject: [PATCH 6/7] Do not generate a stream key for URL pull ingress (#1993) --- pkg/service/ingress.go | 5 ++++- pkg/service/redisstore.go | 10 +++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 73de7663c..a8fd81d6c 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -139,7 +139,10 @@ func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string urlStr = urlObj.String() } - sk := utils.NewGuid("") + var sk string + if req.InputType != livekit.IngressInput_URL_INPUT { + sk = utils.NewGuid("") + } info := &livekit.IngressInfo{ IngressId: utils.NewGuid(utils.IngressPrefix), diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index e8fe4ecc9..df119a5e5 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -514,7 +514,7 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo) if info.IngressId == "" { return errors.New("Missing IngressId") } - if info.StreamKey == "" { + if info.StreamKey == "" && info.InputType != livekit.IngressInput_URL_INPUT { return errors.New("Missing StreamKey") } @@ -543,7 +543,9 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo) results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error { p.HSet(s.ctx, IngressKey, info.IngressId, data) - p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId) + if info.StreamKey != "" { + p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId) + } if oldRoom != info.RoomName { if oldRoom != "" { @@ -799,7 +801,9 @@ func (s *RedisStore) UpdateIngressState(ctx context.Context, ingressId string, s func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error { tx := s.rc.TxPipeline() tx.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId) - tx.HDel(s.ctx, StreamKeyKey, info.IngressId) + if info.StreamKey != "" { + tx.HDel(s.ctx, StreamKeyKey, info.StreamKey) + } tx.HDel(s.ctx, IngressKey, info.IngressId) tx.Del(s.ctx, IngressStatePrefix+info.IngressId) if _, err := tx.Exec(s.ctx); err != nil { From 9d467e07d88df8ff853150df9d911772a205eb24 Mon Sep 17 00:00:00 2001 From: Pingos Date: Fri, 25 Aug 2023 17:14:31 +0800 Subject: [PATCH 7/7] fix bug: p.pendingTracksLock.Unlock() when mid is empty (#1994) --- pkg/rtc/participant.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7ba7e50ea..51aabb7c8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1687,6 +1687,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei ) mid := p.TransportManager.GetPublisherMid(rtpReceiver) if mid == "" { + p.pendingTracksLock.Unlock() p.pubLogger.Warnw("could not get mid for track", nil, "trackID", track.ID()) return nil, false }