Key frames (#522)

* Key frames

- Keep track of key frame stats
- Split out PLI from down track used for purpose of layer locking.
This will give us a good picture of down stream issues forcing a PLI.
- Use key frame requester whenever there is a layer lock required.
Not just the first key frame. With the synchronous thing, the counter
was just ridiculously high like 150 or something because of all
the initial padding packets. Also, use RTT in key frame requester.

* send first PLI before waiting

* Turn off key frame requester when disabled

* simplify
This commit is contained in:
Raja Subramanian
2022-03-16 19:55:12 +05:30
committed by GitHub
parent 82192f524e
commit 33f9726b79
8 changed files with 127 additions and 36 deletions
+1 -1
View File
@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36
github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
+2 -2
View File
@@ -134,8 +134,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 h1:D3lWDCvyMuHNVgP4cjscDdK8DBQQykIJFHnx3ragxhk=
github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914 h1:zD7PFc1Qjrr+bzAMKwmA/sjBpaSDWh0mboKRB4XJLBA=
github.com/livekit/protocol v0.11.14-0.20220316120321-6f2a49e44914/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
+3
View File
@@ -460,6 +460,9 @@ func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTi
}
if ep.KeyFrame {
b.logger.Debugw("key frame received")
if b.rtpStats != nil {
b.rtpStats.UpdateKeyFrame(1)
}
}
return ep, temporalLayer
+59 -4
View File
@@ -98,9 +98,15 @@ type RTPStats struct {
plis uint32
lastPli time.Time
layerLockPlis uint32
lastLayerLockPli time.Time
firs uint32
lastFir time.Time
keyFrames uint32
lastKeyFrame time.Time
rtt uint32
maxRtt uint32
@@ -396,6 +402,18 @@ func (r *RTPStats) TimeSinceLastPli() int64 {
return time.Now().UnixNano() - r.lastPli.UnixNano()
}
func (r *RTPStats) UpdateLayerLockPliAndTime(pliCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.layerLockPlis += pliCount
r.lastLayerLockPli = time.Now()
}
func (r *RTPStats) UpdateFir(firCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -418,6 +436,18 @@ func (r *RTPStats) UpdateFirTime() {
r.lastFir = time.Now()
}
func (r *RTPStats) UpdateKeyFrame(kfCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.keyFrames += kfCount
r.lastKeyFrame = time.Now()
}
func (r *RTPStats) UpdateRtt(rtt uint32) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -427,8 +457,8 @@ func (r *RTPStats) UpdateRtt(rtt uint32) {
}
r.rtt = rtt
if r.rtt > r.maxRtt {
r.maxRtt = r.rtt
if rtt > r.maxRtt {
r.maxRtt = rtt
}
for _, s := range r.snapshots {
@@ -586,7 +616,7 @@ func (r *RTPStats) ToString() string {
str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate)
str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage)
str += fmt.Sprintf(", b: %d|%.1fbps", p.Bytes, p.Bitrate)
str += fmt.Sprintf(", f: %d|%.1f/s", p.Frames, p.FrameRate)
str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate))
str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate)
str += fmt.Sprintf(", bd: %d|%.1fbps", p.BytesDuplicate, p.BitrateDuplicate)
@@ -621,7 +651,10 @@ func (r *RTPStats) ToString() string {
str += fmt.Sprintf("%d|%d", p.Nacks, p.NackMisses)
str += ", pli:"
str += fmt.Sprintf("%d|%+v", p.Plis, p.LastPli.AsTime().Format(time.UnixDate))
str += fmt.Sprintf("%d|%+v / %d|%+v",
p.Plis, p.LastPli.AsTime().Format(time.UnixDate),
p.LayerLockPlis, p.LastLayerLockPli.AsTime().Format(time.UnixDate),
)
str += ", fir:"
str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate))
@@ -704,12 +737,16 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
PacketsOutOfOrder: r.packetsOutOfOrder,
Frames: r.frames,
FrameRate: frameRate,
KeyFrames: r.keyFrames,
LastKeyFrame: timestamppb.New(r.lastKeyFrame),
JitterCurrent: jitterTime,
JitterMax: maxJitterTime,
Nacks: r.nacks,
NackMisses: r.nackMisses,
Plis: r.plis,
LastPli: timestamppb.New(r.lastPli),
LayerLockPlis: r.layerLockPlis,
LastLayerLockPli: timestamppb.New(r.lastLayerLockPli),
Firs: r.firs,
LastFir: timestamppb.New(r.lastFir),
RttCurrent: r.rtt,
@@ -873,6 +910,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
bytesPadding := uint64(0)
packetsOutOfOrder := uint32(0)
frames := uint32(0)
keyFrames := uint32(0)
lastKeyFrame := time.Time{}
jitter := float64(0.0)
maxJitter := float64(0)
gapHistogram := make(map[int32]uint32, GapHistogramNumBins)
@@ -880,6 +919,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
nackMisses := uint32(0)
plis := uint32(0)
lastPli := time.Time{}
layerLockPlis := uint32(0)
lastLayerLockPli := time.Time{}
firs := uint32(0)
lastFir := time.Time{}
rtt := uint32(0)
@@ -909,6 +950,11 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
frames += stats.Frames
keyFrames += stats.KeyFrames
if lastKeyFrame.IsZero() || lastKeyFrame.Before(stats.LastKeyFrame.AsTime()) {
lastKeyFrame = stats.LastKeyFrame.AsTime()
}
jitter += stats.JitterCurrent
if stats.JitterMax > maxJitter {
maxJitter = stats.JitterMax
@@ -926,6 +972,11 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
lastPli = stats.LastPli.AsTime()
}
layerLockPlis += stats.LayerLockPlis
if lastLayerLockPli.IsZero() || lastLayerLockPli.Before(stats.LastLayerLockPli.AsTime()) {
lastLayerLockPli = stats.LastLayerLockPli.AsTime()
}
firs += stats.Firs
if lastFir.IsZero() || lastPli.Before(stats.LastFir.AsTime()) {
lastFir = stats.LastFir.AsTime()
@@ -977,6 +1028,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
PacketsOutOfOrder: packetsOutOfOrder,
Frames: frames,
FrameRate: frameRate,
KeyFrames: keyFrames,
LastKeyFrame: timestamppb.New(lastKeyFrame),
JitterCurrent: jitter / float64(len(statses)),
JitterMax: maxJitter,
GapHistogram: gapHistogram,
@@ -984,6 +1037,8 @@ func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
NackMisses: nackMisses,
Plis: plis,
LastPli: timestamppb.New(lastPli),
LayerLockPlis: layerLockPlis,
LastLayerLockPli: timestamppb.New(lastLayerLockPli),
Firs: firs,
LastFir: timestamppb.New(lastFir),
RttCurrent: rtt / uint32(len(statses)),
+48 -23
View File
@@ -39,9 +39,10 @@ const (
RTPPaddingEstimatedHeaderSize = 20
RTPBlankFramesMax = 6
firstKeyFramePLIInterval = 500 * time.Millisecond
FlagStopRTXOnPLI = true
keyFrameIntervalMin = 200
keyFrameIntervalMax = 1000
)
var (
@@ -103,6 +104,8 @@ type DownTrack struct {
statsLock sync.RWMutex
totalRepeatedNACKs uint32
keyFrameRequestGeneration atomic.Uint32
connectionStats *connectionquality.ConnectionStats
connectionQualitySnapshotId uint32
@@ -140,8 +143,6 @@ type DownTrack struct {
// update rtt
onRttUpdate func(dt *DownTrack, rtt uint32)
closed chan struct{}
}
// NewDownTrack returns a DownTrack.
@@ -175,7 +176,6 @@ func NewDownTrack(
kind: kind,
forwarder: NewForwarder(c, kind, logger),
callbacksQueue: utils.NewOpsQueue(logger),
closed: make(chan struct{}),
}
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
@@ -235,8 +235,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
}
d.bound.Store(true)
go d.requestFirstKeyframe()
d.connectionStats.Start()
return codec, nil
@@ -293,17 +291,38 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) {
d.transceiver = transceiver
}
func (d *DownTrack) requestFirstKeyframe() {
ticker := time.NewTicker(firstKeyFramePLIInterval)
for !d.forwarder.ReceivedFirstKeyFrame() {
select {
case <-d.closed:
return
func (d *DownTrack) maybeStartKeyFrameRequester() {
//
// Always move to next generation to abandon any running key frame requester
// This ensures that it is stopped if forwarding is disabled due to mute
// or paused due to bandwidth constraints. A new key frame requester is
// started if a layer lock is required.
//
gen := d.keyFrameRequestGeneration.Inc()
case <-ticker.C:
if l := d.forwarder.TargetLayers(); l != InvalidLayers {
d.receiver.SendPLI(l.spatial)
}
locked, layer := d.forwarder.CheckSync()
if !locked {
go d.keyFrameRequester(gen, layer)
}
}
func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
interval := 2 * d.rtpStats.GetRtt()
if interval < keyFrameIntervalMin {
interval = keyFrameIntervalMin
}
if interval > keyFrameIntervalMax {
interval = keyFrameIntervalMax
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
for {
d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer)
d.receiver.SendPLI(layer)
d.rtpStats.UpdateLayerLockPliAndTime(1)
<-ticker.C
if generation != d.keyFrameRequestGeneration.Load() {
return
}
}
}
@@ -323,11 +342,6 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
}
tp, err := d.forwarder.GetTranslationParams(extPkt, layer)
if tp.shouldSendPLI {
d.receiver.SendPLI(layer)
d.rtpStats.UpdatePli(1)
d.rtpStats.UpdatePliTime()
}
if tp.shouldDrop {
if tp.isDroppingRelevant {
d.pktsDropped.Inc()
@@ -379,6 +393,13 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
if extPkt.KeyFrame {
d.isNACKThrottled.Store(false)
d.rtpStats.UpdateKeyFrame(1)
locked, _ := d.forwarder.CheckSync()
if locked {
// move generator to stop key frame requester
d.keyFrameRequestGeneration.Inc()
}
}
d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano())
@@ -557,7 +578,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
}
d.callbacksQueue.Stop()
close(d.closed)
d.keyFrameRequestGeneration.Inc()
})
}
@@ -689,6 +710,7 @@ func (d *DownTrack) DistanceToDesired() int32 {
func (d *DownTrack) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation {
allocation := d.forwarder.Allocate(availableChannelCapacity, allowPause, d.receiver.GetBitrateTemporalCumulative())
d.logger.Debugw("stream: allocation", "channel", availableChannelCapacity, "allocation", allocation)
d.maybeStartKeyFrameRequester()
return allocation
}
@@ -715,6 +737,7 @@ func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation {
allocation := d.forwarder.ProvisionalAllocateCommit()
d.logger.Debugw("stream: allocation commit", "allocation", allocation)
d.maybeStartKeyFrameRequester()
return allocation
}
@@ -725,6 +748,7 @@ func (d *DownTrack) FinalizeAllocate() VideoAllocation {
func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64) (VideoAllocation, bool) {
allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative())
d.logger.Debugw("stream: allocation next higher layer", "allocation", allocation, "available", available)
d.maybeStartKeyFrameRequester()
return allocation, available
}
@@ -737,6 +761,7 @@ func (d *DownTrack) GetNextHigherTransition() (VideoTransition, bool) {
func (d *DownTrack) Pause() VideoAllocation {
allocation := d.forwarder.Pause(d.receiver.GetBitrateTemporalCumulative())
d.logger.Debugw("stream: pause", "allocation", allocation)
d.maybeStartKeyFrameRequester()
return allocation
}
+10 -4
View File
@@ -121,7 +121,6 @@ const (
type TranslationParams struct {
shouldDrop bool
isDroppingRelevant bool
shouldSendPLI bool
isSwitchingToMaxLayer bool
rtp *TranslationParamsRTP
vp8 *TranslationParamsVP8
@@ -1144,6 +1143,16 @@ func (f *Forwarder) resyncLocked() {
f.lastSSRC = 0
}
func (f *Forwarder) CheckSync() (locked bool, layer int32) {
f.lock.RLock()
defer f.lock.RUnlock()
layer = f.targetLayers.spatial
locked = f.targetLayers.spatial == f.currentLayers.spatial
return
}
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool) {
if !FlagFilterRTX {
filtered = nacks
@@ -1247,7 +1256,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
return tp, nil
}
tp.shouldSendPLI = false
if f.targetLayers.spatial != f.currentLayers.spatial {
if f.targetLayers.spatial == layer {
if extPkt.KeyFrame {
@@ -1258,8 +1266,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
tp.isSwitchingToMaxLayer = true
}
f.receivedFirstKeyFrame.Store(true)
} else {
tp.shouldSendPLI = true
}
}
}
+1 -2
View File
@@ -1167,8 +1167,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
temporal: 1,
}
expectedTP = TranslationParams{
shouldDrop: true,
shouldSendPLI: true,
shouldDrop: true,
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
require.NoError(t, err)
+3
View File
@@ -48,6 +48,9 @@ func getRttMs(report *rtcp.ReceptionReport) uint32 {
// middle 32-bits of current NTP time
now := uint32(buffer.ToNtpTime(time.Now()) >> 16)
if now < (report.LastSenderReport + report.Delay) {
return 0
}
ntpDiff := now - report.LastSenderReport - report.Delay
return uint32(math.Ceil(float64(ntpDiff) * 1000.0 / 65536.0))
}