From 33f9726b79e1224bb768c9c1e9852980f7307986 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 16 Mar 2022 19:55:12 +0530 Subject: [PATCH] Key frames (#522) * Key frames - Keep track of key frame stats - Split out PLI from down track used for purpose of layer locking. This will give us a good picture of down stream issues forcing a PLI. - Use key frame requester whenever there is a layer lock required. Not just the first key frame. With the synchronous thing, the counter was just ridiculously high like 150 or something because of all the initial padding packets. Also, use RTT in key frame requester. * send first PLI before waiting * Turn off key frame requester when disabled * simplify --- go.mod | 2 +- go.sum | 4 +-- pkg/sfu/buffer/buffer.go | 3 ++ pkg/sfu/buffer/rtpstats.go | 63 ++++++++++++++++++++++++++++++--- pkg/sfu/downtrack.go | 71 ++++++++++++++++++++++++++------------ pkg/sfu/forwarder.go | 14 +++++--- pkg/sfu/forwarder_test.go | 3 +- pkg/sfu/helpers.go | 3 ++ 8 files changed, 127 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 701adce6a..b43bd76cf 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 + github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 85d9da14d..396968cb8 100644 --- a/go.sum +++ b/go.sum @@ -134,8 +134,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 h1:D3lWDCvyMuHNVgP4cjscDdK8DBQQykIJFHnx3ragxhk= -github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= +github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914 h1:zD7PFc1Qjrr+bzAMKwmA/sjBpaSDWh0mboKRB4XJLBA= +github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 16289bc01..80bab8bc6 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -460,6 +460,9 @@ func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTi } if ep.KeyFrame { b.logger.Debugw("key frame received") + if b.rtpStats != nil { + b.rtpStats.UpdateKeyFrame(1) + } } return ep, temporalLayer diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index c9f1facf3..c6bfcd874 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -98,9 +98,15 @@ type RTPStats struct { plis uint32 lastPli time.Time + layerLockPlis uint32 + lastLayerLockPli time.Time + firs uint32 lastFir time.Time + keyFrames uint32 + lastKeyFrame time.Time + rtt uint32 maxRtt uint32 @@ -396,6 +402,18 @@ func (r *RTPStats) TimeSinceLastPli() int64 { return time.Now().UnixNano() - r.lastPli.UnixNano() } +func (r *RTPStats) UpdateLayerLockPliAndTime(pliCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.layerLockPlis += pliCount + r.lastLayerLockPli = time.Now() +} + func (r *RTPStats) UpdateFir(firCount uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -418,6 +436,18 @@ func (r *RTPStats) UpdateFirTime() { r.lastFir = time.Now() } +func (r *RTPStats) UpdateKeyFrame(kfCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.keyFrames += kfCount + r.lastKeyFrame = time.Now() +} + func (r *RTPStats) UpdateRtt(rtt uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -427,8 +457,8 @@ func (r *RTPStats) UpdateRtt(rtt uint32) { } r.rtt = rtt - if r.rtt > r.maxRtt { - r.maxRtt = r.rtt + if rtt > r.maxRtt { + r.maxRtt = rtt } for _, s := range r.snapshots { @@ -586,7 +616,7 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate) str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage) str += fmt.Sprintf(", b: %d|%.1fbps", p.Bytes, p.Bitrate) - str += fmt.Sprintf(", f: %d|%.1f/s", p.Frames, p.FrameRate) + str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate)) str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate) str += fmt.Sprintf(", bd: %d|%.1fbps", p.BytesDuplicate, p.BitrateDuplicate) @@ -621,7 +651,10 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf("%d|%d", p.Nacks, p.NackMisses) str += ", pli:" - str += fmt.Sprintf("%d|%+v", p.Plis, p.LastPli.AsTime().Format(time.UnixDate)) + str += fmt.Sprintf("%d|%+v / %d|%+v", + p.Plis, p.LastPli.AsTime().Format(time.UnixDate), + p.LayerLockPlis, p.LastLayerLockPli.AsTime().Format(time.UnixDate), + ) str += ", fir:" str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate)) @@ -704,12 +737,16 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { PacketsOutOfOrder: r.packetsOutOfOrder, Frames: r.frames, FrameRate: frameRate, + KeyFrames: r.keyFrames, + LastKeyFrame: timestamppb.New(r.lastKeyFrame), JitterCurrent: jitterTime, JitterMax: maxJitterTime, Nacks: r.nacks, NackMisses: r.nackMisses, Plis: r.plis, LastPli: timestamppb.New(r.lastPli), + LayerLockPlis: r.layerLockPlis, + LastLayerLockPli: timestamppb.New(r.lastLayerLockPli), Firs: r.firs, LastFir: timestamppb.New(r.lastFir), RttCurrent: r.rtt, @@ -873,6 +910,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { bytesPadding := uint64(0) packetsOutOfOrder := uint32(0) frames := uint32(0) + keyFrames := uint32(0) + lastKeyFrame := time.Time{} jitter := float64(0.0) maxJitter := float64(0) gapHistogram := make(map[int32]uint32, GapHistogramNumBins) @@ -880,6 +919,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { nackMisses := uint32(0) plis := uint32(0) lastPli := time.Time{} + layerLockPlis := uint32(0) + lastLayerLockPli := time.Time{} firs := uint32(0) lastFir := time.Time{} rtt := uint32(0) @@ -909,6 +950,11 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { frames += stats.Frames + keyFrames += stats.KeyFrames + if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) { + lastKeyFrame = stats.LastKeyFrame.AsTime() + } + jitter += stats.JitterCurrent if stats.JitterMax > maxJitter { maxJitter = stats.JitterMax @@ -926,6 +972,11 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { lastPli = stats.LastPli.AsTime() } + layerLockPlis += stats.LayerLockPlis + if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) { + lastLayerLockPli = stats.LastLayerLockPli.AsTime() + } + firs += stats.Firs if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) { lastFir = stats.LastFir.AsTime() @@ -977,6 +1028,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { PacketsOutOfOrder: packetsOutOfOrder, Frames: frames, FrameRate: frameRate, + KeyFrames: keyFrames, + LastKeyFrame: timestamppb.New(lastKeyFrame), JitterCurrent: jitter / float64(len(statses)), JitterMax: maxJitter, GapHistogram: gapHistogram, @@ -984,6 +1037,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { NackMisses: nackMisses, Plis: plis, LastPli: timestamppb.New(lastPli), + LayerLockPlis: layerLockPlis, + LastLayerLockPli: timestamppb.New(lastLayerLockPli), Firs: firs, LastFir: timestamppb.New(lastFir), RttCurrent: rtt / uint32(len(statses)), diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5da64d2e3..b22b07749 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -39,9 +39,10 @@ const ( RTPPaddingEstimatedHeaderSize = 20 RTPBlankFramesMax = 6 - firstKeyFramePLIInterval = 500 * time.Millisecond - FlagStopRTXOnPLI = true + + keyFrameIntervalMin = 200 + keyFrameIntervalMax = 1000 ) var ( @@ -103,6 +104,8 @@ type DownTrack struct { statsLock sync.RWMutex totalRepeatedNACKs uint32 + keyFrameRequestGeneration atomic.Uint32 + connectionStats *connectionquality.ConnectionStats connectionQualitySnapshotId uint32 @@ -140,8 +143,6 @@ type DownTrack struct { // update rtt onRttUpdate func(dt *DownTrack, rtt uint32) - - closed chan struct{} } // NewDownTrack returns a DownTrack. @@ -175,7 +176,6 @@ func NewDownTrack( kind: kind, forwarder: NewForwarder(c, kind, logger), callbacksQueue: utils.NewOpsQueue(logger), - closed: make(chan struct{}), } d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ @@ -235,8 +235,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, } d.bound.Store(true) - go d.requestFirstKeyframe() - d.connectionStats.Start() return codec, nil @@ -293,17 +291,38 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) { d.transceiver = transceiver } -func (d *DownTrack) requestFirstKeyframe() { - ticker := time.NewTicker(firstKeyFramePLIInterval) - for !d.forwarder.ReceivedFirstKeyFrame() { - select { - case <-d.closed: - return +func (d *DownTrack) maybeStartKeyFrameRequester() { + // + // Always move to next generation to abandon any running key frame requester + // This ensures that it is stopped if forwarding is disabled due to mute + // or paused due to bandwidth constraints. A new key frame requester is + // started if a layer lock is required. + // + gen := d.keyFrameRequestGeneration.Inc() - case <-ticker.C: - if l := d.forwarder.TargetLayers(); l != InvalidLayers { - d.receiver.SendPLI(l.spatial) - } + locked, layer := d.forwarder.CheckSync() + if !locked { + go d.keyFrameRequester(gen, layer) + } +} + +func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { + interval := 2 * d.rtpStats.GetRtt() + if interval < keyFrameIntervalMin { + interval = keyFrameIntervalMin + } + if interval > keyFrameIntervalMax { + interval = keyFrameIntervalMax + } + ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) + for { + d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer) + d.receiver.SendPLI(layer) + d.rtpStats.UpdateLayerLockPliAndTime(1) + + <-ticker.C + if generation != d.keyFrameRequestGeneration.Load() { + return } } } @@ -323,11 +342,6 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } tp, err := d.forwarder.GetTranslationParams(extPkt, layer) - if tp.shouldSendPLI { - d.receiver.SendPLI(layer) - d.rtpStats.UpdatePli(1) - d.rtpStats.UpdatePliTime() - } if tp.shouldDrop { if tp.isDroppingRelevant { d.pktsDropped.Inc() @@ -379,6 +393,13 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { if extPkt.KeyFrame { d.isNACKThrottled.Store(false) + d.rtpStats.UpdateKeyFrame(1) + + locked, _ := d.forwarder.CheckSync() + if locked { + // move generator to stop key frame requester + d.keyFrameRequestGeneration.Inc() + } } d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) @@ -557,7 +578,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.callbacksQueue.Stop() - close(d.closed) + d.keyFrameRequestGeneration.Inc() }) } @@ -689,6 +710,7 @@ func (d *DownTrack) DistanceToDesired() int32 { func (d *DownTrack) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation { allocation := d.forwarder.Allocate(availableChannelCapacity, allowPause, d.receiver.GetBitrateTemporalCumulative()) d.logger.Debugw("stream: allocation", "channel", availableChannelCapacity, "allocation", allocation) + d.maybeStartKeyFrameRequester() return allocation } @@ -715,6 +737,7 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { allocation := d.forwarder.ProvisionalAllocateCommit() d.logger.Debugw("stream: allocation commit", "allocation", allocation) + d.maybeStartKeyFrameRequester() return allocation } @@ -725,6 +748,7 @@ func (d *DownTrack) FinalizeAllocate() VideoAllocation { func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64) (VideoAllocation, bool) { allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative()) d.logger.Debugw("stream: allocation next higher layer", "allocation", allocation, "available", available) + d.maybeStartKeyFrameRequester() return allocation, available } @@ -737,6 +761,7 @@ func (d *DownTrack) GetNextHigherTransition() (VideoTransition, bool) { func (d *DownTrack) Pause() VideoAllocation { allocation := d.forwarder.Pause(d.receiver.GetBitrateTemporalCumulative()) d.logger.Debugw("stream: pause", "allocation", allocation) + d.maybeStartKeyFrameRequester() return allocation } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index c6453a6f3..201043b31 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -121,7 +121,6 @@ const ( type TranslationParams struct { shouldDrop bool isDroppingRelevant bool - shouldSendPLI bool isSwitchingToMaxLayer bool rtp *TranslationParamsRTP vp8 *TranslationParamsVP8 @@ -1144,6 +1143,16 @@ func (f *Forwarder) resyncLocked() { f.lastSSRC = 0 } +func (f *Forwarder) CheckSync() (locked bool, layer int32) { + f.lock.RLock() + defer f.lock.RUnlock() + + layer = f.targetLayers.spatial + locked = f.targetLayers.spatial == f.currentLayers.spatial + + return +} + func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool) { if !FlagFilterRTX { filtered = nacks @@ -1247,7 +1256,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, nil } - tp.shouldSendPLI = false if f.targetLayers.spatial != f.currentLayers.spatial { if f.targetLayers.spatial == layer { if extPkt.KeyFrame { @@ -1258,8 +1266,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in tp.isSwitchingToMaxLayer = true } f.receivedFirstKeyFrame.Store(true) - } else { - tp.shouldSendPLI = true } } } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index f93a46063..599ed0473 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1167,8 +1167,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { temporal: 1, } expectedTP = TranslationParams{ - shouldDrop: true, - shouldSendPLI: true, + shouldDrop: true, } actualTP, err = f.GetTranslationParams(extPkt, 0) require.NoError(t, err) diff --git a/pkg/sfu/helpers.go b/pkg/sfu/helpers.go index 0003d5420..78cc3fe90 100644 --- a/pkg/sfu/helpers.go +++ b/pkg/sfu/helpers.go @@ -48,6 +48,9 @@ func getRttMs(report *rtcp.ReceptionReport) uint32 { // middle 32-bits of current NTP time now := uint32(buffer.ToNtpTime(time.Now()) >> 16) + if now < (report.LastSenderReport + report.Delay) { + return 0 + } ntpDiff := now - report.LastSenderReport - report.Delay return uint32(math.Ceil(float64(ntpDiff) * 1000.0 / 65536.0)) }