mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 05:25:19 +00:00
Merge remote-tracking branch 'origin/master' into raja_1833
This commit is contained in:
@@ -28,7 +28,7 @@ jobs:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Docker meta
|
||||
id: meta
|
||||
uses: docker/metadata-action@v4
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
# list of Docker images to use as base name for tags
|
||||
images: |
|
||||
@@ -53,17 +53,17 @@ jobs:
|
||||
args: generate
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v2
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
id: docker_build
|
||||
uses: docker/build-push-action@v4
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
|
||||
@@ -38,7 +38,7 @@ jobs:
|
||||
go-version: '>=1.18'
|
||||
|
||||
- name: Run GoReleaser
|
||||
uses: goreleaser/goreleaser-action@v4
|
||||
uses: goreleaser/goreleaser-action@v5
|
||||
with:
|
||||
distribution: goreleaser
|
||||
version: latest
|
||||
|
||||
@@ -18,7 +18,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f
|
||||
github.com/livekit/protocol v1.7.2
|
||||
github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32
|
||||
github.com/livekit/psrpc v0.3.3
|
||||
github.com/mackerelio/go-osstat v0.2.4
|
||||
github.com/magefile/mage v1.15.0
|
||||
|
||||
@@ -127,8 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
|
||||
github.com/livekit/protocol v1.7.2 h1:TPk8rIv5ZZSx1IU5jaGA2W+RdoDlE8dp4CFHE0MKoGo=
|
||||
github.com/livekit/protocol v1.7.2/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
|
||||
github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32 h1:5PdmCpGGXA2hz1pKGgKSJYTjmk3Kkm+kNiW5NOFARCI=
|
||||
github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
|
||||
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
|
||||
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
|
||||
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
|
||||
|
||||
@@ -717,7 +717,13 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
|
||||
return nil
|
||||
}
|
||||
|
||||
p.params.Logger.Infow("participant closing", "sendLeave", sendLeave, "reason", reason.String(), "isExpectedToResume", isExpectedToResume)
|
||||
p.params.Logger.Infow(
|
||||
"participant closing",
|
||||
"sendLeave", sendLeave,
|
||||
"reason", reason.String(),
|
||||
"isExpectedToResume", isExpectedToResume,
|
||||
"clientInfo", p.params.ClientInfo.String(),
|
||||
)
|
||||
p.clearDisconnectTimer()
|
||||
p.clearMigrationTimer()
|
||||
|
||||
@@ -824,7 +830,6 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
|
||||
if p.IsClosed() || p.IsDisconnected() {
|
||||
return
|
||||
}
|
||||
// TODO: change to debug once we are confident
|
||||
p.subLogger.Infow("closing subscriber peer connection to aid migration")
|
||||
|
||||
//
|
||||
|
||||
+3
-2
@@ -26,12 +26,13 @@ import (
|
||||
"go.uber.org/atomic"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/pion/sctp"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
sutils "github.com/livekit/livekit-server/pkg/utils"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/utils"
|
||||
"github.com/pion/sctp"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
@@ -1299,7 +1300,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp
|
||||
if source != nil && op.ID() == source.ID() {
|
||||
continue
|
||||
}
|
||||
if len(dest) > 0 {
|
||||
if len(dest) > 0 || len(destIdentities) > 0 {
|
||||
found := false
|
||||
for _, dID := range dest {
|
||||
if op.ID() == livekit.ParticipantID(dID) {
|
||||
|
||||
@@ -16,6 +16,7 @@ package buffer
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -120,6 +121,9 @@ type Buffer struct {
|
||||
paused bool
|
||||
frameRateCalculator [DefaultMaxLayerSpatial + 1]FrameRateCalculator
|
||||
frameRateCalculated bool
|
||||
|
||||
packetNotFoundCount atomic.Uint32
|
||||
packetTooOldCount atomic.Uint32
|
||||
}
|
||||
|
||||
// NewBuffer constructs a new Buffer
|
||||
@@ -465,8 +469,13 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
|
||||
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber)
|
||||
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
|
||||
if err != nil {
|
||||
if err != bucket.ErrRTXPacket {
|
||||
b.logger.Warnw("could not add RTP packet to bucket", err)
|
||||
if errors.Is(err, bucket.ErrPacketTooOld) {
|
||||
packetTooOldCount := b.packetTooOldCount.Inc()
|
||||
if packetTooOldCount%20 == 0 {
|
||||
b.logger.Warnw("could not add packet to bucket", err, "count", packetTooOldCount)
|
||||
}
|
||||
} else if err != bucket.ErrRTXPacket {
|
||||
b.logger.Warnw("could not add packet to bucket", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -483,7 +492,10 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
|
||||
func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket {
|
||||
n, err := b.getPacket(buf, ep.Packet.SequenceNumber)
|
||||
if err != nil {
|
||||
b.logger.Warnw("could not get packet", err, "sn", ep.Packet.SequenceNumber, "headSN", b.bucket.HeadSequenceNumber())
|
||||
packetNotFoundCount := b.packetNotFoundCount.Inc()
|
||||
if packetNotFoundCount%20 == 0 {
|
||||
b.logger.Warnw("could not get packet from bucket", err, "sn", ep.Packet.SequenceNumber, "headSN", b.bucket.HeadSequenceNumber(), "count", packetNotFoundCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ep.RawPacket = buf[:n]
|
||||
|
||||
@@ -146,7 +146,7 @@ func (r *RTPStatsSender) Update(
|
||||
r.highestTime = packetTime
|
||||
|
||||
r.extStartSN = extSequenceNumber
|
||||
r.extHighestSN = extSequenceNumber
|
||||
r.extHighestSN = extSequenceNumber - 1
|
||||
|
||||
r.extStartTS = extTimestamp
|
||||
r.extHighestTS = extTimestamp
|
||||
@@ -284,7 +284,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
|
||||
extHighestSNFromRR += (1 << 32)
|
||||
}
|
||||
}
|
||||
if extHighestSNFromRR < r.extStartSN {
|
||||
if (extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000)) < r.extStartSN {
|
||||
// it is possible that the `LastSequenceNumber` in the receiver report is before the starting
|
||||
// sequence number when dummy packets are used to trigger Pion's OnTrack path.
|
||||
return
|
||||
@@ -343,7 +343,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
|
||||
r.lastRR = rr
|
||||
} else {
|
||||
r.logger.Debugw(
|
||||
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, rr.LastSequenceNumber),
|
||||
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR),
|
||||
"lastRRTime", r.lastRRTime,
|
||||
"lastRR", r.lastRR,
|
||||
"sinceLastRR", time.Since(r.lastRRTime),
|
||||
@@ -605,7 +605,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se
|
||||
maxJitter: r.jitter,
|
||||
maxRtt: r.rtt,
|
||||
},
|
||||
extStartSNFromRR: r.extHighestSNFromRR + 1,
|
||||
extStartSNFromRR: r.extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000) + 1,
|
||||
packetsLostFromRR: r.packetsLostFromRR,
|
||||
maxJitterFromRR: r.jitterFromRR,
|
||||
}
|
||||
|
||||
@@ -299,7 +299,11 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time {
|
||||
if packetsSent > cs.packetsSent {
|
||||
if cs.streamingStartedAt.IsZero() {
|
||||
// the start could be anywhere after last update, but using `at` as this is not required to be accurate
|
||||
cs.streamingStartedAt = at
|
||||
if at.IsZero() {
|
||||
cs.streamingStartedAt = time.Now()
|
||||
} else {
|
||||
cs.streamingStartedAt = at
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cs.streamingStartedAt = time.Time{}
|
||||
|
||||
+36
-10
@@ -181,6 +181,9 @@ type DownTrackStreamAllocatorListener interface {
|
||||
|
||||
// check if track should participate in BWE
|
||||
IsBWEEnabled(dt *DownTrack) bool
|
||||
|
||||
// check if subscription mute can be applied
|
||||
IsSubscribeMutable(dt *DownTrack) bool
|
||||
}
|
||||
|
||||
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
|
||||
@@ -870,7 +873,15 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
|
||||
|
||||
// Mute enables or disables media forwarding - subscriber triggered
|
||||
func (d *DownTrack) Mute(muted bool) {
|
||||
changed := d.forwarder.Mute(muted)
|
||||
d.streamAllocatorLock.RLock()
|
||||
listener := d.streamAllocatorListener
|
||||
d.streamAllocatorLock.RUnlock()
|
||||
|
||||
isSubscribeMutable := true
|
||||
if listener != nil {
|
||||
isSubscribeMutable = listener.IsSubscribeMutable(d)
|
||||
}
|
||||
changed := d.forwarder.Mute(muted, isSubscribeMutable)
|
||||
d.handleMute(muted, changed)
|
||||
}
|
||||
|
||||
@@ -953,9 +964,8 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
|
||||
d.bindLock.Lock()
|
||||
d.params.Logger.Debugw("close down track", "flushBlankFrame", flush)
|
||||
if d.bound.Load() {
|
||||
if d.forwarder != nil {
|
||||
d.forwarder.Mute(true)
|
||||
}
|
||||
d.forwarder.Mute(true, true)
|
||||
|
||||
// write blank frames after disabling so that other frames do not interfere.
|
||||
// Idea here is to send blank key frames to flush the decoder buffer at the remote end.
|
||||
// Otherwise, with transceiver re-use last frame from previous stream is held in the
|
||||
@@ -1199,14 +1209,24 @@ func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers b
|
||||
}
|
||||
|
||||
func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
|
||||
transition := d.forwarder.ProvisionalAllocateGetCooperativeTransition(allowOvershoot)
|
||||
d.params.Logger.Debugw("stream: cooperative transition", "transition", transition)
|
||||
transition, availableLayers, brs := d.forwarder.ProvisionalAllocateGetCooperativeTransition(allowOvershoot)
|
||||
d.params.Logger.Debugw(
|
||||
"stream: cooperative transition",
|
||||
"transition", transition,
|
||||
"availableLayers", availableLayers,
|
||||
"bitrates", brs,
|
||||
)
|
||||
return transition
|
||||
}
|
||||
|
||||
func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
|
||||
transition := d.forwarder.ProvisionalAllocateGetBestWeightedTransition()
|
||||
d.params.Logger.Debugw("stream: best weighted transition", "transition", transition)
|
||||
transition, availableLayers, brs := d.forwarder.ProvisionalAllocateGetBestWeightedTransition()
|
||||
d.params.Logger.Debugw(
|
||||
"stream: best weighted transition",
|
||||
"transition", transition,
|
||||
"availableLayers", availableLayers,
|
||||
"bitrates", brs,
|
||||
)
|
||||
return transition
|
||||
}
|
||||
|
||||
@@ -1226,9 +1246,15 @@ func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOver
|
||||
}
|
||||
|
||||
func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) {
|
||||
_, brs := d.params.Receiver.GetLayeredBitrate()
|
||||
availableLayers, brs := d.params.Receiver.GetLayeredBitrate()
|
||||
transition, available := d.forwarder.GetNextHigherTransition(brs, allowOvershoot)
|
||||
d.params.Logger.Debugw("stream: get next higher layer", "transition", transition, "available", available, "bitrates", brs)
|
||||
d.params.Logger.Debugw(
|
||||
"stream: get next higher layer",
|
||||
"transition", transition,
|
||||
"available", available,
|
||||
"availableLayers", availableLayers,
|
||||
"bitrates", brs,
|
||||
)
|
||||
return transition, available
|
||||
}
|
||||
|
||||
|
||||
+44
-46
@@ -121,7 +121,7 @@ type VideoAllocationProvisional struct {
|
||||
pubMuted bool
|
||||
maxSeenLayer buffer.VideoLayer
|
||||
availableLayers []int32
|
||||
Bitrates Bitrates
|
||||
bitrates Bitrates
|
||||
maxLayer buffer.VideoLayer
|
||||
currentLayer buffer.VideoLayer
|
||||
allocatedLayer buffer.VideoLayer
|
||||
@@ -373,7 +373,7 @@ func (f *Forwarder) SeedState(state ForwarderState) {
|
||||
f.refTSOffset = state.RefTSOffset
|
||||
}
|
||||
|
||||
func (f *Forwarder) Mute(muted bool) bool {
|
||||
func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
@@ -395,7 +395,7 @@ func (f *Forwarder) Mute(muted bool) bool {
|
||||
// The work around here to ignore mute does ignore an intentional mute.
|
||||
// It could result in some bandwidth consumed for stream without visibility in
|
||||
// the case of intentional mute.
|
||||
if muted && f.isDeficientLocked() && f.lastAllocation.PauseReason == VideoPauseReasonBandwidth {
|
||||
if muted && !isSubscribeMutable {
|
||||
f.logger.Debugw("ignoring forwarder mute, paused due to congestion")
|
||||
return false
|
||||
}
|
||||
@@ -723,7 +723,7 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow
|
||||
return f.updateAllocation(alloc, "optimal")
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates Bitrates) {
|
||||
func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, bitrates Bitrates) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
@@ -732,7 +732,7 @@ func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates
|
||||
muted: f.muted,
|
||||
pubMuted: f.pubMuted,
|
||||
maxSeenLayer: f.vls.GetMaxSeen(),
|
||||
Bitrates: Bitrates,
|
||||
bitrates: bitrates,
|
||||
maxLayer: f.vls.GetMax(),
|
||||
currentLayer: f.vls.GetCurrent(),
|
||||
}
|
||||
@@ -760,14 +760,14 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
return false, 0
|
||||
}
|
||||
|
||||
requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal]
|
||||
requiredBitrate := f.provisional.bitrates[layer.Spatial][layer.Temporal]
|
||||
if requiredBitrate == 0 {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
alreadyAllocatedBitrate := int64(0)
|
||||
if f.provisional.allocatedLayer.IsValid() {
|
||||
alreadyAllocatedBitrate = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
|
||||
alreadyAllocatedBitrate = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
|
||||
}
|
||||
|
||||
// a layer under maximum fits, take it
|
||||
@@ -792,7 +792,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
return false, 0
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
|
||||
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) (VideoTransition, []int32, Bitrates) {
|
||||
//
|
||||
// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
|
||||
// when channel is congested.
|
||||
@@ -821,8 +821,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
return VideoTransition{
|
||||
From: existingTargetLayer,
|
||||
To: f.provisional.allocatedLayer,
|
||||
BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}
|
||||
BandwidthDelta: -getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
|
||||
// check if we should preserve current target
|
||||
@@ -832,9 +832,9 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
maximalBandwidthRequired := int64(0)
|
||||
for s := f.provisional.maxLayer.Spatial; s >= 0; s-- {
|
||||
for t := f.provisional.maxLayer.Temporal; t >= 0; t-- {
|
||||
if f.provisional.Bitrates[s][t] != 0 {
|
||||
if f.provisional.bitrates[s][t] != 0 {
|
||||
maximalLayer = buffer.VideoLayer{Spatial: s, Temporal: t}
|
||||
maximalBandwidthRequired = f.provisional.Bitrates[s][t]
|
||||
maximalBandwidthRequired = f.provisional.bitrates[s][t]
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -845,7 +845,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
}
|
||||
|
||||
if maximalLayer.IsValid() {
|
||||
if !existingTargetLayer.GreaterThan(maximalLayer) && f.provisional.Bitrates[existingTargetLayer.Spatial][existingTargetLayer.Temporal] != 0 {
|
||||
if !existingTargetLayer.GreaterThan(maximalLayer) && f.provisional.bitrates[existingTargetLayer.Spatial][existingTargetLayer.Temporal] != 0 {
|
||||
// currently streaming and maybe wanting an upgrade (existingTargetLayer <= maximalLayer),
|
||||
// just preserve current target in the cooperative scheme of things
|
||||
f.provisional.allocatedLayer = existingTargetLayer
|
||||
@@ -853,7 +853,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
From: existingTargetLayer,
|
||||
To: existingTargetLayer,
|
||||
BandwidthDelta: 0,
|
||||
}
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
|
||||
if existingTargetLayer.GreaterThan(maximalLayer) {
|
||||
@@ -862,8 +862,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
return VideoTransition{
|
||||
From: existingTargetLayer,
|
||||
To: maximalLayer,
|
||||
BandwidthDelta: maximalBandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}
|
||||
BandwidthDelta: maximalBandwidthRequired - getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -876,9 +876,9 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
bw := int64(0)
|
||||
for s := minSpatial; s <= maxSpatial; s++ {
|
||||
for t := minTemporal; t <= maxTemporal; t++ {
|
||||
if f.provisional.Bitrates[s][t] != 0 {
|
||||
if f.provisional.bitrates[s][t] != 0 {
|
||||
layers = buffer.VideoLayer{Spatial: s, Temporal: t}
|
||||
bw = f.provisional.Bitrates[s][t]
|
||||
bw = f.provisional.bitrates[s][t]
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -915,7 +915,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
if !targetLayer.IsValid() {
|
||||
targetLayer = f.provisional.currentLayer
|
||||
if targetLayer.IsValid() {
|
||||
bandwidthRequired = f.provisional.Bitrates[targetLayer.Spatial][targetLayer.Temporal]
|
||||
bandwidthRequired = f.provisional.bitrates[targetLayer.Spatial][targetLayer.Temporal]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -923,11 +923,11 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
|
||||
return VideoTransition{
|
||||
From: f.vls.GetTarget(),
|
||||
To: targetLayer,
|
||||
BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}
|
||||
BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
|
||||
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() (VideoTransition, []int32, Bitrates) {
|
||||
//
|
||||
// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
|
||||
// when channel is congested. This is called on tracks other than the one needing the change. When the track
|
||||
@@ -952,14 +952,14 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
|
||||
return VideoTransition{
|
||||
From: targetLayer,
|
||||
To: f.provisional.allocatedLayer,
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
|
||||
maxReachableLayerTemporal := buffer.InvalidLayerTemporal
|
||||
for t := f.provisional.maxLayer.Temporal; t >= 0; t-- {
|
||||
for s := f.provisional.maxLayer.Spatial; s >= 0; s-- {
|
||||
if f.provisional.Bitrates[s][t] != 0 {
|
||||
if f.provisional.bitrates[s][t] != 0 {
|
||||
maxReachableLayerTemporal = t
|
||||
break
|
||||
}
|
||||
@@ -977,13 +977,13 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
|
||||
return VideoTransition{
|
||||
From: targetLayer,
|
||||
To: f.provisional.allocatedLayer,
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested),
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
|
||||
// starting from minimum to target, find transition which gives the best
|
||||
// transition taking into account bits saved vs cost of such a transition
|
||||
existingBandwidthNeeded := getBandwidthNeeded(f.provisional.Bitrates, targetLayer, f.lastAllocation.BandwidthRequested)
|
||||
existingBandwidthNeeded := getBandwidthNeeded(f.provisional.bitrates, targetLayer, f.lastAllocation.BandwidthRequested)
|
||||
bestLayer := buffer.InvalidLayer
|
||||
bestBandwidthDelta := int64(0)
|
||||
bestValue := float32(0)
|
||||
@@ -993,7 +993,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
|
||||
break
|
||||
}
|
||||
|
||||
bandwidthDelta := int64(math.Max(float64(0), float64(existingBandwidthNeeded-f.provisional.Bitrates[s][t])))
|
||||
bandwidthDelta := int64(math.Max(float64(0), float64(existingBandwidthNeeded-f.provisional.bitrates[s][t])))
|
||||
|
||||
transitionCost := int32(0)
|
||||
// SVC-TODO: SVC will need a different cost transition
|
||||
@@ -1020,7 +1020,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
|
||||
From: targetLayer,
|
||||
To: bestLayer,
|
||||
BandwidthDelta: -bestBandwidthDelta,
|
||||
}
|
||||
}, f.provisional.availableLayers, f.provisional.bitrates
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
|
||||
@@ -1031,13 +1031,13 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
|
||||
f.provisional.muted,
|
||||
f.provisional.pubMuted,
|
||||
f.provisional.maxSeenLayer.Spatial,
|
||||
f.provisional.Bitrates,
|
||||
f.provisional.bitrates,
|
||||
f.provisional.maxLayer,
|
||||
)
|
||||
alloc := VideoAllocation{
|
||||
BandwidthRequested: 0,
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested),
|
||||
Bitrates: f.provisional.Bitrates,
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested),
|
||||
Bitrates: f.provisional.bitrates,
|
||||
BandwidthNeeded: optimalBandwidthNeeded,
|
||||
TargetLayer: f.provisional.allocatedLayer,
|
||||
RequestLayerSpatial: f.provisional.allocatedLayer.Spatial,
|
||||
@@ -1047,7 +1047,7 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
|
||||
f.provisional.pubMuted,
|
||||
f.provisional.maxSeenLayer,
|
||||
f.provisional.availableLayers,
|
||||
f.provisional.Bitrates,
|
||||
f.provisional.bitrates,
|
||||
f.provisional.allocatedLayer,
|
||||
f.provisional.maxLayer,
|
||||
),
|
||||
@@ -1063,8 +1063,8 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
|
||||
case optimalBandwidthNeeded == 0:
|
||||
if f.provisional.allocatedLayer.IsValid() {
|
||||
// overshoot
|
||||
alloc.BandwidthRequested = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
|
||||
alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested)
|
||||
alloc.BandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
|
||||
alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested)
|
||||
} else {
|
||||
alloc.PauseReason = VideoPauseReasonFeedDry
|
||||
|
||||
@@ -1078,16 +1078,16 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
|
||||
|
||||
default:
|
||||
if f.provisional.allocatedLayer.IsValid() {
|
||||
alloc.BandwidthRequested = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
|
||||
alloc.BandwidthRequested = f.provisional.bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
|
||||
}
|
||||
alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.Bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested)
|
||||
alloc.BandwidthDelta = alloc.BandwidthRequested - getBandwidthNeeded(f.provisional.bitrates, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested)
|
||||
|
||||
if f.provisional.allocatedLayer.GreaterThan(f.provisional.maxLayer) ||
|
||||
alloc.BandwidthRequested >= getOptimalBandwidthNeeded(
|
||||
f.provisional.muted,
|
||||
f.provisional.pubMuted,
|
||||
f.provisional.maxSeenLayer.Spatial,
|
||||
f.provisional.Bitrates,
|
||||
f.provisional.bitrates,
|
||||
f.provisional.maxLayer,
|
||||
) {
|
||||
// could be greater than optimal if overshooting
|
||||
@@ -1308,18 +1308,12 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
existingTargetLayer := f.vls.GetTarget()
|
||||
if !existingTargetLayer.IsValid() {
|
||||
// already paused
|
||||
return f.lastAllocation
|
||||
}
|
||||
|
||||
maxLayer := f.vls.GetMax()
|
||||
maxSeenLayer := f.vls.GetMaxSeen()
|
||||
optimalBandwidthNeeded := getOptimalBandwidthNeeded(f.muted, f.pubMuted, maxSeenLayer.Spatial, brs, maxLayer)
|
||||
alloc := VideoAllocation{
|
||||
BandwidthRequested: 0,
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(brs, existingTargetLayer, f.lastAllocation.BandwidthRequested),
|
||||
BandwidthDelta: 0 - getBandwidthNeeded(brs, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested),
|
||||
Bitrates: brs,
|
||||
BandwidthNeeded: optimalBandwidthNeeded,
|
||||
TargetLayer: buffer.InvalidLayer,
|
||||
@@ -1463,7 +1457,9 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
|
||||
f.logger.Debugw(
|
||||
"starting forwarding",
|
||||
"sequenceNumber", extPkt.Packet.SequenceNumber,
|
||||
"extSequenceNumber", extPkt.ExtSequenceNumber,
|
||||
"timestamp", extPkt.Packet.Timestamp,
|
||||
"extTimestamp", extPkt.ExtTimestamp,
|
||||
"layer", layer,
|
||||
"referenceLayerSpatial", f.referenceLayerSpatial,
|
||||
)
|
||||
@@ -1473,7 +1469,9 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
|
||||
f.logger.Debugw(
|
||||
"catch up forwarding",
|
||||
"sequenceNumber", extPkt.Packet.SequenceNumber,
|
||||
"extSequenceNumber", extPkt.ExtSequenceNumber,
|
||||
"timestamp", extPkt.Packet.Timestamp,
|
||||
"extTimestamp", extPkt.ExtTimestamp,
|
||||
"layer", layer,
|
||||
"referenceLayerSpatial", f.referenceLayerSpatial,
|
||||
)
|
||||
|
||||
+44
-21
@@ -40,13 +40,19 @@ func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo
|
||||
func TestForwarderMute(t *testing.T) {
|
||||
f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio)
|
||||
require.False(t, f.IsMuted())
|
||||
muted := f.Mute(false)
|
||||
muted := f.Mute(false, true)
|
||||
require.False(t, muted) // no change in mute state
|
||||
require.False(t, f.IsMuted())
|
||||
muted = f.Mute(true)
|
||||
|
||||
muted = f.Mute(true, false)
|
||||
require.False(t, muted)
|
||||
require.False(t, f.IsMuted())
|
||||
|
||||
muted = f.Mute(true, true)
|
||||
require.True(t, muted)
|
||||
require.True(t, f.IsMuted())
|
||||
muted = f.Mute(false)
|
||||
|
||||
muted = f.Mute(false, true)
|
||||
require.True(t, muted)
|
||||
require.False(t, f.IsMuted())
|
||||
}
|
||||
@@ -164,7 +170,7 @@ func TestForwarderAllocateOptimal(t *testing.T) {
|
||||
f.SetMaxPublishedLayer(buffer.DefaultMaxLayerSpatial)
|
||||
|
||||
// muted should not consume any bandwidth
|
||||
f.Mute(true)
|
||||
f.Mute(true, true)
|
||||
disable(f)
|
||||
expectedResult = VideoAllocation{
|
||||
PauseReason: VideoPauseReasonMuted,
|
||||
@@ -180,7 +186,7 @@ func TestForwarderAllocateOptimal(t *testing.T) {
|
||||
require.Equal(t, expectedResult, result)
|
||||
require.Equal(t, expectedResult, f.lastAllocation)
|
||||
|
||||
f.Mute(false)
|
||||
f.Mute(false, true)
|
||||
|
||||
// pub muted should not consume any bandwidth
|
||||
f.PubMute(true)
|
||||
@@ -616,7 +622,7 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) {
|
||||
{9, 10, 11, 12},
|
||||
}
|
||||
|
||||
f.Mute(true)
|
||||
f.Mute(true, true)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
@@ -651,13 +657,14 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
f.SetMaxPublishedLayer(buffer.DefaultMaxLayerSpatial)
|
||||
f.SetMaxTemporalLayerSeen(buffer.DefaultMaxLayerTemporal)
|
||||
|
||||
availableLayers := []int32{0, 1, 2}
|
||||
bitrates := Bitrates{
|
||||
{1, 2, 3, 4},
|
||||
{5, 6, 7, 8},
|
||||
{9, 10, 0, 0},
|
||||
}
|
||||
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
f.ProvisionalAllocatePrepare(availableLayers, bitrates)
|
||||
|
||||
// from scratch (buffer.InvalidLayer) should give back layer (0, 0)
|
||||
expectedTransition := VideoTransition{
|
||||
@@ -665,8 +672,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
To: buffer.VideoLayer{Spatial: 0, Temporal: 0},
|
||||
BandwidthDelta: 1,
|
||||
}
|
||||
transition := f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
transition, al, brs := f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, availableLayers, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
|
||||
// committing should set target to (0, 0)
|
||||
expectedLayers := buffer.VideoLayer{Spatial: 0, Temporal: 0}
|
||||
@@ -695,8 +704,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
To: targetLayer,
|
||||
BandwidthDelta: 0,
|
||||
}
|
||||
transition = f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, availableLayers, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
|
||||
// committing should set target to (2, 1)
|
||||
expectedLayers = buffer.VideoLayer{Spatial: 2, Temporal: 1}
|
||||
@@ -723,14 +734,16 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
To: buffer.VideoLayer{Spatial: 2, Temporal: 1},
|
||||
BandwidthDelta: 0,
|
||||
}
|
||||
transition = f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, availableLayers, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
|
||||
f.ProvisionalAllocateCommit()
|
||||
|
||||
// mute
|
||||
f.Mute(true)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
f.Mute(true, true)
|
||||
f.ProvisionalAllocatePrepare(availableLayers, bitrates)
|
||||
|
||||
// mute should send target to buffer.InvalidLayer
|
||||
expectedTransition = VideoTransition{
|
||||
@@ -738,17 +751,20 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
To: buffer.InvalidLayer,
|
||||
BandwidthDelta: -10,
|
||||
}
|
||||
transition = f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(false)
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, availableLayers, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
|
||||
f.ProvisionalAllocateCommit()
|
||||
|
||||
//
|
||||
// Test allowOvershoot
|
||||
//
|
||||
f.Mute(false)
|
||||
f.Mute(false, true)
|
||||
f.SetMaxSpatialLayer(0)
|
||||
|
||||
availableLayers = []int32{1, 2}
|
||||
bitrates = Bitrates{
|
||||
{0, 0, 0, 0},
|
||||
{5, 6, 7, 8},
|
||||
@@ -756,7 +772,7 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
}
|
||||
|
||||
f.vls.SetTarget(buffer.InvalidLayer)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
f.ProvisionalAllocatePrepare(availableLayers, bitrates)
|
||||
|
||||
// from scratch (buffer.InvalidLayer) should go to a layer past maximum as overshoot is allowed
|
||||
expectedTransition = VideoTransition{
|
||||
@@ -764,8 +780,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
To: buffer.VideoLayer{Spatial: 1, Temporal: 0},
|
||||
BandwidthDelta: 5,
|
||||
}
|
||||
transition = f.ProvisionalAllocateGetCooperativeTransition(true)
|
||||
transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(true)
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, availableLayers, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
|
||||
// committing should set target to (1, 0)
|
||||
expectedLayers = buffer.VideoLayer{Spatial: 1, Temporal: 0}
|
||||
@@ -804,8 +822,10 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
|
||||
To: buffer.VideoLayer{Spatial: 0, Temporal: 2},
|
||||
BandwidthDelta: -5, // 5 was the bandwidth needed for the last allocation
|
||||
}
|
||||
transition = f.ProvisionalAllocateGetCooperativeTransition(true)
|
||||
transition, al, brs = f.ProvisionalAllocateGetCooperativeTransition(true)
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, []int32{}, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
|
||||
// committing should set target to (0, 2)
|
||||
expectedLayers = buffer.VideoLayer{Spatial: 0, Temporal: 2}
|
||||
@@ -844,13 +864,14 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) {
|
||||
f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial)
|
||||
f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal)
|
||||
|
||||
availableLayers := []int32{0, 1, 2}
|
||||
bitrates := Bitrates{
|
||||
{1, 2, 3, 4},
|
||||
{5, 6, 7, 8},
|
||||
{9, 10, 11, 12},
|
||||
}
|
||||
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
f.ProvisionalAllocatePrepare(availableLayers, bitrates)
|
||||
|
||||
f.vls.SetTarget(buffer.VideoLayer{Spatial: 2, Temporal: 2})
|
||||
f.lastAllocation.BandwidthRequested = bitrates[2][2]
|
||||
@@ -859,8 +880,10 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) {
|
||||
To: buffer.VideoLayer{Spatial: 2, Temporal: 0},
|
||||
BandwidthDelta: -2,
|
||||
}
|
||||
transition := f.ProvisionalAllocateGetBestWeightedTransition()
|
||||
transition, al, brs := f.ProvisionalAllocateGetBestWeightedTransition()
|
||||
require.Equal(t, expectedTransition, transition)
|
||||
require.Equal(t, availableLayers, al)
|
||||
require.Equal(t, bitrates, brs)
|
||||
}
|
||||
|
||||
func TestForwarderAllocateNextHigher(t *testing.T) {
|
||||
@@ -1142,7 +1165,7 @@ func TestForwarderPauseMute(t *testing.T) {
|
||||
// should have set target at (0, 0)
|
||||
f.ProvisionalAllocateCommit()
|
||||
|
||||
f.Mute(true)
|
||||
f.Mute(true, true)
|
||||
expectedResult := VideoAllocation{
|
||||
PauseReason: VideoPauseReasonMuted,
|
||||
BandwidthRequested: 0,
|
||||
@@ -1161,7 +1184,7 @@ func TestForwarderPauseMute(t *testing.T) {
|
||||
|
||||
func TestForwarderGetTranslationParamsMuted(t *testing.T) {
|
||||
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
|
||||
f.Mute(true)
|
||||
f.Mute(true, true)
|
||||
|
||||
params := &testutils.TestExtPacketParams{
|
||||
SequenceNumber: 23333,
|
||||
|
||||
@@ -132,6 +132,7 @@ func (s *sequencer) push(
|
||||
defer s.Unlock()
|
||||
|
||||
if !s.initialized {
|
||||
s.initialized = true
|
||||
s.extHighestSN = extModifiedSN - 1
|
||||
s.extHighestTS = extModifiedTS
|
||||
s.updateSNOffset()
|
||||
@@ -178,6 +179,7 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.logger.Debugw("sequencer padding", "extHighestSN", s.extHighestSN, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive)
|
||||
if s.snRangeMap == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -515,6 +515,18 @@ func (s *StreamAllocator) IsBWEEnabled(downTrack *sfu.DownTrack) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// called to check if track subscription mute can be applied
|
||||
func (s *StreamAllocator) IsSubscribeMutable(downTrack *sfu.DownTrack) bool {
|
||||
s.videoTracksMu.Lock()
|
||||
defer s.videoTracksMu.Unlock()
|
||||
|
||||
if track := s.videoTracks[livekit.TrackID(downTrack.ID())]; track != nil {
|
||||
return track.IsSubscribeMutable()
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) maybePostEventAllocateTrack(downTrack *sfu.DownTrack) {
|
||||
shouldPost := false
|
||||
s.videoTracksMu.Lock()
|
||||
|
||||
@@ -101,6 +101,10 @@ func (t *Track) SetStreamState(streamState StreamState) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *Track) IsSubscribeMutable() bool {
|
||||
return t.streamState != StreamStatePaused
|
||||
}
|
||||
|
||||
func (t *Track) SetPriority(priority uint8) bool {
|
||||
if priority == 0 {
|
||||
switch t.source {
|
||||
|
||||
Reference in New Issue
Block a user