diff --git a/go.mod b/go.mod index e17bd663d..b6db038c2 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a + github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c github.com/livekit/protocol v1.6.0 github.com/livekit/psrpc v0.3.2 github.com/mackerelio/go-osstat v0.2.4 @@ -45,7 +45,7 @@ require ( github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20230810033253-352e893a4cad + golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb golang.org/x/sync v0.3.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 9adab1ff4..ef0468a16 100644 --- a/go.sum +++ b/go.sum @@ -122,8 +122,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a h1:JWpPHcMFuw0fP4swE89CfMgeUXiSN5IKvCJL/5HLI3A= -github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= +github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c h1:4udPqCusH93MK/7q8ZfDqcLJHGoQeKKsMi5b+/BpQvk= +github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= github.com/livekit/protocol v1.6.0 h1:19S+vFZqnivKIOpyR3DEK/mSaykQ3UEf7H2G/mBOE54= github.com/livekit/protocol v1.6.0/go.mod h1:SUS9foM1xBzw/AFrgTJuFX/oSuwlnIbHmpdiPdCvwEM= github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew= @@ -293,8 +293,8 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= -golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= -golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA= +golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 3f82ce69d..bdb64e0c4 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -520,7 +520,7 @@ func (t *MediaTrackReceiver) RemoveSubscriber(subscriberID livekit.ParticipantID } func (t *MediaTrackReceiver) removeAllSubscribersForMime(mime string, willBeResumed bool) { - t.params.Logger.Infow("removing all subscribers for mime", "mime", mime) + t.params.Logger.Debugw("removing all subscribers for mime", "mime", mime) for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribersForMime(mime) { t.RemoveSubscriber(subscriberID, willBeResumed) } diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index a9ae4dfab..e634fa573 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -58,8 +58,7 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e return err } - // update state after to sending message, so that no participant updates could slip through before JoinResponse is - // sent + // update state after sending message, so that no participant updates could slip through before JoinResponse is sent p.updateLock.Lock() if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index f153d27ee..f614f8979 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -740,9 +740,7 @@ func (r *Room) sendRoomUpdate() { roomInfo := r.ToProto() // Send update to participants for _, p := range r.GetParticipants() { - // new participants receive the update as part of JoinResponse - // skip inactive participants - if p.State() != livekit.ParticipantInfo_ACTIVE { + if !p.IsReady() { continue } @@ -1071,6 +1069,15 @@ func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bo return nil } } + } else { + ep := r.GetParticipant(identity) + if ep != nil { + epi := ep.ToProto() + if epi.JoinedAt > pi.JoinedAt { + // older session update, newer session has already become active, so nothing to do + return nil + } + } } if shouldSend { diff --git a/pkg/sfu/buffer/helpers.go b/pkg/sfu/buffer/helpers.go index f52464b4c..274b73e47 100644 --- a/pkg/sfu/buffer/helpers.go +++ b/pkg/sfu/buffer/helpers.go @@ -19,6 +19,7 @@ import ( "errors" "github.com/livekit/protocol/logger" + "github.com/pion/rtp/codecs" ) var ( @@ -310,55 +311,28 @@ func IsH264KeyFrame(payload []byte) bool { // ------------------------------------- +// IsVP9KeyFrame detects if vp9 payload is a keyframe +// taken from https://github.com/jech/galene/blob/master/codecs/codecs.go +// all credits belongs to Juliusz Chroboczek @jech and the awesome Galene SFU func IsVP9KeyFrame(payload []byte) bool { - payloadLen := len(payload) - if payloadLen < 1 { + var vp9 codecs.VP9Packet + _, err := vp9.Unmarshal(payload) + if err != nil || len(vp9.Payload) < 1 { + return false + } + if !vp9.B { return false } - idx := 0 - I := payload[idx]&0x80 > 0 - P := payload[idx]&0x40 > 0 - L := payload[idx]&0x20 > 0 - F := payload[idx]&0x10 > 0 - B := payload[idx]&0x08 > 0 - - if F && !I { + if (vp9.Payload[0] & 0xc0) != 0x80 { return false } - // Check for PictureID - if I { - idx++ - if payloadLen < idx+1 { - return false - } - // Check if m is 1, then Picture ID is 15 bits - if payload[idx]&0x80 > 0 { - idx++ - if payloadLen < idx+1 { - return false - } - } + profile := (vp9.Payload[0] >> 4) & 0x3 + if profile != 3 { + return (vp9.Payload[0] & 0xC) == 0 } - - // Check if TL0PICIDX is present - sid := -1 - if L { - idx++ - if payloadLen < idx+1 { - return false - } - - tid := (payload[idx] >> 5) & 0x7 - if !P && tid != 0 { - return false - } - - sid = int((payload[idx] >> 1) & 0x7) - } - - return !P && (!L || (L && sid == 0)) && B + return (vp9.Payload[0] & 0x6) == 0 } // ------------------------------------- diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 0eed03660..8f2b54f7d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -424,7 +424,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - if !r.maybeAdjustStartSN(rtph, pktSize, hdrSize, payloadSize) { + if !r.maybeAdjustStart(rtph, pktSize, hdrSize, payloadSize) { if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize r.headerBytesDuplicate += hdrSize @@ -497,7 +497,7 @@ func (r *RTPStats) ResyncOnNextPacket() { r.resyncOnNextPacket = true } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool { +func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { return false } @@ -507,17 +507,37 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, pktSize uint64, hdrSize } r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1 - beforeAdjust := r.extStartSN + snBeforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) + if r.extStartSN > snBeforeAdjust { + // wrapping back + r.cycles++ + } r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true) for _, s := range r.snapshots { - if s.extStartSN == beforeAdjust { + if s.extStartSN == snBeforeAdjust { s.extStartSN = r.extStartSN } } + tsBeforeAdjust := r.extStartTS + r.extStartTS = uint64(rtph.Timestamp) + if r.extStartTS > tsBeforeAdjust { + // wrapping back + r.tsCycles++ + } + r.logger.Infow( + "adjusting starting sequence number", + "snBefore", snBeforeAdjust, + "snAfter", r.extStartSN, + "snCyles", r.cycles, + "tsBefore", tsBeforeAdjust, + "tsAfter", r.extStartTS, + "tsCyles", r.tsCycles, + ) + return true } @@ -771,11 +791,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) { defer r.lock.Unlock() if srData != nil { - r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt) + r.maybeAdjustFirstPacketTime(srData.RTPTimestamp) } } -func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) { +func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { if time.Since(r.startTime) > firstPacketTimeAdjustWindow { return } @@ -786,16 +806,27 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) { // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second)) + samplesDiff := int32(ts - uint32(r.extStartTS)) + if samplesDiff < 0 { + // out-of-order, skip + return + } + samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second)) firstTime := time.Now().Add(-samplesDuration) if firstTime.Before(r.firstTime) { - r.logger.Infow( + r.logger.Debugw( "adjusting first packet time", "before", r.firstTime.String(), "after", firstTime.String(), ) if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { - r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime)) + r.logger.Infow("first packet time adjustment too big, ignoring", + "adjustment", r.firstTime.Sub(firstTime), + "before", r.firstTime.String(), + "after", firstTime.String(), + "ts", ts, + "extStartTS", r.extStartTS, + ) } else { r.firstTime = firstTime } @@ -814,8 +845,10 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received anachronous sender report", - "current", srData.NTPTimestamp.Time(), - "last", r.srNewest.NTPTimestamp.Time(), + "currentNTP", srData.NTPTimestamp.Time().String(), + "currentRTP", srData.RTPTimestamp, + "lastNTP", r.srNewest.NTPTimestamp.Time().String(), + "lastRTP", r.srNewest.RTPTimestamp, ) return } @@ -831,7 +864,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles - r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp) // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration @@ -952,7 +985,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds()) if nowRTPExtUsingRate > nowRTPExt { nowRTPExt = nowRTPExtUsingRate - nowRTP = uint32(nowRTPExtUsingRate) + nowRTP = uint32(nowRTPExt) } } @@ -979,6 +1012,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) ) ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate)) + nowRTP = uint32(nowRTPExt) } // monitor and log RTP timestamp anomalies diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index c5c4138ba..389d330c3 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/livekit/protocol/logger" "github.com/pion/rtp" "github.com/stretchr/testify/require" ) @@ -38,6 +39,7 @@ func TestRTPStats(t *testing.T) { clockRate := uint32(90000) r := NewRTPStats(RTPStatsParams{ ClockRate: clockRate, + Logger: logger.GetLogger(), }) totalDuration := 5 * time.Second @@ -79,6 +81,7 @@ func TestRTPStats_Update(t *testing.T) { clockRate := uint32(90000) r := NewRTPStats(RTPStatsParams{ ClockRate: clockRate, + Logger: logger.GetLogger(), }) sequenceNumber := uint16(rand.Float64() * float64(1<<16)) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 35e8af728..91c4bb533 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -409,9 +409,11 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, // Bind is called under RTPSender.mu lock, call the RTPSender.GetParameters in goroutine to avoid deadlock go func() { if tr := d.transceiver.Load(); tr != nil { - extensions := tr.Sender().GetParameters().HeaderExtensions - d.params.Logger.Debugw("negotiated downtrack extensions", "extensions", extensions) - d.SetRTPHeaderExtensions(extensions) + if sender := tr.Sender(); sender != nil { + extensions := sender.GetParameters().HeaderExtensions + d.params.Logger.Debugw("negotiated downtrack extensions", "extensions", extensions) + d.SetRTPHeaderExtensions(extensions) + } } }() diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 74cb87de8..1d5c5ac7f 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -309,6 +309,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ f.vls = videolayerselector.NewVP9(f.logger) } } + // SVC-TODO: Support for VP9 simulcast. When DD is not available, have to pick selector based on VP9 SVC or Simulcast case "video/av1": // DD-TODO : we only enable dd layer selector for av1/vp9 now, in the future we can enable it for vp8 too if f.vls != nil { @@ -316,6 +317,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ } else { f.vls = videolayerselector.NewDependencyDescriptor(f.logger) } + // SVC-TODO: Support for AV1 Simulcast or just single spatial layer - won't have DD in that case } } @@ -1220,7 +1222,10 @@ func (f *Forwarder) GetNextHigherTransition(brs Bitrates, allowOvershoot bool) ( for s := minSpatial; s <= maxSpatial; s++ { for t := minTemporal; t <= maxTemporal; t++ { bandwidthRequested := brs[s][t] - if bandwidthRequested == 0 { + // traverse till finding a layer requiring more bits. + // NOTE: it possible that higher temporal layer of lower spatial layer + // could use more bits than lower temporal layer of higher spatial layer. + if bandwidthRequested == 0 || bandwidthRequested < alreadyAllocated { continue } @@ -1330,11 +1335,7 @@ func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) Video alloc.PauseReason != f.lastAllocation.PauseReason || alloc.TargetLayer != f.lastAllocation.TargetLayer || alloc.RequestLayerSpatial != f.lastAllocation.RequestLayerSpatial { - if reason == "optimal" { - f.logger.Debugw(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc) - } else { - f.logger.Infow(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc) - } + f.logger.Debugw(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc) } f.lastAllocation = alloc @@ -1429,7 +1430,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.referenceLayerSpatial = layer f.rtpMunger.SetLastSnTs(extPkt) f.codecMunger.SetLast(extPkt) - f.logger.Infow( + f.logger.Debugw( "starting forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, "timestamp", extPkt.Packet.Timestamp, @@ -1440,7 +1441,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) { - f.logger.Infow( + f.logger.Debugw( message, "layer", layer, "expectedTS", expectedTS, @@ -1465,7 +1466,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // 3. expectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet // Ideally, refTS and expectedTS should be very close and lastTS should be before both of those. // But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always. - lastTS := f.rtpMunger.GetLast().LastTS + rtpMungerState := f.rtpMunger.GetLast() + lastTS := rtpMungerState.LastTS refTS := lastTS expectedTS := lastTS switchingAt := time.Now() @@ -1563,19 +1565,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) - nextTS = expectedTS - } else { - nextTS = refTS } + nextTS = refTS } } if nextTS-lastTS == 0 || nextTS-lastTS > (1<<31) { - f.logger.Infow("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS) + f.logger.Debugw("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS) // nominal increase nextTS = lastTS + 1 } - f.logger.Infow( + f.logger.Debugw( "next timestamp on switch", "switchingAt", switchingAt.String(), "layer", layer, @@ -1585,7 +1585,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "referenceLayerSpatial", f.referenceLayerSpatial, "expectedTS", expectedTS, "nextTS", nextTS, - "jump", nextTS-lastTS, + "tsJump", nextTS-lastTS, + "nextSN", rtpMungerState.LastSN+1, ) f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index c9b336aa4..ed5046bdc 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -132,6 +132,10 @@ type WebRTCReceiver struct { redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32) } +// SVC-TODO: Have to use more conditions to differentiate between +// SVC-TODO: SVC and non-SVC (could be single layer or simulcast). +// SVC-TODO: May only need to differentiate between simulcast and non-simulcast +// SVC-TODO: i. e. may be possible to treat single layer as SVC to get proper/intended functionality. func IsSvcCodec(mime string) bool { switch strings.ToLower(mime) { case "video/av1": @@ -232,10 +236,13 @@ func NewWebRTCReceiver( }) w.connectionStats.Start(w.trackInfo) - for _, ext := range receiver.GetParameters().HeaderExtensions { - if ext.URI == dd.ExtensionURI { - w.streamTrackerManager.AddDependencyDescriptorTrackers() - break + // SVC-TODO: Handle DD for non-SVC cases??? + if w.isSVC { + for _, ext := range receiver.GetParameters().HeaderExtensions { + if ext.URI == dd.ExtensionURI { + w.streamTrackerManager.AddDependencyDescriptorTrackers() + break + } } } diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 57cf49e8b..9254be438 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -114,7 +114,7 @@ func (r *RedReceiver) Close() { func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { // red encoding don't support nack - return 0, bucket.ErrPacketNotFound + return 0, bucket.ErrPacketMismatch } func (r *RedReceiver) encodeRedForPrimary(pkt *rtp.Packet, redPayload []byte) (int, error) { diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 48d7eb14c..6441714b2 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -852,24 +852,19 @@ func (s *StreamAllocator) allocateTrack(track *Track) { // // For both cases, do // a. Find cooperative transition from track that needs allocation. - // b. If track is currently streaming at minimum, do not do anything. - // c. If track is giving back bits, apply the transition and use bits given + // b. If track is giving back bits, apply the transition and use bits given // back to boost any deficient track(s). // // If track needs more bits, i.e. upward transition (may need resume or higher layer subscription), // a. Try to allocate using existing headroom. This can be tried to get the best // possible fit for the available headroom. // b. If there is not enough headroom to allocate anything, ask for best offer from - // other tracks that are currently streaming and try to use it. + // other tracks that are currently streaming and try to use it. This is done only if the + // track needing change is not currently streaming, i. e. it has to be resumed. // track.ProvisionalAllocatePrepare() transition := track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) - // track is currently streaming at minimum - if transition.BandwidthDelta == 0 { - return - } - // downgrade, giving back bits if transition.From.GreaterThan(transition.To) { allocation := track.ProvisionalAllocateCommit() @@ -928,6 +923,11 @@ func (s *StreamAllocator) allocateTrack(track *Track) { transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom } + // track is currently streaming at minimum + if transition.BandwidthDelta == 0 { + return + } + // if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired. bandwidthAcquired := int64(0) var contributingTracks []*Track @@ -1212,20 +1212,6 @@ func (s *StreamAllocator) newChannelObserverNonProbe() *ChannelObserver { func (s *StreamAllocator) initProbe(probeGoalDeltaBps int64) { expectedBandwidthUsage := s.getExpectedBandwidthUsage() - if float64(expectedBandwidthUsage) > 1.5*float64(s.committedChannelCapacity) { - // STREAM-ALLOCATOR-TODO-START - // Should probably skip probing if the expected usage is much higher than committed channel capacity. - // But, give that bandwidth estimate is volatile at times and can drop down to small values, - // not probing means streaming stuck in a well for long. - // Observe this and figure out if there is a threshold from practical use cases that can be used to - // skip probing safely - // STREAM-ALLOCATOR-TODO-END - s.params.Logger.Warnw( - "stream allocator: starting probe alarm", - fmt.Errorf("expected too high, expected: %d, committed: %d", expectedBandwidthUsage, s.committedChannelCapacity), - ) - } - probeClusterId, probeGoalBps := s.probeController.InitProbe(probeGoalDeltaBps, expectedBandwidthUsage) channelState := "" diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 30ba98322..9aa82f91e 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -571,7 +571,15 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) { // use minimal offset to indicate value availability in the extremely unlikely case of // both layers using the same timestamp if offset == 0 { - s.logger.Infow("using default offset", "ref", ref, "other", other) + s.logger.Debugw( + "using default offset", + "ref", ref, + "refNTP", srRef.NTPTimestamp.Time().String(), + "refRTP", srRef.RTPTimestamp, + "other", other, + "otherNTP", srOther.NTPTimestamp.Time().String(), + "otherRTP", srOther.RTPTimestamp, + ) offset = 1 } diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 299002fbf..f9f102e1c 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -113,7 +113,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended // re-adjust start if necessary. The conditions are // 1. Not seen more than half the range yet - // 1. wrap around compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space + // 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space // 2. no wrap around, but out-of-order compared to start and not completed a half cycle , sequences like (10, 9), (65530, 65528) in uint16 space cycles := w.cycles totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1 diff --git a/pkg/sfu/videolayerselector/base.go b/pkg/sfu/videolayerselector/base.go index 346632cb9..08b61e308 100644 --- a/pkg/sfu/videolayerselector/base.go +++ b/pkg/sfu/videolayerselector/base.go @@ -126,7 +126,7 @@ func (b *Base) Select(_extPkt *buffer.ExtPacket, _layer int32) (result VideoLaye } func (b *Base) Rollback() { - b.logger.Infow( + b.logger.Debugw( "rolling back", "previous", b.previousLayer, "current", b.currentLayer,