From 73af5da956343a16536b08fc2e622a8f2bbac069 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 28 Dec 2023 15:50:51 +0530 Subject: [PATCH] Notify TrackInfo available from red receivers. (#2354) * Notify TrackInfo available from red receivers. That kicks off the down track scorer. * test --- pkg/sfu/redprimaryreceiver.go | 4 ++++ pkg/sfu/redreceiver.go | 4 ++++ pkg/sfu/redreceiver_test.go | 41 +++++++++++++++++++++++++++++------ 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index e7d33099f..e9e2642f3 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -96,7 +96,10 @@ func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error { r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } + track.TrackInfoAvailable() + r.downTrackSpreader.Store(track) + r.logger.Debugw("red primary receiver downtrack added", "subscriberID", track.SubscriberID()) return nil } @@ -106,6 +109,7 @@ func (r *RedPrimaryReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID) } r.downTrackSpreader.Free(subscriberID) + r.logger.Debugw("red primary receiver downtrack deleted", "subscriberID", subscriberID) } func (r *RedPrimaryReceiver) IsClosed() bool { diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 9254be438..fe254a972 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -87,7 +87,10 @@ func (r *RedReceiver) AddDownTrack(track TrackSender) error { r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } + track.TrackInfoAvailable() + r.downTrackSpreader.Store(track) + r.logger.Debugw("red receiver downtrack added", "subscriberID", track.SubscriberID()) return nil } @@ -97,6 +100,7 @@ func (r *RedReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID) { } r.downTrackSpreader.Free(subscriberID) + r.logger.Debugw("red receiver downtrack deleted", "subscriberID", subscriberID) } func (r *RedReceiver) CanClose() bool { diff --git a/pkg/sfu/redreceiver_test.go b/pkg/sfu/redreceiver_test.go index 2aae30182..72261f7db 100644 --- a/pkg/sfu/redreceiver_test.go +++ b/pkg/sfu/redreceiver_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/protocol/logger" ) const tsStep = uint32(48000 / 1000 * 10) @@ -38,11 +39,17 @@ func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) error { return nil } +func (dt *dummyDowntrack) TrackInfoAvailable() {} + func TestRedReceiver(t *testing.T) { dt := &dummyDowntrack{TrackSender: &DownTrack{}} t.Run("normal", func(t *testing.T) { - w := &WebRTCReceiver{isRED: true, kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + isRED: true, + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } require.Equal(t, w.GetRedReceiver(), w) w.isRED = false red := w.GetRedReceiver().(*RedReceiver) @@ -64,7 +71,10 @@ func TestRedReceiver(t *testing.T) { }) t.Run("packet lost and jump", func(t *testing.T) { - w := &WebRTCReceiver{kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } red := w.GetRedReceiver().(*RedReceiver) require.NoError(t, red.AddDownTrack(dt)) @@ -112,7 +122,10 @@ func TestRedReceiver(t *testing.T) { }) t.Run("unorder and repeat", func(t *testing.T) { - w := &WebRTCReceiver{kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } red := w.GetRedReceiver().(*RedReceiver) require.NoError(t, red.AddDownTrack(dt)) @@ -141,7 +154,11 @@ func TestRedReceiver(t *testing.T) { }) t.Run("encoding exceed space", func(t *testing.T) { - w := &WebRTCReceiver{isRED: true, kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + isRED: true, + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } require.Equal(t, w.GetRedReceiver(), w) w.isRED = false red := w.GetRedReceiver().(*RedReceiver) @@ -162,7 +179,11 @@ func TestRedReceiver(t *testing.T) { }) t.Run("large timestamp gap", func(t *testing.T) { - w := &WebRTCReceiver{isRED: true, kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + isRED: true, + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } require.Equal(t, w.GetRedReceiver(), w) w.isRED = false red := w.GetRedReceiver().(*RedReceiver) @@ -257,7 +278,10 @@ func generateRedPkts(t *testing.T, pkts []*rtp.Packet, redCount int) []*rtp.Pack func testRedRedPrimaryReceiver(t *testing.T, maxPktCount, redCount int, sendPktIdx, expectPktIdx []int) { dt := &dummyDowntrack{TrackSender: &DownTrack{}} - w := &WebRTCReceiver{kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } require.Equal(t, w.GetPrimaryReceiverForRed(), w) w.isRED = true red := w.GetPrimaryReceiverForRed().(*RedPrimaryReceiver) @@ -283,7 +307,10 @@ func testRedRedPrimaryReceiver(t *testing.T, maxPktCount, redCount int, sendPktI } func TestRedPrimaryReceiver(t *testing.T) { - w := &WebRTCReceiver{kind: webrtc.RTPCodecTypeAudio} + w := &WebRTCReceiver{ + kind: webrtc.RTPCodecTypeAudio, + logger: logger.GetLogger(), + } require.Equal(t, w.GetPrimaryReceiverForRed(), w) w.isRED = true red := w.GetPrimaryReceiverForRed().(*RedPrimaryReceiver)