Merge remote-tracking branch 'origin/master' into raja_fr

This commit is contained in:
boks1971
2023-08-18 10:30:59 +05:30
16 changed files with 135 additions and 114 deletions
+2 -2
View File
@@ -17,7 +17,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a
github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c
github.com/livekit/protocol v1.6.0
github.com/livekit/psrpc v0.3.2
github.com/mackerelio/go-osstat v0.2.4
@@ -45,7 +45,7 @@ require (
github.com/urfave/cli/v2 v2.25.7
github.com/urfave/negroni/v3 v3.0.0
go.uber.org/atomic v1.11.0
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb
golang.org/x/sync v0.3.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
+4 -4
View File
@@ -122,8 +122,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a h1:JWpPHcMFuw0fP4swE89CfMgeUXiSN5IKvCJL/5HLI3A=
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c h1:4udPqCusH93MK/7q8ZfDqcLJHGoQeKKsMi5b+/BpQvk=
github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/protocol v1.6.0 h1:19S+vFZqnivKIOpyR3DEK/mSaykQ3UEf7H2G/mBOE54=
github.com/livekit/protocol v1.6.0/go.mod h1:SUS9foM1xBzw/AFrgTJuFX/oSuwlnIbHmpdiPdCvwEM=
github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew=
@@ -293,8 +293,8 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA=
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+1 -1
View File
@@ -520,7 +520,7 @@ func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID
}
func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime string, willBeResumed bool) {
t.params.Logger.Infow("removing all subscribers for mime", "mime", mime)
t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime)
for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribersForMime(mime) {
t.RemoveSubscriber(subscriberID, willBeResumed)
}
+1 -2
View File
@@ -58,8 +58,7 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e
return err
}
// update state after to sending message, so that no participant updates could slip through before JoinResponse is
// sent
// update state after sending message, so that no participant updates could slip through before JoinResponse is sent
p.updateLock.Lock()
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
+10 -3
View File
@@ -740,9 +740,7 @@ func (r *Room) sendRoomUpdate() {
roomInfo := r.ToProto()
// Send update to participants
for _, p := range r.GetParticipants() {
// new participants receive the update as part of JoinResponse
// skip inactive participants
if p.State() != livekit.ParticipantInfo_ACTIVE {
if !p.IsReady() {
continue
}
@@ -1071,6 +1069,15 @@ func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bo
return nil
}
}
} else {
ep := r.GetParticipant(identity)
if ep != nil {
epi := ep.ToProto()
if epi.JoinedAt > pi.JoinedAt {
// older session update, newer session has already become active, so nothing to do
return nil
}
}
}
if shouldSend {
+15 -41
View File
@@ -19,6 +19,7 @@ import (
"errors"
"github.com/livekit/protocol/logger"
"github.com/pion/rtp/codecs"
)
var (
@@ -310,55 +311,28 @@ func IsH264KeyFrame(payload []byte) bool {
// -------------------------------------
// IsVP9KeyFrame detects if vp9 payload is a keyframe
// taken from https://github.com/jech/galene/blob/master/codecs/codecs.go
// all credits belongs to Juliusz Chroboczek @jech and the awesome Galene SFU
func IsVP9KeyFrame(payload []byte) bool {
payloadLen := len(payload)
if payloadLen < 1 {
var vp9 codecs.VP9Packet
_, err := vp9.Unmarshal(payload)
if err != nil || len(vp9.Payload) < 1 {
return false
}
if !vp9.B {
return false
}
idx := 0
I := payload[idx]&0x80 > 0
P := payload[idx]&0x40 > 0
L := payload[idx]&0x20 > 0
F := payload[idx]&0x10 > 0
B := payload[idx]&0x08 > 0
if F && !I {
if (vp9.Payload[0] & 0xc0) != 0x80 {
return false
}
// Check for PictureID
if I {
idx++
if payloadLen < idx+1 {
return false
}
// Check if m is 1, then Picture ID is 15 bits
if payload[idx]&0x80 > 0 {
idx++
if payloadLen < idx+1 {
return false
}
}
profile := (vp9.Payload[0] >> 4) & 0x3
if profile != 3 {
return (vp9.Payload[0] & 0xC) == 0
}
// Check if TL0PICIDX is present
sid := -1
if L {
idx++
if payloadLen < idx+1 {
return false
}
tid := (payload[idx] >> 5) & 0x7
if !P && tid != 0 {
return false
}
sid = int((payload[idx] >> 1) & 0x7)
}
return !P && (!L || (L && sid == 0)) && B
return (vp9.Payload[0] & 0x6) == 0
}
// -------------------------------------
+47 -13
View File
@@ -424,7 +424,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
// adjust start to account for out-of-order packets before a cycle completes
if !r.maybeAdjustStartSN(rtph, pktSize, hdrSize, payloadSize) {
if !r.maybeAdjustStart(rtph, pktSize, hdrSize, payloadSize) {
if !r.isSnInfoLost(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += hdrSize
@@ -497,7 +497,7 @@ func (r *RTPStats) ResyncOnNextPacket() {
r.resyncOnNextPacket = true
}
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool {
func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool {
if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) {
return false
}
@@ -507,17 +507,37 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize
}
r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1
beforeAdjust := r.extStartSN
snBeforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
if r.extStartSN > snBeforeAdjust {
// wrapping back
r.cycles++
}
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
for _, s := range r.snapshots {
if s.extStartSN == beforeAdjust {
if s.extStartSN == snBeforeAdjust {
s.extStartSN = r.extStartSN
}
}
tsBeforeAdjust := r.extStartTS
r.extStartTS = uint64(rtph.Timestamp)
if r.extStartTS > tsBeforeAdjust {
// wrapping back
r.tsCycles++
}
r.logger.Infow(
"adjusting starting sequence number",
"snBefore", snBeforeAdjust,
"snAfter", r.extStartSN,
"snCyles", r.cycles,
"tsBefore", tsBeforeAdjust,
"tsAfter", r.extStartTS,
"tsCyles", r.tsCycles,
)
return true
}
@@ -771,11 +791,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
defer r.lock.Unlock()
if srData != nil {
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
r.maybeAdjustFirstPacketTime(srData.RTPTimestamp)
}
}
func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) {
func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
return
}
@@ -786,16 +806,27 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) {
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second))
samplesDiff := int32(ts - uint32(r.extStartTS))
if samplesDiff < 0 {
// out-of-order, skip
return
}
samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second))
firstTime := time.Now().Add(-samplesDuration)
if firstTime.Before(r.firstTime) {
r.logger.Infow(
r.logger.Debugw(
"adjusting first packet time",
"before", r.firstTime.String(),
"after", firstTime.String(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime))
r.logger.Infow("first packet time adjustment too big, ignoring",
"adjustment", r.firstTime.Sub(firstTime),
"before", r.firstTime.String(),
"after", firstTime.String(),
"ts", ts,
"extStartTS", r.extStartTS,
)
} else {
r.firstTime = firstTime
}
@@ -814,8 +845,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received anachronous sender report",
"current", srData.NTPTimestamp.Time(),
"last", r.srNewest.NTPTimestamp.Time(),
"currentNTP", srData.NTPTimestamp.Time().String(),
"currentRTP", srData.RTPTimestamp,
"lastNTP", r.srNewest.NTPTimestamp.Time().String(),
"lastRTP", r.srNewest.RTPTimestamp,
)
return
}
@@ -831,7 +864,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
@@ -952,7 +985,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
if nowRTPExtUsingRate > nowRTPExt {
nowRTPExt = nowRTPExtUsingRate
nowRTP = uint32(nowRTPExtUsingRate)
nowRTP = uint32(nowRTPExt)
}
}
@@ -979,6 +1012,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
)
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
nowRTP = uint32(nowRTPExt)
}
// monitor and log RTP timestamp anomalies
+3
View File
@@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/livekit/protocol/logger"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@@ -38,6 +39,7 @@ func TestRTPStats(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStats(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
totalDuration := 5 * time.Second
@@ -79,6 +81,7 @@ func TestRTPStats_Update(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStats(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
sequenceNumber := uint16(rand.Float64() * float64(1<<16))
+5 -3
View File
@@ -409,9 +409,11 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
// Bind is called under RTPSender.mu lock, call the RTPSender.GetParameters in goroutine to avoid deadlock
go func() {
if tr := d.transceiver.Load(); tr != nil {
extensions := tr.Sender().GetParameters().HeaderExtensions
d.params.Logger.Debugw("negotiated downtrack extensions", "extensions", extensions)
d.SetRTPHeaderExtensions(extensions)
if sender := tr.Sender(); sender != nil {
extensions := sender.GetParameters().HeaderExtensions
d.params.Logger.Debugw("negotiated downtrack extensions", "extensions", extensions)
d.SetRTPHeaderExtensions(extensions)
}
}
}()
+16 -15
View File
@@ -309,6 +309,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
f.vls = videolayerselector.NewVP9(f.logger)
}
}
// SVC-TODO: Support for VP9 simulcast. When DD is not available, have to pick selector based on VP9 SVC or Simulcast
case "video/av1":
// DD-TODO : we only enable dd layer selector for av1/vp9 now, in the future we can enable it for vp8 too
if f.vls != nil {
@@ -316,6 +317,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
} else {
f.vls = videolayerselector.NewDependencyDescriptor(f.logger)
}
// SVC-TODO: Support for AV1 Simulcast or just single spatial layer - won't have DD in that case
}
}
@@ -1220,7 +1222,10 @@ func (f *Forwarder) GetNextHigherTransition(brs Bitrates, allowOvershoot bool) (
for s := minSpatial; s <= maxSpatial; s++ {
for t := minTemporal; t <= maxTemporal; t++ {
bandwidthRequested := brs[s][t]
if bandwidthRequested == 0 {
// traverse till finding a layer requiring more bits.
// NOTE: it possible that higher temporal layer of lower spatial layer
// could use more bits than lower temporal layer of higher spatial layer.
if bandwidthRequested == 0 || bandwidthRequested < alreadyAllocated {
continue
}
@@ -1330,11 +1335,7 @@ func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) Video
alloc.PauseReason != f.lastAllocation.PauseReason ||
alloc.TargetLayer != f.lastAllocation.TargetLayer ||
alloc.RequestLayerSpatial != f.lastAllocation.RequestLayerSpatial {
if reason == "optimal" {
f.logger.Debugw(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc)
} else {
f.logger.Infow(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc)
}
f.logger.Debugw(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc)
}
f.lastAllocation = alloc
@@ -1429,7 +1430,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.referenceLayerSpatial = layer
f.rtpMunger.SetLastSnTs(extPkt)
f.codecMunger.SetLast(extPkt)
f.logger.Infow(
f.logger.Debugw(
"starting forwarding",
"sequenceNumber", extPkt.Packet.SequenceNumber,
"timestamp", extPkt.Packet.Timestamp,
@@ -1440,7 +1441,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
}
logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) {
f.logger.Infow(
f.logger.Debugw(
message,
"layer", layer,
"expectedTS", expectedTS,
@@ -1465,7 +1466,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// 3. expectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, refTS and expectedTS should be very close and lastTS should be before both of those.
// But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always.
lastTS := f.rtpMunger.GetLast().LastTS
rtpMungerState := f.rtpMunger.GetLast()
lastTS := rtpMungerState.LastTS
refTS := lastTS
expectedTS := lastTS
switchingAt := time.Now()
@@ -1563,19 +1565,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
nextTS = expectedTS
} else {
nextTS = refTS
}
nextTS = refTS
}
}
if nextTS-lastTS == 0 || nextTS-lastTS > (1<<31) {
f.logger.Infow("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS)
f.logger.Debugw("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS)
// nominal increase
nextTS = lastTS + 1
}
f.logger.Infow(
f.logger.Debugw(
"next timestamp on switch",
"switchingAt", switchingAt.String(),
"layer", layer,
@@ -1585,7 +1585,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"referenceLayerSpatial", f.referenceLayerSpatial,
"expectedTS", expectedTS,
"nextTS", nextTS,
"jump", nextTS-lastTS,
"tsJump", nextTS-lastTS,
"nextSN", rtpMungerState.LastSN+1,
)
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS)
+11 -4
View File
@@ -132,6 +132,10 @@ type WebRTCReceiver struct {
redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32)
}
// SVC-TODO: Have to use more conditions to differentiate between
// SVC-TODO: SVC and non-SVC (could be single layer or simulcast).
// SVC-TODO: May only need to differentiate between simulcast and non-simulcast
// SVC-TODO: i. e. may be possible to treat single layer as SVC to get proper/intended functionality.
func IsSvcCodec(mime string) bool {
switch strings.ToLower(mime) {
case "video/av1":
@@ -232,10 +236,13 @@ func NewWebRTCReceiver(
})
w.connectionStats.Start(w.trackInfo)
for _, ext := range receiver.GetParameters().HeaderExtensions {
if ext.URI == dd.ExtensionURI {
w.streamTrackerManager.AddDependencyDescriptorTrackers()
break
// SVC-TODO: Handle DD for non-SVC cases???
if w.isSVC {
for _, ext := range receiver.GetParameters().HeaderExtensions {
if ext.URI == dd.ExtensionURI {
w.streamTrackerManager.AddDependencyDescriptorTrackers()
break
}
}
}
+1 -1
View File
@@ -114,7 +114,7 @@ func (r *RedReceiver) Close() {
func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
// red encoding don't support nack
return 0, bucket.ErrPacketNotFound
return 0, bucket.ErrPacketMismatch
}
func (r *RedReceiver) encodeRedForPrimary(pkt *rtp.Packet, redPayload []byte) (int, error) {
+8 -22
View File
@@ -852,24 +852,19 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
//
// For both cases, do
// a. Find cooperative transition from track that needs allocation.
// b. If track is currently streaming at minimum, do not do anything.
// c. If track is giving back bits, apply the transition and use bits given
// b. If track is giving back bits, apply the transition and use bits given
// back to boost any deficient track(s).
//
// If track needs more bits, i.e. upward transition (may need resume or higher layer subscription),
// a. Try to allocate using existing headroom. This can be tried to get the best
// possible fit for the available headroom.
// b. If there is not enough headroom to allocate anything, ask for best offer from
// other tracks that are currently streaming and try to use it.
// other tracks that are currently streaming and try to use it. This is done only if the
// track needing change is not currently streaming, i. e. it has to be resumed.
//
track.ProvisionalAllocatePrepare()
transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient)
// track is currently streaming at minimum
if transition.BandwidthDelta == 0 {
return
}
// downgrade, giving back bits
if transition.From.GreaterThan(transition.To) {
allocation := track.ProvisionalAllocateCommit()
@@ -928,6 +923,11 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
}
// track is currently streaming at minimum
if transition.BandwidthDelta == 0 {
return
}
// if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired.
bandwidthAcquired := int64(0)
var contributingTracks []*Track
@@ -1212,20 +1212,6 @@ func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver {
func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) {
expectedBandwidthUsage := s.getExpectedBandwidthUsage()
if float64(expectedBandwidthUsage) > 1.5*float64(s.committedChannelCapacity) {
// STREAM-ALLOCATOR-TODO-START
// Should probably skip probing if the expected usage is much higher than committed channel capacity.
// But, give that bandwidth estimate is volatile at times and can drop down to small values,
// not probing means streaming stuck in a well for long.
// Observe this and figure out if there is a threshold from practical use cases that can be used to
// skip probing safely
// STREAM-ALLOCATOR-TODO-END
s.params.Logger.Warnw(
"stream allocator: starting probe alarm",
fmt.Errorf("expected too high, expected: %d, committed: %d", expectedBandwidthUsage, s.committedChannelCapacity),
)
}
probeClusterId, probeGoalBps := s.probeController.InitProbe(probeGoalDeltaBps, expectedBandwidthUsage)
channelState := ""
+9 -1
View File
@@ -571,7 +571,15 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) {
// use minimal offset to indicate value availability in the extremely unlikely case of
// both layers using the same timestamp
if offset == 0 {
s.logger.Infow("using default offset", "ref", ref, "other", other)
s.logger.Debugw(
"using default offset",
"ref", ref,
"refNTP", srRef.NTPTimestamp.Time().String(),
"refRTP", srRef.RTPTimestamp,
"other", other,
"otherNTP", srOther.NTPTimestamp.Time().String(),
"otherRTP", srOther.RTPTimestamp,
)
offset = 1
}
+1 -1
View File
@@ -113,7 +113,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
// re-adjust start if necessary. The conditions are
// 1. Not seen more than half the range yet
// 1. wrap around compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space
// 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space
// 2. no wrap around, but out-of-order compared to start and not completed a half cycle , sequences like (10, 9), (65530, 65528) in uint16 space
cycles := w.cycles
totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1
+1 -1
View File
@@ -126,7 +126,7 @@ func (b *Base) Select(_extPkt *buffer.ExtPacket, _layer int32) (result VideoLaye
}
func (b *Base) Rollback() {
b.logger.Infow(
b.logger.Debugw(
"rolling back",
"previous", b.previousLayer,
"current", b.currentLayer,