diff --git a/go.mod b/go.mod index 701adce6a..b43bd76cf 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 + github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 85d9da14d..396968cb8 100644 --- a/go.sum +++ b/go.sum @@ -134,8 +134,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 h1:D3lWDCvyMuHNVgP4cjscDdK8DBQQykIJFHnx3ragxhk= -github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= +github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914 h1:zD7PFc1Qjrr+bzAMKwmA/sjBpaSDWh0mboKRB4XJLBA= +github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 16289bc01..80bab8bc6 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -460,6 +460,9 @@ func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTi } if ep.KeyFrame { b.logger.Debugw("key frame received") + if b.rtpStats != nil { + b.rtpStats.UpdateKeyFrame(1) + } } return ep, temporalLayer diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index c9f1facf3..c6bfcd874 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -98,9 +98,15 @@ type RTPStats struct { plis uint32 lastPli time.Time + layerLockPlis uint32 + lastLayerLockPli time.Time + firs uint32 lastFir time.Time + keyFrames uint32 + lastKeyFrame time.Time + rtt uint32 maxRtt uint32 @@ -396,6 +402,18 @@ func (r *RTPStats) TimeSinceLastPli() int64 { return time.Now().UnixNano() - r.lastPli.UnixNano() } +func (r *RTPStats) UpdateLayerLockPliAndTime(pliCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.layerLockPlis += pliCount + r.lastLayerLockPli = time.Now() +} + func (r *RTPStats) UpdateFir(firCount uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -418,6 +436,18 @@ func (r *RTPStats) UpdateFirTime() { r.lastFir = time.Now() } +func (r *RTPStats) UpdateKeyFrame(kfCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.keyFrames += kfCount + r.lastKeyFrame = time.Now() +} + func (r *RTPStats) UpdateRtt(rtt uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -427,8 +457,8 @@ func (r *RTPStats) UpdateRtt(rtt uint32) { } r.rtt = rtt - if r.rtt > r.maxRtt { - r.maxRtt = r.rtt + if rtt > r.maxRtt { + r.maxRtt = rtt } for _, s := range r.snapshots { @@ -586,7 +616,7 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate) str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage) str += fmt.Sprintf(", b: %d|%.1fbps", p.Bytes, p.Bitrate) - str += fmt.Sprintf(", f: %d|%.1f/s", p.Frames, p.FrameRate) + str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate)) str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate) str += fmt.Sprintf(", bd: %d|%.1fbps", p.BytesDuplicate, p.BitrateDuplicate) @@ -621,7 +651,10 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf("%d|%d", p.Nacks, p.NackMisses) str += ", pli:" - str += fmt.Sprintf("%d|%+v", p.Plis, p.LastPli.AsTime().Format(time.UnixDate)) + str += fmt.Sprintf("%d|%+v / %d|%+v", + p.Plis, p.LastPli.AsTime().Format(time.UnixDate), + p.LayerLockPlis, p.LastLayerLockPli.AsTime().Format(time.UnixDate), + ) str += ", fir:" str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate)) @@ -704,12 +737,16 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { PacketsOutOfOrder: r.packetsOutOfOrder, Frames: r.frames, FrameRate: frameRate, + KeyFrames: r.keyFrames, + LastKeyFrame: timestamppb.New(r.lastKeyFrame), JitterCurrent: jitterTime, JitterMax: maxJitterTime, Nacks: r.nacks, NackMisses: r.nackMisses, Plis: r.plis, LastPli: timestamppb.New(r.lastPli), + LayerLockPlis: r.layerLockPlis, + LastLayerLockPli: timestamppb.New(r.lastLayerLockPli), Firs: r.firs, LastFir: timestamppb.New(r.lastFir), RttCurrent: r.rtt, @@ -873,6 +910,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { bytesPadding := uint64(0) packetsOutOfOrder := uint32(0) frames := uint32(0) + keyFrames := uint32(0) + lastKeyFrame := time.Time{} jitter := float64(0.0) maxJitter := float64(0) gapHistogram := make(map[int32]uint32, GapHistogramNumBins) @@ -880,6 +919,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { nackMisses := uint32(0) plis := uint32(0) lastPli := time.Time{} + layerLockPlis := uint32(0) + lastLayerLockPli := time.Time{} firs := uint32(0) lastFir := time.Time{} rtt := uint32(0) @@ -909,6 +950,11 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { frames += stats.Frames + keyFrames += stats.KeyFrames + if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) { + lastKeyFrame = stats.LastKeyFrame.AsTime() + } + jitter += stats.JitterCurrent if stats.JitterMax > maxJitter { maxJitter = stats.JitterMax @@ -926,6 +972,11 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { lastPli = stats.LastPli.AsTime() } + layerLockPlis += stats.LayerLockPlis + if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) { + lastLayerLockPli = stats.LastLayerLockPli.AsTime() + } + firs += stats.Firs if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) { lastFir = stats.LastFir.AsTime() @@ -977,6 +1028,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { PacketsOutOfOrder: packetsOutOfOrder, Frames: frames, FrameRate: frameRate, + KeyFrames: keyFrames, + LastKeyFrame: timestamppb.New(lastKeyFrame), JitterCurrent: jitter / float64(len(statses)), JitterMax: maxJitter, GapHistogram: gapHistogram, @@ -984,6 +1037,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { NackMisses: nackMisses, Plis: plis, LastPli: timestamppb.New(lastPli), + LayerLockPlis: layerLockPlis, + LastLayerLockPli: timestamppb.New(lastLayerLockPli), Firs: firs, LastFir: timestamppb.New(lastFir), RttCurrent: rtt / uint32(len(statses)), diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5da64d2e3..b22b07749 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -39,9 +39,10 @@ const ( RTPPaddingEstimatedHeaderSize = 20 RTPBlankFramesMax = 6 - firstKeyFramePLIInterval = 500 * time.Millisecond - FlagStopRTXOnPLI = true + + keyFrameIntervalMin = 200 + keyFrameIntervalMax = 1000 ) var ( @@ -103,6 +104,8 @@ type DownTrack struct { statsLock sync.RWMutex totalRepeatedNACKs uint32 + keyFrameRequestGeneration atomic.Uint32 + connectionStats *connectionquality.ConnectionStats connectionQualitySnapshotId uint32 @@ -140,8 +143,6 @@ type DownTrack struct { // update rtt onRttUpdate func(dt *DownTrack, rtt uint32) - - closed chan struct{} } // NewDownTrack returns a DownTrack. @@ -175,7 +176,6 @@ func NewDownTrack( kind: kind, forwarder: NewForwarder(c, kind, logger), callbacksQueue: utils.NewOpsQueue(logger), - closed: make(chan struct{}), } d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ @@ -235,8 +235,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, } d.bound.Store(true) - go d.requestFirstKeyframe() - d.connectionStats.Start() return codec, nil @@ -293,17 +291,38 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) { d.transceiver = transceiver } -func (d *DownTrack) requestFirstKeyframe() { - ticker := time.NewTicker(firstKeyFramePLIInterval) - for !d.forwarder.ReceivedFirstKeyFrame() { - select { - case <-d.closed: - return +func (d *DownTrack) maybeStartKeyFrameRequester() { + // + // Always move to next generation to abandon any running key frame requester + // This ensures that it is stopped if forwarding is disabled due to mute + // or paused due to bandwidth constraints. A new key frame requester is + // started if a layer lock is required. + // + gen := d.keyFrameRequestGeneration.Inc() - case <-ticker.C: - if l := d.forwarder.TargetLayers(); l != InvalidLayers { - d.receiver.SendPLI(l.spatial) - } + locked, layer := d.forwarder.CheckSync() + if !locked { + go d.keyFrameRequester(gen, layer) + } +} + +func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { + interval := 2 * d.rtpStats.GetRtt() + if interval < keyFrameIntervalMin { + interval = keyFrameIntervalMin + } + if interval > keyFrameIntervalMax { + interval = keyFrameIntervalMax + } + ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) + for { + d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer) + d.receiver.SendPLI(layer) + d.rtpStats.UpdateLayerLockPliAndTime(1) + + <-ticker.C + if generation != d.keyFrameRequestGeneration.Load() { + return } } } @@ -323,11 +342,6 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } tp, err := d.forwarder.GetTranslationParams(extPkt, layer) - if tp.shouldSendPLI { - d.receiver.SendPLI(layer) - d.rtpStats.UpdatePli(1) - d.rtpStats.UpdatePliTime() - } if tp.shouldDrop { if tp.isDroppingRelevant { d.pktsDropped.Inc() @@ -379,6 +393,13 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { if extPkt.KeyFrame { d.isNACKThrottled.Store(false) + d.rtpStats.UpdateKeyFrame(1) + + locked, _ := d.forwarder.CheckSync() + if locked { + // move generator to stop key frame requester + d.keyFrameRequestGeneration.Inc() + } } d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) @@ -557,7 +578,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.callbacksQueue.Stop() - close(d.closed) + d.keyFrameRequestGeneration.Inc() }) } @@ -689,6 +710,7 @@ func (d *DownTrack) DistanceToDesired() int32 { func (d *DownTrack) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation { allocation := d.forwarder.Allocate(availableChannelCapacity, allowPause, d.receiver.GetBitrateTemporalCumulative()) d.logger.Debugw("stream: allocation", "channel", availableChannelCapacity, "allocation", allocation) + d.maybeStartKeyFrameRequester() return allocation } @@ -715,6 +737,7 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { allocation := d.forwarder.ProvisionalAllocateCommit() d.logger.Debugw("stream: allocation commit", "allocation", allocation) + d.maybeStartKeyFrameRequester() return allocation } @@ -725,6 +748,7 @@ func (d *DownTrack) FinalizeAllocate() VideoAllocation { func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64) (VideoAllocation, bool) { allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative()) d.logger.Debugw("stream: allocation next higher layer", "allocation", allocation, "available", available) + d.maybeStartKeyFrameRequester() return allocation, available } @@ -737,6 +761,7 @@ func (d *DownTrack) GetNextHigherTransition() (VideoTransition, bool) { func (d *DownTrack) Pause() VideoAllocation { allocation := d.forwarder.Pause(d.receiver.GetBitrateTemporalCumulative()) d.logger.Debugw("stream: pause", "allocation", allocation) + d.maybeStartKeyFrameRequester() return allocation } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index c6453a6f3..201043b31 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -121,7 +121,6 @@ const ( type TranslationParams struct { shouldDrop bool isDroppingRelevant bool - shouldSendPLI bool isSwitchingToMaxLayer bool rtp *TranslationParamsRTP vp8 *TranslationParamsVP8 @@ -1144,6 +1143,16 @@ func (f *Forwarder) resyncLocked() { f.lastSSRC = 0 } +func (f *Forwarder) CheckSync() (locked bool, layer int32) { + f.lock.RLock() + defer f.lock.RUnlock() + + layer = f.targetLayers.spatial + locked = f.targetLayers.spatial == f.currentLayers.spatial + + return +} + func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool) { if !FlagFilterRTX { filtered = nacks @@ -1247,7 +1256,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, nil } - tp.shouldSendPLI = false if f.targetLayers.spatial != f.currentLayers.spatial { if f.targetLayers.spatial == layer { if extPkt.KeyFrame { @@ -1258,8 +1266,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in tp.isSwitchingToMaxLayer = true } f.receivedFirstKeyFrame.Store(true) - } else { - tp.shouldSendPLI = true } } } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index f93a46063..599ed0473 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1167,8 +1167,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { temporal: 1, } expectedTP = TranslationParams{ - shouldDrop: true, - shouldSendPLI: true, + shouldDrop: true, } actualTP, err = f.GetTranslationParams(extPkt, 0) require.NoError(t, err) diff --git a/pkg/sfu/helpers.go b/pkg/sfu/helpers.go index 0003d5420..78cc3fe90 100644 --- a/pkg/sfu/helpers.go +++ b/pkg/sfu/helpers.go @@ -48,6 +48,9 @@ func getRttMs(report *rtcp.ReceptionReport) uint32 { // middle 32-bits of current NTP time now := uint32(buffer.ToNtpTime(time.Now()) >> 16) + if now < (report.LastSenderReport + report.Delay) { + return 0 + } ntpDiff := now - report.LastSenderReport - report.Delay return uint32(math.Ceil(float64(ntpDiff) * 1000.0 / 65536.0)) }