diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index 198a4ecf8..58574e162 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -28,7 +28,7 @@ jobs: - uses: actions/checkout@v4 - name: Docker meta id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: # list of Docker images to use as base name for tags images: | @@ -53,17 +53,17 @@ jobs: args: generate - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push id: docker_build - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . push: true diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 4da1bfecc..cd3b523fc 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -38,7 +38,7 @@ jobs: go-version: '>=1.18' - name: Run GoReleaser - uses: goreleaser/goreleaser-action@v4 + uses: goreleaser/goreleaser-action@v5 with: distribution: goreleaser version: latest diff --git a/go.mod b/go.mod index 105e1e20d..0df7de2d2 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f - github.com/livekit/protocol v1.7.2 + github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 4dfbe1ccd..6119003f1 100644 --- a/go.sum +++ b/go.sum @@ -127,8 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.2 h1:TPk8rIv5ZZSx1IU5jaGA2W+RdoDlE8dp4CFHE0MKoGo= -github.com/livekit/protocol v1.7.2/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32 h1:5PdmCpGGXA2hz1pKGgKSJYTjmk3Kkm+kNiW5NOFARCI= +github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index d84ed7cb5..334a37086 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -717,7 +717,13 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea return nil } - p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String(), "isExpectedToResume", isExpectedToResume) + p.params.Logger.Infow( + "participant closing", + "sendLeave", sendLeave, + "reason", reason.String(), + "isExpectedToResume", isExpectedToResume, + "clientInfo", p.params.ClientInfo.String(), + ) p.clearDisconnectTimer() p.clearMigrationTimer() @@ -824,7 +830,6 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool { if p.IsClosed() || p.IsDisconnected() { return } - // TODO: change to debug once we are confident p.subLogger.Infow("closing subscriber peer connection to aid migration") // diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 53c284e87..d04eb5cf8 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -26,12 +26,13 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" + "github.com/pion/sctp" + "github.com/livekit/livekit-server/pkg/sfu/connectionquality" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/pion/sctp" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -1299,7 +1300,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp if source != nil && op.ID() == source.ID() { continue } - if len(dest) > 0 { + if len(dest) > 0 || len(destIdentities) > 0 { found := false for _, dID := range dest { if op.ID() == livekit.ParticipantID(dID) { diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index d0d009f2e..f27e449a2 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -16,6 +16,7 @@ package buffer import ( "encoding/binary" + "errors" "io" "strings" "sync" @@ -120,6 +121,9 @@ type Buffer struct { paused bool frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator frameRateCalculated bool + + packetNotFoundCount atomic.Uint32 + packetTooOldCount atomic.Uint32 } // NewBuffer constructs a new Buffer @@ -465,8 +469,13 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) if err != nil { - if err != bucket.ErrRTXPacket { - b.logger.Warnw("could not add RTP packet to bucket", err) + if errors.Is(err, bucket.ErrPacketTooOld) { + packetTooOldCount := b.packetTooOldCount.Inc() + if packetTooOldCount%20 == 0 { + b.logger.Warnw("could not add packet to bucket", err, "count", packetTooOldCount) + } + } else if err != bucket.ErrRTXPacket { + b.logger.Warnw("could not add packet to bucket", err) } return } @@ -483,7 +492,10 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { n, err := b.getPacket(buf, ep.Packet.SequenceNumber) if err != nil { - b.logger.Warnw("could not get packet", err, "sn", ep.Packet.SequenceNumber, "headSN", b.bucket.HeadSequenceNumber()) + packetNotFoundCount := b.packetNotFoundCount.Inc() + if packetNotFoundCount%20 == 0 { + b.logger.Warnw("could not get packet from bucket", err, "sn", ep.Packet.SequenceNumber, "headSN", b.bucket.HeadSequenceNumber(), "count", packetNotFoundCount) + } return nil } ep.RawPacket = buf[:n] diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 3bcf8cc48..b735b1988 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -146,7 +146,7 @@ func (r *RTPStatsSender) Update( r.highestTime = packetTime r.extStartSN = extSequenceNumber - r.extHighestSN = extSequenceNumber + r.extHighestSN = extSequenceNumber - 1 r.extStartTS = extTimestamp r.extHighestTS = extTimestamp @@ -284,7 +284,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt extHighestSNFromRR += (1 << 32) } } - if extHighestSNFromRR < r.extStartSN { + if (extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000)) < r.extStartSN { // it is possible that the `LastSequenceNumber` in the receiver report is before the starting // sequence number when dummy packets are used to trigger Pion's OnTrack path. return @@ -343,7 +343,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt r.lastRR = rr } else { r.logger.Debugw( - fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, rr.LastSequenceNumber), + fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), "lastRRTime", r.lastRRTime, "lastRR", r.lastRR, "sinceLastRR", time.Since(r.lastRRTime), @@ -605,7 +605,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se maxJitter: r.jitter, maxRtt: r.rtt, }, - extStartSNFromRR: r.extHighestSNFromRR + 1, + extStartSNFromRR: r.extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000) + 1, packetsLostFromRR: r.packetsLostFromRR, maxJitterFromRR: r.jitterFromRR, } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 5b03cfe21..1a977c650 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -299,7 +299,11 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time { if packetsSent > cs.packetsSent { if cs.streamingStartedAt.IsZero() { // the start could be anywhere after last update, but using `at` as this is not required to be accurate - cs.streamingStartedAt = at + if at.IsZero() { + cs.streamingStartedAt = time.Now() + } else { + cs.streamingStartedAt = at + } } } else { cs.streamingStartedAt = time.Time{} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 26e179179..1b93951ce 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -181,6 +181,9 @@ type DownTrackStreamAllocatorListener interface { // check if track should participate in BWE IsBWEEnabled(dt *DownTrack) bool + + // check if subscription mute can be applied + IsSubscribeMutable(dt *DownTrack) bool } type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) @@ -870,7 +873,15 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa // Mute enables or disables media forwarding - subscriber triggered func (d *DownTrack) Mute(muted bool) { - changed := d.forwarder.Mute(muted) + d.streamAllocatorLock.RLock() + listener := d.streamAllocatorListener + d.streamAllocatorLock.RUnlock() + + isSubscribeMutable := true + if listener != nil { + isSubscribeMutable = listener.IsSubscribeMutable(d) + } + changed := d.forwarder.Mute(muted, isSubscribeMutable) d.handleMute(muted, changed) } @@ -953,9 +964,8 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { - if d.forwarder != nil { - d.forwarder.Mute(true) - } + d.forwarder.Mute(true, true) + // write blank frames after disabling so that other frames do not interfere. // Idea here is to send blank key frames to flush the decoder buffer at the remote end. // Otherwise, with transceiver re-use last frame from previous stream is held in the @@ -1199,14 +1209,24 @@ func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers b } func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition { - transition := d.forwarder.ProvisionalAllocateGetCooperativeTransition(allowOvershoot) - d.params.Logger.Debugw("stream: cooperative transition", "transition", transition) + transition, availableLayers, brs := d.forwarder.ProvisionalAllocateGetCooperativeTransition(allowOvershoot) + d.params.Logger.Debugw( + "stream: cooperative transition", + "transition", transition, + "availableLayers", availableLayers, + "bitrates", brs, + ) return transition } func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition { - transition := d.forwarder.ProvisionalAllocateGetBestWeightedTransition() - d.params.Logger.Debugw("stream: best weighted transition", "transition", transition) + transition, availableLayers, brs := d.forwarder.ProvisionalAllocateGetBestWeightedTransition() + d.params.Logger.Debugw( + "stream: best weighted transition", + "transition", transition, + "availableLayers", availableLayers, + "bitrates", brs, + ) return transition } @@ -1226,9 +1246,15 @@ func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOver } func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) { - _, brs := d.params.Receiver.GetLayeredBitrate() + availableLayers, brs := d.params.Receiver.GetLayeredBitrate() transition, available := d.forwarder.GetNextHigherTransition(brs, allowOvershoot) - d.params.Logger.Debugw("stream: get next higher layer", "transition", transition, "available", available, "bitrates", brs) + d.params.Logger.Debugw( + "stream: get next higher layer", + "transition", transition, + "available", available, + "availableLayers", availableLayers, + "bitrates", brs, + ) return transition, available } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 2f950410c..de737c8fc 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -121,7 +121,7 @@ type VideoAllocationProvisional struct { pubMuted bool maxSeenLayer buffer.VideoLayer availableLayers []int32 - Bitrates Bitrates + bitrates Bitrates maxLayer buffer.VideoLayer currentLayer buffer.VideoLayer allocatedLayer buffer.VideoLayer @@ -373,7 +373,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.refTSOffset = state.RefTSOffset } -func (f *Forwarder) Mute(muted bool) bool { +func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool { f.lock.Lock() defer f.lock.Unlock() @@ -395,7 +395,7 @@ func (f *Forwarder) Mute(muted bool) bool { // The work around here to ignore mute does ignore an intentional mute. // It could result in some bandwidth consumed for stream without visibility in // the case of intentional mute. - if muted && f.isDeficientLocked() && f.lastAllocation.PauseReason == VideoPauseReasonBandwidth { + if muted && !isSubscribeMutable { f.logger.Debugw("ignoring forwarder mute, paused due to congestion") return false } @@ -723,7 +723,7 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow return f.updateAllocation(alloc, "optimal") } -func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates Bitrates) { +func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, bitrates Bitrates) { f.lock.Lock() defer f.lock.Unlock() @@ -732,7 +732,7 @@ func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates muted: f.muted, pubMuted: f.pubMuted, maxSeenLayer: f.vls.GetMaxSeen(), - Bitrates: Bitrates, + bitrates: bitrates, maxLayer: f.vls.GetMax(), currentLayer: f.vls.GetCurrent(), } @@ -760,14 +760,14 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu return false, 0 } - requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal] + requiredBitrate := f.provisional.bitrates[layer.Spatial][layer.Temporal] if requiredBitrate == 0 { return false, 0 } alreadyAllocatedBitrate := int64(0) if f.provisional.allocatedLayer.IsValid() { - alreadyAllocatedBitrate = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] + alreadyAllocatedBitrate = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] } // a layer under maximum fits, take it @@ -792,7 +792,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu return false, 0 } -func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition { +func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) (VideoTransition, []int32, Bitrates) { // // This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed) // when channel is congested. @@ -821,8 +821,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b return VideoTransition{ From: existingTargetLayer, To: f.provisional.allocatedLayer, - BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: -getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } // check if we should preserve current target @@ -832,9 +832,9 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b maximalBandwidthRequired := int64(0) for s := f.provisional.maxLayer.Spatial; s >= 0; s-- { for t := f.provisional.maxLayer.Temporal; t >= 0; t-- { - if f.provisional.Bitrates[s][t] != 0 { + if f.provisional.bitrates[s][t] != 0 { maximalLayer = buffer.VideoLayer{Spatial: s, Temporal: t} - maximalBandwidthRequired = f.provisional.Bitrates[s][t] + maximalBandwidthRequired = f.provisional.bitrates[s][t] break } } @@ -845,7 +845,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b } if maximalLayer.IsValid() { - if !existingTargetLayer.GreaterThan(maximalLayer) && f.provisional.Bitrates[existingTargetLayer.Spatial][existingTargetLayer.Temporal] != 0 { + if !existingTargetLayer.GreaterThan(maximalLayer) && f.provisional.bitrates[existingTargetLayer.Spatial][existingTargetLayer.Temporal] != 0 { // currently streaming and maybe wanting an upgrade (existingTargetLayer <= maximalLayer), // just preserve current target in the cooperative scheme of things f.provisional.allocatedLayer = existingTargetLayer @@ -853,7 +853,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b From: existingTargetLayer, To: existingTargetLayer, BandwidthDelta: 0, - } + }, f.provisional.availableLayers, f.provisional.bitrates } if existingTargetLayer.GreaterThan(maximalLayer) { @@ -862,8 +862,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b return VideoTransition{ From: existingTargetLayer, To: maximalLayer, - BandwidthDelta: maximalBandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: maximalBandwidthRequired - getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } } } @@ -876,9 +876,9 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b bw := int64(0) for s := minSpatial; s <= maxSpatial; s++ { for t := minTemporal; t <= maxTemporal; t++ { - if f.provisional.Bitrates[s][t] != 0 { + if f.provisional.bitrates[s][t] != 0 { layers = buffer.VideoLayer{Spatial: s, Temporal: t} - bw = f.provisional.Bitrates[s][t] + bw = f.provisional.bitrates[s][t] break } } @@ -915,7 +915,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b if !targetLayer.IsValid() { targetLayer = f.provisional.currentLayer if targetLayer.IsValid() { - bandwidthRequired = f.provisional.Bitrates[targetLayer.Spatial][targetLayer.Temporal] + bandwidthRequired = f.provisional.bitrates[targetLayer.Spatial][targetLayer.Temporal] } } @@ -923,11 +923,11 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b return VideoTransition{ From: f.vls.GetTarget(), To: targetLayer, - BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } -func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition { +func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() (VideoTransition, []int32, Bitrates) { // // This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed) // when channel is congested. This is called on tracks other than the one needing the change. When the track @@ -952,14 +952,14 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti return VideoTransition{ From: targetLayer, To: f.provisional.allocatedLayer, - BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } maxReachableLayerTemporal := buffer.InvalidLayerTemporal for t := f.provisional.maxLayer.Temporal; t >= 0; t-- { for s := f.provisional.maxLayer.Spatial; s >= 0; s-- { - if f.provisional.Bitrates[s][t] != 0 { + if f.provisional.bitrates[s][t] != 0 { maxReachableLayerTemporal = t break } @@ -977,13 +977,13 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti return VideoTransition{ From: targetLayer, To: f.provisional.allocatedLayer, - BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested), - } + BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested), + }, f.provisional.availableLayers, f.provisional.bitrates } // starting from minimum to target, find transition which gives the best // transition taking into account bits saved vs cost of such a transition - existingBandwidthNeeded := getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested) + existingBandwidthNeeded := getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested) bestLayer := buffer.InvalidLayer bestBandwidthDelta := int64(0) bestValue := float32(0) @@ -993,7 +993,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti break } - bandwidthDelta := int64(math.Max(float64(0), float64(existingBandwidthNeeded-f.provisional.Bitrates[s][t]))) + bandwidthDelta := int64(math.Max(float64(0), float64(existingBandwidthNeeded-f.provisional.bitrates[s][t]))) transitionCost := int32(0) // SVC-TODO: SVC will need a different cost transition @@ -1020,7 +1020,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti From: targetLayer, To: bestLayer, BandwidthDelta: -bestBandwidthDelta, - } + }, f.provisional.availableLayers, f.provisional.bitrates } func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { @@ -1031,13 +1031,13 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { f.provisional.muted, f.provisional.pubMuted, f.provisional.maxSeenLayer.Spatial, - f.provisional.Bitrates, + f.provisional.bitrates, f.provisional.maxLayer, ) alloc := VideoAllocation{ BandwidthRequested: 0, - BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), - Bitrates: f.provisional.Bitrates, + BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), + Bitrates: f.provisional.bitrates, BandwidthNeeded: optimalBandwidthNeeded, TargetLayer: f.provisional.allocatedLayer, RequestLayerSpatial: f.provisional.allocatedLayer.Spatial, @@ -1047,7 +1047,7 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { f.provisional.pubMuted, f.provisional.maxSeenLayer, f.provisional.availableLayers, - f.provisional.Bitrates, + f.provisional.bitrates, f.provisional.allocatedLayer, f.provisional.maxLayer, ), @@ -1063,8 +1063,8 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { case optimalBandwidthNeeded == 0: if f.provisional.allocatedLayer.IsValid() { // overshoot - alloc.BandwidthRequested = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] - alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) + alloc.BandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] + alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) } else { alloc.PauseReason = VideoPauseReasonFeedDry @@ -1078,16 +1078,16 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { default: if f.provisional.allocatedLayer.IsValid() { - alloc.BandwidthRequested = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] + alloc.BandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] } - alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) + alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested) if f.provisional.allocatedLayer.GreaterThan(f.provisional.maxLayer) || alloc.BandwidthRequested >= getOptimalBandwidthNeeded( f.provisional.muted, f.provisional.pubMuted, f.provisional.maxSeenLayer.Spatial, - f.provisional.Bitrates, + f.provisional.bitrates, f.provisional.maxLayer, ) { // could be greater than optimal if overshooting @@ -1308,18 +1308,12 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation f.lock.Lock() defer f.lock.Unlock() - existingTargetLayer := f.vls.GetTarget() - if !existingTargetLayer.IsValid() { - // already paused - return f.lastAllocation - } - maxLayer := f.vls.GetMax() maxSeenLayer := f.vls.GetMaxSeen() optimalBandwidthNeeded := getOptimalBandwidthNeeded(f.muted, f.pubMuted, maxSeenLayer.Spatial, brs, maxLayer) alloc := VideoAllocation{ BandwidthRequested: 0, - BandwidthDelta: 0 - getBandwidthNeeded(brs, existingTargetLayer, f.lastAllocation.BandwidthRequested), + BandwidthDelta: 0 - getBandwidthNeeded(brs, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), Bitrates: brs, BandwidthNeeded: optimalBandwidthNeeded, TargetLayer: buffer.InvalidLayer, @@ -1463,7 +1457,9 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( f.logger.Debugw( "starting forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, + "extSequenceNumber", extPkt.ExtSequenceNumber, "timestamp", extPkt.Packet.Timestamp, + "extTimestamp", extPkt.ExtTimestamp, "layer", layer, "referenceLayerSpatial", f.referenceLayerSpatial, ) @@ -1473,7 +1469,9 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( f.logger.Debugw( "catch up forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, + "extSequenceNumber", extPkt.ExtSequenceNumber, "timestamp", extPkt.Packet.Timestamp, + "extTimestamp", extPkt.ExtTimestamp, "layer", layer, "referenceLayerSpatial", f.referenceLayerSpatial, ) diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index abd58144a..f621fd587 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -40,13 +40,19 @@ func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo func TestForwarderMute(t *testing.T) { f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) require.False(t, f.IsMuted()) - muted := f.Mute(false) + muted := f.Mute(false, true) require.False(t, muted) // no change in mute state require.False(t, f.IsMuted()) - muted = f.Mute(true) + + muted = f.Mute(true, false) + require.False(t, muted) + require.False(t, f.IsMuted()) + + muted = f.Mute(true, true) require.True(t, muted) require.True(t, f.IsMuted()) - muted = f.Mute(false) + + muted = f.Mute(false, true) require.True(t, muted) require.False(t, f.IsMuted()) } @@ -164,7 +170,7 @@ func TestForwarderAllocateOptimal(t *testing.T) { f.SetMaxPublishedLayer(buffer.DefaultMaxLayerSpatial) // muted should not consume any bandwidth - f.Mute(true) + f.Mute(true, true) disable(f) expectedResult = VideoAllocation{ PauseReason: VideoPauseReasonMuted, @@ -180,7 +186,7 @@ func TestForwarderAllocateOptimal(t *testing.T) { require.Equal(t, expectedResult, result) require.Equal(t, expectedResult, f.lastAllocation) - f.Mute(false) + f.Mute(false, true) // pub muted should not consume any bandwidth f.PubMute(true) @@ -616,7 +622,7 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) { {9, 10, 11, 12}, } - f.Mute(true) + f.Mute(true, true) f.ProvisionalAllocatePrepare(nil, bitrates) isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) @@ -651,13 +657,14 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { f.SetMaxPublishedLayer(buffer.DefaultMaxLayerSpatial) f.SetMaxTemporalLayerSeen(buffer.DefaultMaxLayerTemporal) + availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 0, 0}, } - f.ProvisionalAllocatePrepare(nil, bitrates) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) // from scratch (buffer.InvalidLayer) should give back layer (0, 0) expectedTransition := VideoTransition{ @@ -665,8 +672,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 0, Temporal: 0}, BandwidthDelta: 1, } - transition := f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs := f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) // committing should set target to (0, 0) expectedLayers := buffer.VideoLayer{Spatial: 0, Temporal: 0} @@ -695,8 +704,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: targetLayer, BandwidthDelta: 0, } - transition = f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) // committing should set target to (2, 1) expectedLayers = buffer.VideoLayer{Spatial: 2, Temporal: 1} @@ -723,14 +734,16 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 2, Temporal: 1}, BandwidthDelta: 0, } - transition = f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) f.ProvisionalAllocateCommit() // mute - f.Mute(true) - f.ProvisionalAllocatePrepare(nil, bitrates) + f.Mute(true, true) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) // mute should send target to buffer.InvalidLayer expectedTransition = VideoTransition{ @@ -738,17 +751,20 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.InvalidLayer, BandwidthDelta: -10, } - transition = f.ProvisionalAllocateGetCooperativeTransition(false) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) f.ProvisionalAllocateCommit() // // Test allowOvershoot // - f.Mute(false) + f.Mute(false, true) f.SetMaxSpatialLayer(0) + availableLayers = []int32{1, 2} bitrates = Bitrates{ {0, 0, 0, 0}, {5, 6, 7, 8}, @@ -756,7 +772,7 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { } f.vls.SetTarget(buffer.InvalidLayer) - f.ProvisionalAllocatePrepare(nil, bitrates) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) // from scratch (buffer.InvalidLayer) should go to a layer past maximum as overshoot is allowed expectedTransition = VideoTransition{ @@ -764,8 +780,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 1, Temporal: 0}, BandwidthDelta: 5, } - transition = f.ProvisionalAllocateGetCooperativeTransition(true) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(true) require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) // committing should set target to (1, 0) expectedLayers = buffer.VideoLayer{Spatial: 1, Temporal: 0} @@ -804,8 +822,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 0, Temporal: 2}, BandwidthDelta: -5, // 5 was the bandwidth needed for the last allocation } - transition = f.ProvisionalAllocateGetCooperativeTransition(true) + transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(true) require.Equal(t, expectedTransition, transition) + require.Equal(t, []int32{}, al) + require.Equal(t, bitrates, brs) // committing should set target to (0, 2) expectedLayers = buffer.VideoLayer{Spatial: 0, Temporal: 2} @@ -844,13 +864,14 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) { f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial) f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal) + availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}, } - f.ProvisionalAllocatePrepare(nil, bitrates) + f.ProvisionalAllocatePrepare(availableLayers, bitrates) f.vls.SetTarget(buffer.VideoLayer{Spatial: 2, Temporal: 2}) f.lastAllocation.BandwidthRequested = bitrates[2][2] @@ -859,8 +880,10 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) { To: buffer.VideoLayer{Spatial: 2, Temporal: 0}, BandwidthDelta: -2, } - transition := f.ProvisionalAllocateGetBestWeightedTransition() + transition, al, brs := f.ProvisionalAllocateGetBestWeightedTransition() require.Equal(t, expectedTransition, transition) + require.Equal(t, availableLayers, al) + require.Equal(t, bitrates, brs) } func TestForwarderAllocateNextHigher(t *testing.T) { @@ -1142,7 +1165,7 @@ func TestForwarderPauseMute(t *testing.T) { // should have set target at (0, 0) f.ProvisionalAllocateCommit() - f.Mute(true) + f.Mute(true, true) expectedResult := VideoAllocation{ PauseReason: VideoPauseReasonMuted, BandwidthRequested: 0, @@ -1161,7 +1184,7 @@ func TestForwarderPauseMute(t *testing.T) { func TestForwarderGetTranslationParamsMuted(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) - f.Mute(true) + f.Mute(true, true) params := &testutils.TestExtPacketParams{ SequenceNumber: 23333, diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 775c8d42a..7cd56395f 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -132,6 +132,7 @@ func (s *sequencer) push( defer s.Unlock() if !s.initialized { + s.initialized = true s.extHighestSN = extModifiedSN - 1 s.extHighestTS = extModifiedTS s.updateSNOffset() @@ -178,6 +179,7 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui s.Lock() defer s.Unlock() + s.logger.Debugw("sequencer padding", "extHighestSN", s.extHighestSN, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive) if s.snRangeMap == nil { return } diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index e188d54c3..0c2d7d52b 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -515,6 +515,18 @@ func (s *StreamAllocator) IsBWEEnabled(downTrack *sfu.DownTrack) bool { return true } +// called to check if track subscription mute can be applied +func (s *StreamAllocator) IsSubscribeMutable(downTrack *sfu.DownTrack) bool { + s.videoTracksMu.Lock() + defer s.videoTracksMu.Unlock() + + if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil { + return track.IsSubscribeMutable() + } + + return true +} + func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack) { shouldPost := false s.videoTracksMu.Lock() diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index 528792928..be93b9e4b 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -101,6 +101,10 @@ func (t *Track) SetStreamState(streamState StreamState) bool { return true } +func (t *Track) IsSubscribeMutable() bool { + return t.streamState != StreamStatePaused +} + func (t *Track) SetPriority(priority uint8) bool { if priority == 0 { switch t.source {