From be4ea444250d6f48a11a8b5cefcd8b05120285ee Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 11 Sep 2023 10:29:09 +0530 Subject: [PATCH 01/11] Log ClientInfo on participant close. (#2057) Want to check client distribution for negotiation failed induced closes. --- pkg/rtc/participant.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d84ed7cb5..2f61882b3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -717,7 +717,13 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea return nil } - p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String(), "isExpectedToResume", isExpectedToResume) + p.params.Logger.Infow( + "participant closing", + "sendLeave", sendLeave, + "reason", reason.String(), + "isExpectedToResume", isExpectedToResume, + "clientInfo", p.params.ClientInfo.String(), + ) p.clearDisconnectTimer() p.clearMigrationTimer() From 696798279eba5aea6b9afbf900638aed9218749a Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 11 Sep 2023 14:42:48 +0800 Subject: [PATCH 02/11] Check destination identites of data message if sid is empty (#2058) --- pkg/rtc/room.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 53c284e87..d04eb5cf8 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -26,12 +26,13 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" + "github.com/pion/sctp" + "github.com/livekit/livekit-server/pkg/sfu/connectionquality" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/pion/sctp" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -1299,7 +1300,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp if source != nil && op.ID() == source.ID() { continue } - if len(dest) > 0 { + if len(dest) > 0 || len(destIdentities) > 0 { found := false for _, dID := range dest { if op.ID() == livekit.ParticipantID(dID) { From f333ce05ed0c24385dde4b84be9e1bddd8b34609 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 11 Sep 2023 20:59:47 +0530 Subject: [PATCH 03/11] fix start (#2060) --- pkg/sfu/buffer/rtpstats_sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 3bcf8cc48..c67f6099d 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -146,7 +146,7 @@ func (r *RTPStatsSender) Update( r.highestTime = packetTime r.extStartSN = extSequenceNumber - r.extHighestSN = extSequenceNumber + r.extHighestSN = extSequenceNumber - 1 r.extStartTS = extTimestamp r.extHighestTS = extTimestamp From 463c62b99a70b55d1d968f28a8fce128911a6fb1 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 11 Sep 2023 22:16:02 +0530 Subject: [PATCH 04/11] Update deps (#2061) --- go.mod | 2 +- go.sum | 4 ++-- pkg/rtc/participant.go | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 105e1e20d..0df7de2d2 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f - github.com/livekit/protocol v1.7.2 + github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 4dfbe1ccd..6119003f1 100644 --- a/go.sum +++ b/go.sum @@ -127,8 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.2 h1:TPk8rIv5ZZSx1IU5jaGA2W+RdoDlE8dp4CFHE0MKoGo= -github.com/livekit/protocol v1.7.2/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32 h1:5PdmCpGGXA2hz1pKGgKSJYTjmk3Kkm+kNiW5NOFARCI= +github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 2f61882b3..334a37086 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -830,7 +830,6 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool { if p.IsClosed() || p.IsDisconnected() { return } - // TODO: change to debug once we are confident p.subLogger.Infow("closing subscriber peer connection to aid migration") // From 254a35543d9caf5d5dcea773e4f185e40e01282a Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 12 Sep 2023 08:34:28 +0530 Subject: [PATCH 05/11] Fix down stream stats. (#2063) Need to pass in the correct time. Previously streaming start was determined by another delta snap shot which as removed for efficiency. Did not realise that we were passing in zero time for stats. Also, revert of the change (the part which did not re-pause) from this PR (https://github.com/livekit/livekit/pull/2037). That change affects other paths. The edge it was trying to fix is more rare. Need to think about a way which covers all cases. --- pkg/sfu/connectionquality/connectionstats.go | 2 +- pkg/sfu/forwarder.go | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 5b03cfe21..05daf5f25 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -310,7 +310,7 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time { } func (cs *ConnectionStats) getStat() { - score, streams := cs.updateScoreAt(time.Time{}) + score, streams := cs.updateScoreAt(time.Now()) if cs.onStatsUpdate != nil && len(streams) != 0 { analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams)) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 8fc170ab2..68fa1f0fd 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1307,18 +1307,12 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation f.lock.Lock() defer f.lock.Unlock() - existingTargetLayer := f.vls.GetTarget() - if !existingTargetLayer.IsValid() { - // already paused - return f.lastAllocation - } - maxLayer := f.vls.GetMax() maxSeenLayer := f.vls.GetMaxSeen() optimalBandwidthNeeded := getOptimalBandwidthNeeded(f.muted, f.pubMuted, maxSeenLayer.Spatial, brs, maxLayer) alloc := VideoAllocation{ BandwidthRequested: 0, - BandwidthDelta: 0 - getBandwidthNeeded(brs, existingTargetLayer, f.lastAllocation.BandwidthRequested), + BandwidthDelta: 0 - getBandwidthNeeded(brs, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), Bitrates: brs, BandwidthNeeded: optimalBandwidthNeeded, TargetLayer: buffer.InvalidLayer, From 5f701ece348dd228f63782312e1845ebd58180f2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 12 Sep 2023 14:36:25 +0530 Subject: [PATCH 06/11] Include top bits from start in highest sequence number from RR. (#2064) Streaming could start after 16-bits has rolled over. So, have to add that base back to what is received in receiver report. Otherwise, it looks like there are not packets received in window leading to poor quality. --- pkg/sfu/buffer/rtpstats_sender.go | 6 +++--- pkg/sfu/forwarder.go | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index c67f6099d..b735b1988 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -284,7 +284,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt extHighestSNFromRR += (1 << 32) } } - if extHighestSNFromRR < r.extStartSN { + if (extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000)) < r.extStartSN { // it is possible that the `LastSequenceNumber` in the receiver report is before the starting // sequence number when dummy packets are used to trigger Pion's OnTrack path. return @@ -343,7 +343,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt r.lastRR = rr } else { r.logger.Debugw( - fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, rr.LastSequenceNumber), + fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), "lastRRTime", r.lastRRTime, "lastRR", r.lastRR, "sinceLastRR", time.Since(r.lastRRTime), @@ -605,7 +605,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se maxJitter: r.jitter, maxRtt: r.rtt, }, - extStartSNFromRR: r.extHighestSNFromRR + 1, + extStartSNFromRR: r.extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000) + 1, packetsLostFromRR: r.packetsLostFromRR, maxJitterFromRR: r.jitterFromRR, } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 68fa1f0fd..6e2805d09 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1456,7 +1456,9 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.logger.Debugw( "starting forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, + "extSequenceNumber", extPkt.ExtSequenceNumber, "timestamp", extPkt.Packet.Timestamp, + "extTimestamp", extPkt.ExtTimestamp, "layer", layer, "referenceLayerSpatial", f.referenceLayerSpatial, ) @@ -1466,7 +1468,9 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.logger.Debugw( "catch up forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, + "extSequenceNumber", extPkt.ExtSequenceNumber, "timestamp", extPkt.Packet.Timestamp, + "extTimestamp", extPkt.ExtTimestamp, "layer", layer, "referenceLayerSpatial", f.referenceLayerSpatial, ) From 2cf751d2615d7816f626fed660549c272194ecfd Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 13 Sep 2023 01:38:34 +0530 Subject: [PATCH 07/11] Use timer in scorer lock scope. (#2066) Using time from outside make anachronous samples in expected distance/bit rate measurement. So, have to let the time be snap shotted in scorer lock scope. --- pkg/sfu/connectionquality/connectionstats.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 05daf5f25..1a977c650 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -299,7 +299,11 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time { if packetsSent > cs.packetsSent { if cs.streamingStartedAt.IsZero() { // the start could be anywhere after last update, but using `at` as this is not required to be accurate - cs.streamingStartedAt = at + if at.IsZero() { + cs.streamingStartedAt = time.Now() + } else { + cs.streamingStartedAt = at + } } } else { cs.streamingStartedAt = time.Time{} @@ -310,7 +314,7 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time { } func (cs *ConnectionStats) getStat() { - score, streams := cs.updateScoreAt(time.Now()) + score, streams := cs.updateScoreAt(time.Time{}) if cs.onStatsUpdate != nil && len(streams) != 0 { analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams)) From 1a21dcb4f206eba037066feb4a1c1863760b5627 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 12 Sep 2023 21:51:42 -0700 Subject: [PATCH 08/11] Update github workflows (#2062) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/docker.yaml | 8 ++++---- .github/workflows/release.yaml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index 198a4ecf8..58574e162 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -28,7 +28,7 @@ jobs: - uses: actions/checkout@v4 - name: Docker meta id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: # list of Docker images to use as base name for tags images: | @@ -53,17 +53,17 @@ jobs: args: generate - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push id: docker_build - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . push: true diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 4da1bfecc..cd3b523fc 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -38,7 +38,7 @@ jobs: go-version: '>=1.18' - name: Run GoReleaser - uses: goreleaser/goreleaser-action@v4 + uses: goreleaser/goreleaser-action@v5 with: distribution: goreleaser version: latest From 3f9f3adf91a2b296a5934970ca493754c8639314 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 13 Sep 2023 12:53:16 +0530 Subject: [PATCH 09/11] Fix init. (#2067) Stupid me. --- pkg/sfu/sequencer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 775c8d42a..7cd56395f 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -132,6 +132,7 @@ func (s *sequencer) push( defer s.Unlock() if !s.initialized { + s.initialized = true s.extHighestSN = extModifiedSN - 1 s.extHighestTS = extModifiedTS s.updateSNOffset() @@ -178,6 +179,7 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui s.Lock() defer s.Unlock() + s.logger.Debugw("sequencer padding", "extHighestSN", s.extHighestSN, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive) if s.snRangeMap == nil { return } From a55c50f61d80f55eb3a8d57c8e0cc0f9e18ac146 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 13 Sep 2023 15:52:10 +0530 Subject: [PATCH 10/11] Throttle packet errors/warns a bit. (#2068) * Throttle packet errors/warns a bit. In very bad network conditions, it is possible that packets arrive out-of-order, have choppy behaviour. Use some counters and temper logs. * slight change in comment --- pkg/sfu/buffer/buffer.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index d0d009f2e..f27e449a2 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -16,6 +16,7 @@ package buffer import ( "encoding/binary" + "errors" "io" "strings" "sync" @@ -120,6 +121,9 @@ type Buffer struct { paused bool frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator frameRateCalculated bool + + packetNotFoundCount atomic.Uint32 + packetTooOldCount atomic.Uint32 } // NewBuffer constructs a new Buffer @@ -465,8 +469,13 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) if err != nil { - if err != bucket.ErrRTXPacket { - b.logger.Warnw("could not add RTP packet to bucket", err) + if errors.Is(err, bucket.ErrPacketTooOld) { + packetTooOldCount := b.packetTooOldCount.Inc() + if packetTooOldCount%20 == 0 { + b.logger.Warnw("could not add packet to bucket", err, "count", packetTooOldCount) + } + } else if err != bucket.ErrRTXPacket { + b.logger.Warnw("could not add packet to bucket", err) } return } @@ -483,7 +492,10 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { n, err := b.getPacket(buf, ep.Packet.SequenceNumber) if err != nil { - b.logger.Warnw("could not get packet", err, "sn", ep.Packet.SequenceNumber, "headSN", b.bucket.HeadSequenceNumber()) + packetNotFoundCount := b.packetNotFoundCount.Inc() + if packetNotFoundCount%20 == 0 { + b.logger.Warnw("could not get packet from bucket", err, "sn", ep.Packet.SequenceNumber, "headSN", b.bucket.HeadSequenceNumber(), "count", packetNotFoundCount) + } return nil } ep.RawPacket = buf[:n] From 68aebb0106311d3ce71e0731b7c638224dd79364 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 13 Sep 2023 19:59:24 +0530 Subject: [PATCH 11/11] Do not mute when stream is paused. (#2069) --- pkg/sfu/downtrack.go | 46 ++++++++++--- pkg/sfu/forwarder.go | 78 +++++++++++----------- pkg/sfu/forwarder_test.go | 65 ++++++++++++------ pkg/sfu/streamallocator/streamallocator.go | 12 ++++ pkg/sfu/streamallocator/track.go | 4 ++ 5 files changed, 135 insertions(+), 70 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index fb445f222..0ddb62c19 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -181,6 +181,9 @@ type DownTrackStreamAllocatorListener interface { // check if track should participate in BWE IsBWEEnabled(dt *DownTrack) bool + + // check if subscription mute can be applied + IsSubscribeMutable(dt *DownTrack) bool } type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) @@ -830,7 +833,15 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa // Mute enables or disables media forwarding - subscriber triggered func (d *DownTrack) Mute(muted bool) { - changed := d.forwarder.Mute(muted) + d.streamAllocatorLock.RLock() + listener := d.streamAllocatorListener + d.streamAllocatorLock.RUnlock() + + isSubscribeMutable := true + if listener != nil { + isSubscribeMutable = listener.IsSubscribeMutable(d) + } + changed := d.forwarder.Mute(muted, isSubscribeMutable) d.handleMute(muted, changed) } @@ -913,9 +924,8 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { - if d.forwarder != nil { - d.forwarder.Mute(true) - } + d.forwarder.Mute(true, true) + // write blank frames after disabling so that other frames do not interfere. // Idea here is to send blank key frames to flush the decoder buffer at the remote end. // Otherwise, with transceiver re-use last frame from previous stream is held in the @@ -1159,14 +1169,24 @@ func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers b } func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition { - transition := d.forwarder.ProvisionalAllocateGetCooperativeTransition(allowOvershoot) - d.params.Logger.Debugw("stream: cooperative transition", "transition", transition) + transition, availableLayers, brs := d.forwarder.ProvisionalAllocateGetCooperativeTransition(allowOvershoot) + d.params.Logger.Debugw( + "stream: cooperative transition", + "transition", transition, + "availableLayers", availableLayers, + "bitrates", brs, + ) return transition } func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition { - transition := d.forwarder.ProvisionalAllocateGetBestWeightedTransition() - d.params.Logger.Debugw("stream: best weighted transition", "transition", transition) + transition, availableLayers, brs := d.forwarder.ProvisionalAllocateGetBestWeightedTransition() + d.params.Logger.Debugw( + "stream: best weighted transition", + "transition", transition, + "availableLayers", availableLayers, + "bitrates", brs, + ) return transition } @@ -1186,9 +1206,15 @@ func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOver } func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) { - _, brs := d.params.Receiver.GetLayeredBitrate() + availableLayers, brs := d.params.Receiver.GetLayeredBitrate() transition, available := d.forwarder.GetNextHigherTransition(brs, allowOvershoot) - d.params.Logger.Debugw("stream: get next higher layer", "transition", transition, "available", available, "bitrates", brs) + d.params.Logger.Debugw( + "stream: get next higher layer", + "transition", transition, + "available", available, + "availableLayers", availableLayers, + "bitrates", brs, + ) return transition, available } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 6e2805d09..f0c37a3c2 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -121,7 +121,7 @@ type VideoAllocationProvisional struct { pubMuted bool maxSeenLayer buffer.VideoLayer availableLayers []int32 - Bitrates Bitrates + bitrates Bitrates maxLayer buffer.VideoLayer currentLayer buffer.VideoLayer allocatedLayer buffer.VideoLayer @@ -372,7 +372,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.refTSOffset = state.RefTSOffset } -func (f *Forwarder) Mute(muted bool) bool { +func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool { f.lock.Lock() defer f.lock.Unlock() @@ -394,7 +394,7 @@ func (f *Forwarder) Mute(muted bool) bool { // The work around here to ignore mute does ignore an intentional mute. // It could result in some bandwidth consumed for stream without visibility in // the case of intentional mute. - if muted && f.isDeficientLocked() && f.lastAllocation.PauseReason == VideoPauseReasonBandwidth { + if muted && !isSubscribeMutable { f.logger.Debugw("ignoring forwarder mute, paused due to congestion") return false } @@ -722,7 +722,7 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow return f.updateAllocation(alloc, "optimal") } -func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates Bitrates) { +func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, bitrates Bitrates) { f.lock.Lock() defer f.lock.Unlock() @@ -731,7 +731,7 @@ func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates muted: f.muted, pubMuted: f.pubMuted, maxSeenLayer: f.vls.GetMaxSeen(), - Bitrates: Bitrates, + bitrates: bitrates, maxLayer: f.vls.GetMax(), currentLayer: f.vls.GetCurrent(), } @@ -759,14 +759,14 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu return false, 0 } - requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal] + requiredBitrate := f.provisional.bitrates[layer.Spatial][layer.Temporal] if requiredBitrate == 0 { return false, 0 } alreadyAllocatedBitrate := int64(0) if f.provisional.allocatedLayer.IsValid() { - alreadyAllocatedBitrate = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] + alreadyAllocatedBitrate = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] } // a layer under maximum fits, take it @@ -791,7 +791,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu return false, 0 } -func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition { +func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) (VideoTransition, []int32, Bitrates) { // // This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed) // when channel is congested. @@ -820,8 +820,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b return VideoTransition{ From: existingTargetLayer, To: f.provisional.allocatedLayer, - BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: -getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } // check if we should preserve current target @@ -831,9 +831,9 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b maximalBandwidthRequired := int64(0) for s := f.provisional.maxLayer.Spatial; s >= 0; s-- { for t := f.provisional.maxLayer.Temporal; t >= 0; t-- { - if f.provisional.Bitrates[s][t] != 0 { + if f.provisional.bitrates[s][t] != 0 { maximalLayer = buffer.VideoLayer{Spatial: s, Temporal: t} - maximalBandwidthRequired = f.provisional.Bitrates[s][t] + maximalBandwidthRequired = f.provisional.bitrates[s][t] break } } @@ -844,7 +844,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b } if maximalLayer.IsValid() { - if !existingTargetLayer.GreaterThan(maximalLayer) && f.provisional.Bitrates[existingTargetLayer.Spatial][existingTargetLayer.Temporal] != 0 { + if !existingTargetLayer.GreaterThan(maximalLayer) && f.provisional.bitrates[existingTargetLayer.Spatial][existingTargetLayer.Temporal] != 0 { // currently streaming and maybe wanting an upgrade (existingTargetLayer <= maximalLayer), // just preserve current target in the cooperative scheme of things f.provisional.allocatedLayer = existingTargetLayer @@ -852,7 +852,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b From: existingTargetLayer, To: existingTargetLayer, BandwidthDelta: 0, - } + }, f.provisional.availableLayers, f.provisional.bitrates } if existingTargetLayer.GreaterThan(maximalLayer) { @@ -861,8 +861,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b return VideoTransition{ From: existingTargetLayer, To: maximalLayer, - BandwidthDelta: maximalBandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: maximalBandwidthRequired - getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } } } @@ -875,9 +875,9 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b bw := int64(0) for s := minSpatial; s <= maxSpatial; s++ { for t := minTemporal; t <= maxTemporal; t++ { - if f.provisional.Bitrates[s][t] != 0 { + if f.provisional.bitrates[s][t] != 0 { layers = buffer.VideoLayer{Spatial: s, Temporal: t} - bw = f.provisional.Bitrates[s][t] + bw = f.provisional.bitrates[s][t] break } } @@ -914,7 +914,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b if !targetLayer.IsValid() { targetLayer = f.provisional.currentLayer if targetLayer.IsValid() { - bandwidthRequired = f.provisional.Bitrates[targetLayer.Spatial][targetLayer.Temporal] + bandwidthRequired = f.provisional.bitrates[targetLayer.Spatial][targetLayer.Temporal] } } @@ -922,11 +922,11 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b return VideoTransition{ From: f.vls.GetTarget(), To: targetLayer, - BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } -func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition { +func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() (VideoTransition, []int32, Bitrates) { // // This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed) // when channel is congested. This is called on tracks other than the one needing the change. When the track @@ -951,14 +951,14 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti return VideoTransition{ From: targetLayer, To: f.provisional.allocatedLayer, - BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } maxReachableLayerTemporal := buffer.InvalidLayerTemporal for t := f.provisional.maxLayer.Temporal; t >= 0; t-- { for s := f.provisional.maxLayer.Spatial; s >= 0; s-- { - if f.provisional.Bitrates[s][t] != 0 { + if f.provisional.bitrates[s][t] != 0 { maxReachableLayerTemporal = t break } @@ -976,13 +976,13 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti return VideoTransition{ From: targetLayer, To: f.provisional.allocatedLayer, - BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } // starting from minimum to target, find transition which gives the best // transition taking into account bits saved vs cost of such a transition - existingBandwidthNeeded := getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested) + existingBandwidthNeeded := getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested) bestLayer := buffer.InvalidLayer bestBandwidthDelta := int64(0) bestValue := float32(0) @@ -992,7 +992,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti break } - bandwidthDelta := int64(math.Max(float64(0), float64(existingBandwidthNeeded-f.provisional.Bitrates[s][t]))) + bandwidthDelta := int64(math.Max(float64(0), float64(existingBandwidthNeeded-f.provisional.bitrates[s][t]))) transitionCost := int32(0) // SVC-TODO: SVC will need a different cost transition @@ -1019,7 +1019,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti From: targetLayer, To: bestLayer, BandwidthDelta: -bestBandwidthDelta, - } + }, f.provisional.availableLayers, f.provisional.bitrates } func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { @@ -1030,13 +1030,13 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { f.provisional.muted, f.provisional.pubMuted, f.provisional.maxSeenLayer.Spatial, - f.provisional.Bitrates, + f.provisional.bitrates, f.provisional.maxLayer, ) alloc := VideoAllocation{ BandwidthRequested: 0, - BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), - Bitrates: f.provisional.Bitrates, + BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), + Bitrates: f.provisional.bitrates, BandwidthNeeded: optimalBandwidthNeeded, TargetLayer: f.provisional.allocatedLayer, RequestLayerSpatial: f.provisional.allocatedLayer.Spatial, @@ -1046,7 +1046,7 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { f.provisional.pubMuted, f.provisional.maxSeenLayer, f.provisional.availableLayers, - f.provisional.Bitrates, + f.provisional.bitrates, f.provisional.allocatedLayer, f.provisional.maxLayer, ), @@ -1062,8 +1062,8 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { case optimalBandwidthNeeded == 0: if f.provisional.allocatedLayer.IsValid() { // overshoot - alloc.BandwidthRequested = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] - alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) + alloc.BandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] + alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) } else { alloc.PauseReason = VideoPauseReasonFeedDry @@ -1077,16 +1077,16 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { default: if f.provisional.allocatedLayer.IsValid() { - alloc.BandwidthRequested = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] + alloc.BandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] } - alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) + alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) if f.provisional.allocatedLayer.GreaterThan(f.provisional.maxLayer) || alloc.BandwidthRequested >= getOptimalBandwidthNeeded( f.provisional.muted, f.provisional.pubMuted, f.provisional.maxSeenLayer.Spatial, - f.provisional.Bitrates, + f.provisional.bitrates, f.provisional.maxLayer, ) { // could be greater than optimal if overshooting diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index abd58144a..f621fd587 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -40,13 +40,19 @@ func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo func TestForwarderMute(t *testing.T) { f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) require.False(t, f.IsMuted()) - muted := f.Mute(false) + muted := f.Mute(false, true) require.False(t, muted) // no change in mute state require.False(t, f.IsMuted()) - muted = f.Mute(true) + + muted = f.Mute(true, false) + require.False(t, muted) + require.False(t, f.IsMuted()) + + muted = f.Mute(true, true) require.True(t, muted) require.True(t, f.IsMuted()) - muted = f.Mute(false) + + muted = f.Mute(false, true) require.True(t, muted) require.False(t, f.IsMuted()) } @@ -164,7 +170,7 @@ func TestForwarderAllocateOptimal(t *testing.T) { f.SetMaxPublishedLayer(buffer.DefaultMaxLayerSpatial) // muted should not consume any bandwidth - f.Mute(true) + f.Mute(true, true) disable(f) expectedResult = VideoAllocation{ PauseReason: VideoPauseReasonMuted, @@ -180,7 +186,7 @@ func TestForwarderAllocateOptimal(t *testing.T) { require.Equal(t, expectedResult, result) require.Equal(t, expectedResult, f.lastAllocation) - f.Mute(false) + f.Mute(false, true) // pub muted should not consume any bandwidth f.PubMute(true) @@ -616,7 +622,7 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) { {9, 10, 11, 12}, } - f.Mute(true) + f.Mute(true, true) f.ProvisionalAllocatePrepare(nil, bitrates) isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) @@ -651,13 +657,14 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { f.SetMaxPublishedLayer(buffer.DefaultMaxLayerSpatial) f.SetMaxTemporalLayerSeen(buffer.DefaultMaxLayerTemporal) + availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 0, 0}, } - f.ProvisionalAllocatePrepare(nil, bitrates) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) // from scratch (buffer.InvalidLayer) should give back layer (0, 0) expectedTransition := VideoTransition{ @@ -665,8 +672,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 0, Temporal: 0}, BandwidthDelta: 1, } - transition := f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs := f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) // committing should set target to (0, 0) expectedLayers := buffer.VideoLayer{Spatial: 0, Temporal: 0} @@ -695,8 +704,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: targetLayer, BandwidthDelta: 0, } - transition = f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) // committing should set target to (2, 1) expectedLayers = buffer.VideoLayer{Spatial: 2, Temporal: 1} @@ -723,14 +734,16 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 2, Temporal: 1}, BandwidthDelta: 0, } - transition = f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) f.ProvisionalAllocateCommit() // mute - f.Mute(true) - f.ProvisionalAllocatePrepare(nil, bitrates) + f.Mute(true, true) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) // mute should send target to buffer.InvalidLayer expectedTransition = VideoTransition{ @@ -738,17 +751,20 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.InvalidLayer, BandwidthDelta: -10, } - transition = f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) f.ProvisionalAllocateCommit() // // Test allowOvershoot // - f.Mute(false) + f.Mute(false, true) f.SetMaxSpatialLayer(0) + availableLayers = []int32{1, 2} bitrates = Bitrates{ {0, 0, 0, 0}, {5, 6, 7, 8}, @@ -756,7 +772,7 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { } f.vls.SetTarget(buffer.InvalidLayer) - f.ProvisionalAllocatePrepare(nil, bitrates) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) // from scratch (buffer.InvalidLayer) should go to a layer past maximum as overshoot is allowed expectedTransition = VideoTransition{ @@ -764,8 +780,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 1, Temporal: 0}, BandwidthDelta: 5, } - transition = f.ProvisionalAllocateGetCooperativeTransition(true) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(true) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) // committing should set target to (1, 0) expectedLayers = buffer.VideoLayer{Spatial: 1, Temporal: 0} @@ -804,8 +822,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 0, Temporal: 2}, BandwidthDelta: -5, // 5 was the bandwidth needed for the last allocation } - transition = f.ProvisionalAllocateGetCooperativeTransition(true) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(true) require.Equal(t, expectedTransition, transition) + require.Equal(t, []int32{}, al) + require.Equal(t, bitrates, brs) // committing should set target to (0, 2) expectedLayers = buffer.VideoLayer{Spatial: 0, Temporal: 2} @@ -844,13 +864,14 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) { f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial) f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal) + availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}, } - f.ProvisionalAllocatePrepare(nil, bitrates) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) f.vls.SetTarget(buffer.VideoLayer{Spatial: 2, Temporal: 2}) f.lastAllocation.BandwidthRequested = bitrates[2][2] @@ -859,8 +880,10 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 2, Temporal: 0}, BandwidthDelta: -2, } - transition := f.ProvisionalAllocateGetBestWeightedTransition() + transition, al, brs := f.ProvisionalAllocateGetBestWeightedTransition() require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) } func TestForwarderAllocateNextHigher(t *testing.T) { @@ -1142,7 +1165,7 @@ func TestForwarderPauseMute(t *testing.T) { // should have set target at (0, 0) f.ProvisionalAllocateCommit() - f.Mute(true) + f.Mute(true, true) expectedResult := VideoAllocation{ PauseReason: VideoPauseReasonMuted, BandwidthRequested: 0, @@ -1161,7 +1184,7 @@ func TestForwarderPauseMute(t *testing.T) { func TestForwarderGetTranslationParamsMuted(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) - f.Mute(true) + f.Mute(true, true) params := &testutils.TestExtPacketParams{ SequenceNumber: 23333, diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index e188d54c3..0c2d7d52b 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -515,6 +515,18 @@ func (s *StreamAllocator) IsBWEEnabled(downTrack *sfu.DownTrack) bool { return true } +// called to check if track subscription mute can be applied +func (s *StreamAllocator) IsSubscribeMutable(downTrack *sfu.DownTrack) bool { + s.videoTracksMu.Lock() + defer s.videoTracksMu.Unlock() + + if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil { + return track.IsSubscribeMutable() + } + + return true +} + func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack) { shouldPost := false s.videoTracksMu.Lock() diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index 528792928..be93b9e4b 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -101,6 +101,10 @@ func (t *Track) SetStreamState(streamState StreamState) bool { return true } +func (t *Track) IsSubscribeMutable() bool { + return t.streamState != StreamStatePaused +} + func (t *Track) SetPriority(priority uint8) bool { if priority == 0 { switch t.source {