diff --git a/pkg/sfu/playoutdelay.go b/pkg/sfu/playoutdelay.go index 47eeffb18..991f1401b 100644 --- a/pkg/sfu/playoutdelay.go +++ b/pkg/sfu/playoutdelay.go @@ -17,6 +17,7 @@ package sfu import ( "sync" "sync/atomic" + "time" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/rtpextension" @@ -30,11 +31,11 @@ const ( PlayoutDelaySending PlayoutDelayAcked - jitterLowMultiToDelay = 10 - jitterHighMultiToDelay = 15 - jitterHighThreshold = 15 - + jitterMultiToDelay = 10 targetDelayLogThreshold = 500 + + // limit max delay change to make it smoother for a/v sync + maxDelayChangePerSec = 80 ) func (s PlayoutDelayState) String() string { @@ -56,6 +57,7 @@ type PlayoutDelayController struct { currentDelay uint32 extBytes atomic.Value //[]byte sendingAtSeq uint16 + sendingAtTime time.Time logger logger.Logger rtpStats *buffer.RTPStatsSender snapshotID uint32 @@ -89,20 +91,21 @@ func (c *PlayoutDelayController) SetJitter(jitter uint32) { } c.lock.Lock() - multi := jitterLowMultiToDelay - if jitter >= jitterHighThreshold { - multi = jitterHighMultiToDelay - } - targetDelay := jitter * uint32(multi) + targetDelay := jitter * jitterMultiToDelay if nackPercent > 60 { targetDelay += (nackPercent - 60) * 2 } - // increase delay quickly, decrease slowly to make fps more stable - if targetDelay > c.currentDelay { - targetDelay = (targetDelay-c.currentDelay)*3/4 + c.currentDelay - } else { - targetDelay = c.currentDelay - (c.currentDelay-targetDelay)/5 + elapsed := time.Since(c.sendingAtTime) + delayChangeLimit := uint32(maxDelayChangePerSec * elapsed.Seconds()) + if delayChangeLimit > maxDelayChangePerSec { + delayChangeLimit = maxDelayChangePerSec + } + + if targetDelay > c.currentDelay+delayChangeLimit { + targetDelay = c.currentDelay + delayChangeLimit + } else if c.currentDelay > targetDelay+delayChangeLimit { + targetDelay = c.currentDelay - delayChangeLimit } if targetDelay < c.minDelay { targetDelay = c.minDelay @@ -138,6 +141,7 @@ func (c *PlayoutDelayController) GetDelayExtension(seq uint16) []byte { c.lock.Lock() c.state.Store(int32(PlayoutDelaySending)) c.sendingAtSeq = seq + c.sendingAtTime = time.Now() c.lock.Unlock() return c.extBytes.Load().([]byte) case PlayoutDelaySending: diff --git a/pkg/sfu/playoutdelay_test.go b/pkg/sfu/playoutdelay_test.go index 05a88f5d0..cc24d2848 100644 --- a/pkg/sfu/playoutdelay_test.go +++ b/pkg/sfu/playoutdelay_test.go @@ -16,6 +16,7 @@ package sfu import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -26,23 +27,23 @@ import ( func TestPlayoutDelay(t *testing.T) { stats := buffer.NewRTPStatsSender(buffer.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}) - c, err := NewPlayoutDelayController(100, 1000, logger.GetLogger(), stats) + c, err := NewPlayoutDelayController(100, 120, logger.GetLogger(), stats) require.NoError(t, err) ext := c.GetDelayExtension(100) - playoutDelayEqual(t, ext, 100, 1000) + playoutDelayEqual(t, ext, 100, 120) ext = c.GetDelayExtension(105) - playoutDelayEqual(t, ext, 100, 1000) + playoutDelayEqual(t, ext, 100, 120) // seq acked before delay changed c.OnSeqAcked(65534) ext = c.GetDelayExtension(105) - playoutDelayEqual(t, ext, 100, 1000) + playoutDelayEqual(t, ext, 100, 120) c.OnSeqAcked(90) ext = c.GetDelayExtension(105) - playoutDelayEqual(t, ext, 100, 1000) + playoutDelayEqual(t, ext, 100, 120) // seq acked, no extension sent for new packet c.OnSeqAcked(103) @@ -55,16 +56,19 @@ func TestPlayoutDelay(t *testing.T) { require.Nil(t, ext) // delay changed, generate new extension to send + time.Sleep(200 * time.Millisecond) c.SetJitter(50) + t.Log(c.currentDelay, c.state.Load()) ext = c.GetDelayExtension(108) var delay rtpextension.PlayOutDelay require.NoError(t, delay.Unmarshal(ext)) require.Greater(t, delay.Min, uint16(100)) // can't go above max + time.Sleep(200 * time.Millisecond) c.SetJitter(10000) ext = c.GetDelayExtension(109) - playoutDelayEqual(t, ext, 1000, 1000) + playoutDelayEqual(t, ext, 120, 120) } func playoutDelayEqual(t *testing.T, data []byte, min, max uint16) {