From 8b604df32aa91fbf6ec7cc78d8f7b1c0c79df1a2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 15 Oct 2024 17:39:42 +0530 Subject: [PATCH] Set FEC enabled properly in connection stats module. (#3098) * Set FEC enabled properly in connection stats module. With RED, the FEC indication is in primary codec. Also, clean up some bits that were not necessary (TrackInfoAvailable is not needed) TODO: There are still a couple of things to figure out - If codec is RED, Opus is added as second codec synthetically using https://github.com/livekit/livekit/blob/33098337fc17705bbdb3283c7a7034aa6b2f3745/pkg/rtc/mediaengine.go#L31 which hard codecs FEC enabled. Ideally, we should get the primary codec parameters from SDP offer. - The WebRTCReceiver does not have information about primary codec. For now, just setting FEC to true when RED is enabled. It is okay as it just affects when we declare quality drops, but ideally the primary codec should be retrieved from SDP offer. * clean up and comment * full prop check --- pkg/rtc/participant.go | 52 ++++---- pkg/rtc/wrappedreceiver.go | 5 +- pkg/sfu/connectionquality/connectionstats.go | 36 ++--- .../connectionquality/connectionstats_test.go | 25 +--- pkg/sfu/connectionquality/scorer.go | 124 ++++++------------ pkg/sfu/downtrack.go | 34 +++-- pkg/sfu/receiver.go | 9 +- pkg/sfu/redprimaryreceiver.go | 2 - pkg/sfu/redreceiver.go | 2 - 9 files changed, 109 insertions(+), 180 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index b4f29cc65..d786a2185 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2750,31 +2750,6 @@ func (p *ParticipantImpl) SupportsTransceiverReuse() bool { return p.ProtocolVersion().SupportsTransceiverReuse() && !p.SupportsSyncStreamID() } -func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) { - s := &sdp.SessionDescription{ - MediaDescriptions: []*sdp.MediaDescription{m}, - } - - for _, payloadStr := range m.MediaName.Formats { - payloadType, err := strconv.ParseUint(payloadStr, 10, 8) - if err != nil { - return nil, err - } - - codec, err := s.GetCodecForPayloadType(uint8(payloadType)) - if err != nil { - if payloadType == 0 { - continue - } - return nil, err - } - - out = append(out, codec) - } - - return out, nil -} - func (p *ParticipantImpl) SendDataPacket(kind livekit.DataPacket_Kind, encoded []byte) error { if p.State() != livekit.ParticipantInfo_ACTIVE { return ErrDataChannelUnavailable @@ -2928,3 +2903,30 @@ func (p *ParticipantImpl) HandleMetrics(senderParticipantID livekit.ParticipantI return p.TransportManager.SendDataPacket(livekit.DataPacket_RELIABLE, dpData) } + +// ---------------------------------------------- + +func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) { + s := &sdp.SessionDescription{ + MediaDescriptions: []*sdp.MediaDescription{m}, + } + + for _, payloadStr := range m.MediaName.Formats { + payloadType, err := strconv.ParseUint(payloadStr, 10, 8) + if err != nil { + return nil, err + } + + codec, err := s.GetCodecForPayloadType(uint8(payloadType)) + if err != nil { + if payloadType == 0 { + continue + } + return nil, err + } + + out = append(out, codec) + } + + return out, nil +} diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index cfde44114..6db1a7d80 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -22,6 +22,7 @@ import ( "github.com/pion/webrtc/v3" "go.uber.org/atomic" + "golang.org/x/exp/slices" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -126,9 +127,7 @@ func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) boo } func (r *WrappedReceiver) Codecs() []webrtc.RTPCodecParameters { - codecs := make([]webrtc.RTPCodecParameters, len(r.codecs)) - copy(codecs, r.codecs) - return codecs + return slices.Clone(r.codecs) } func (r *WrappedReceiver) DeleteDownTrack(participantID livekit.ParticipantID) { diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index d2dba70d8..aab576451 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -49,8 +49,6 @@ type ConnectionStatsSenderProvider interface { type ConnectionStatsParams struct { UpdateInterval time.Duration - MimeType string - IsFECEnabled bool IncludeRTT bool IncludeJitter bool EnableBitrateScore bool @@ -62,6 +60,8 @@ type ConnectionStatsParams struct { type ConnectionStats struct { params ConnectionStatsParams + codecMimeType atomic.String + isStarted atomic.Bool isVideo atomic.Bool @@ -80,7 +80,6 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { return &ConnectionStats{ params: params, scorer: newQualityScorer(qualityScorerParams{ - PacketLossWeight: getPacketLossWeight(params.MimeType, params.IsFECEnabled), // LK-TODO: have to notify codec change? IncludeRTT: params.IncludeRTT, IncludeJitter: params.IncludeJitter, EnableBitrateScore: params.EnableBitrateScore, @@ -89,27 +88,20 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { } } -func (cs *ConnectionStats) start(trackInfo *livekit.TrackInfo) { - cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO) +func (cs *ConnectionStats) StartAt(codecMimeType string, isFECEnabled bool, at time.Time) { + if cs.isStarted.Swap(true) { + return + } + + cs.isVideo.Store(strings.HasPrefix(strings.ToLower(codecMimeType), "video/")) + cs.codecMimeType.Store(codecMimeType) + cs.scorer.StartAt(getPacketLossWeight(codecMimeType, isFECEnabled), at) + go cs.updateStatsWorker() } -func (cs *ConnectionStats) StartAt(trackInfo *livekit.TrackInfo, at time.Time) { - if cs.isStarted.Swap(true) { - return - } - - cs.scorer.StartAt(at) - cs.start(trackInfo) -} - -func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo) { - if cs.isStarted.Swap(true) { - return - } - - cs.scorer.Start() - cs.start(trackInfo) +func (cs *ConnectionStats) Start(codecMimeType string, isFECEnabled bool) { + cs.StartAt(codecMimeType, isFECEnabled, time.Now()) } func (cs *ConnectionStats) Close() { @@ -348,7 +340,7 @@ func (cs *ConnectionStats) getStat() { cs.onStatsUpdate(cs, &livekit.AnalyticsStat{ Score: score, Streams: analyticsStreams, - Mime: cs.params.MimeType, + Mime: cs.codecMimeType.Load(), }) } } diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 90a4ac015..fa95fbdca 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -60,8 +61,6 @@ func TestConnectionQuality(t *testing.T) { trp := newTestReceiverProvider() t.Run("quality scorer operation", func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "audio/opus", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: true, EnableBitrateScore: true, @@ -71,7 +70,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration)) cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // no data and not enough unmute time should return default state which is EXCELLENT quality @@ -477,8 +476,6 @@ func TestConnectionQuality(t *testing.T) { t.Run("quality scorer dependent rtt", func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "audio/opus", - IsFECEnabled: false, IncludeRTT: false, IncludeJitter: true, ReceiverProvider: trp, @@ -487,7 +484,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration)) cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // RTT does not knock quality down because it is dependent and hence not taken into account @@ -512,8 +509,6 @@ func TestConnectionQuality(t *testing.T) { t.Run("quality scorer dependent jitter", func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "audio/opus", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: false, ReceiverProvider: trp, @@ -522,7 +517,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration)) cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // Jitter does not knock quality down because it is dependent and hence not taken into account @@ -684,8 +679,6 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: tc.mimeType, - IsFECEnabled: tc.isFECEnabled, IncludeRTT: true, IncludeJitter: true, ReceiverProvider: trp, @@ -694,7 +687,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(tc.mimeType, tc.isFECEnabled, now.Add(-duration)) for _, eq := range tc.expectedQualities { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ @@ -784,8 +777,6 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "video/vp8", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: true, EnableBitrateScore: true, @@ -795,7 +786,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) + cs.StartAt(webrtc.MimeTypeVP8, false, now) for _, tr := range tc.transitions { cs.AddBitrateTransitionAt(tr.bitrate, now.Add(tr.offset)) @@ -879,8 +870,6 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "video/vp8", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: true, ReceiverProvider: trp, @@ -889,7 +878,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) + cs.StartAt(webrtc.MimeTypeVP8, false, now) for _, tr := range tc.transitions { cs.AddLayerTransitionAt(tr.distance, now.Add(tr.offset)) diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index f911e6d53..4e834fc94 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -69,7 +69,7 @@ type windowStat struct { lastRTCPAt time.Time } -func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJitter bool) float64 { +func (w *windowStat) calculatePacketScore(aplw float64, includeRTT bool, includeJitter bool) float64 { // this is based on simplified E-model based on packet loss, rtt, jitter as // outlined at https://www.pingman.com/kb/article/how-is-mos-calculated-in-pingplotter-pro-50.html. effectiveDelay := 0.0 @@ -119,7 +119,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ if w.packets+w.packetsPadding > 0 { lossEffect = float64(actualLost) * 100.0 / float64(w.packets+w.packetsPadding) } - lossEffect *= plw + lossEffect *= aplw score := cMaxScore - delayEffect - lossEffect if score < 0.0 { @@ -190,7 +190,6 @@ func (w *windowStat) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------ type qualityScorerParams struct { - PacketLossWeight float64 IncludeRTT bool IncludeJitter bool EnableBitrateScore bool @@ -203,6 +202,8 @@ type qualityScorer struct { lock sync.RWMutex lastUpdateAt time.Time + packetLossWeight float64 + score float64 stat windowStat @@ -238,25 +239,22 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer { } } -func (q *qualityScorer) startAtLocked(at time.Time) { +func (q *qualityScorer) StartAt(packetLossWeight float64, at time.Time) { + q.lock.Lock() + defer q.lock.Unlock() + + q.packetLossWeight = packetLossWeight q.lastUpdateAt = at } -func (q *qualityScorer) StartAt(at time.Time) { +func (q *qualityScorer) Start(packetLossWeight float64) { + q.StartAt(packetLossWeight, time.Now()) +} + +func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.startAtLocked(at) -} - -func (q *qualityScorer) Start() { - q.lock.Lock() - defer q.lock.Unlock() - - q.startAtLocked(time.Now()) -} - -func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at // muting when LOST should not push quality to EXCELLENT @@ -268,39 +266,25 @@ func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { } } -func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateMuteAtLocked(isMuted, at) -} - func (q *qualityScorer) UpdateMute(isMuted bool) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateMuteAtLocked(isMuted, time.Now()) -} - -func (q *qualityScorer) addBitrateTransitionAtLocked(bitrate int64, at time.Time) { - q.aggregateBitrate.AddSampleAt(bitrate, at) + q.UpdateMuteAt(isMuted, time.Now()) } func (q *qualityScorer) AddBitrateTransitionAt(bitrate int64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addBitrateTransitionAtLocked(bitrate, at) + q.aggregateBitrate.AddSampleAt(bitrate, at) } func (q *qualityScorer) AddBitrateTransition(bitrate int64) { + q.AddBitrateTransitionAt(bitrate, time.Now()) +} + +func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addBitrateTransitionAtLocked(bitrate, time.Now()) -} - -func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { if isMuted { if !q.isLayerMuted() { q.aggregateBitrate.Reset() @@ -316,21 +300,14 @@ func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { } } -func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateLayerMuteAtLocked(isMuted, at) -} - func (q *qualityScorer) UpdateLayerMute(isMuted bool) { + q.UpdateLayerMuteAt(isMuted, time.Now()) +} + +func (q *qualityScorer) UpdatePauseAt(isPaused bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.updateLayerMuteAtLocked(isMuted, time.Now()) -} - -func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { if isPaused { if !q.isPaused() { q.aggregateBitrate.Reset() @@ -346,39 +323,25 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { } } -func (q *qualityScorer) UpdatePauseAt(isPaused bool, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updatePauseAtLocked(isPaused, at) -} - func (q *qualityScorer) UpdatePause(isPaused bool) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updatePauseAtLocked(isPaused, time.Now()) -} - -func (q *qualityScorer) addLayerTransitionAtLocked(distance float64, at time.Time) { - q.layerDistance.AddSampleAt(distance, at) + q.UpdatePauseAt(isPaused, time.Now()) } func (q *qualityScorer) AddLayerTransitionAt(distance float64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addLayerTransitionAtLocked(distance, at) + q.layerDistance.AddSampleAt(distance, at) } func (q *qualityScorer) AddLayerTransition(distance float64) { + q.AddLayerTransitionAt(distance, time.Now()) +} + +func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addLayerTransitionAtLocked(distance, time.Now()) -} - -func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { // always update transitions expectedBits, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at) if err != nil { @@ -403,7 +366,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { return } - plw := q.getPacketLossWeight(stat) + aplw := q.getAdjustedPacketLossWeight(stat) reason := "none" var score, packetScore, bitrateScore, layerScore float64 if stat.packets+stat.packetsPadding == 0 { @@ -415,7 +378,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { score = qualityTransitionScore[livekit.ConnectionQuality_POOR] } } else { - packetScore = stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) + packetScore = stat.calculatePacketScore(aplw, q.params.IncludeRTT, q.params.IncludeJitter) bitrateScore = stat.calculateBitrateScore(expectedBits, q.params.EnableBitrateScore) layerScore = math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*cDistanceWeight)), 0.0) @@ -462,7 +425,8 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { "bitrateScore", bitrateScore, "quality", currCQ, "stat", stat, - "packetLossWeight", plw, + "packetLossWeight", q.packetLossWeight, + "adjustedPacketLossWeight", aplw, "modePPS", q.ppsMode*int(cPPSQuantization), "expectedBits", expectedBits, "expectedDistance", expectedDistance, @@ -484,18 +448,8 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { q.lastUpdateAt = at } -func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateAtLocked(stat, at) -} - func (q *qualityScorer) Update(stat *windowStat) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateAtLocked(stat, time.Now()) + q.UpdateAt(stat, time.Now()) } func (q *qualityScorer) isMuted() bool { @@ -535,9 +489,9 @@ func (q *qualityScorer) isPaused() bool { return !q.pausedAt.IsZero() && (q.resumedAt.IsZero() || q.pausedAt.After(q.resumedAt)) } -func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { +func (q *qualityScorer) getAdjustedPacketLossWeight(stat *windowStat) float64 { if stat == nil || stat.duration <= 0 { - return q.params.PacketLossWeight + return q.packetLossWeight } // packet loss is weighted by comparing against mode of packet rate seen. @@ -569,14 +523,14 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { } if q.ppsMode == 0 || q.ppsMode == len(q.ppsHistogram)-1 { - return q.params.PacketLossWeight + return q.packetLossWeight } packetRatio := pps / (float64(q.ppsMode) * cPPSQuantization) if packetRatio > 1.0 { packetRatio = 1.0 } - return math.Sqrt(packetRatio) * q.params.PacketLossWeight + return math.Sqrt(packetRatio) * q.packetLossWeight } func (q *qualityScorer) GetScoreAndQuality() (float32, livekit.ConnectionQuality) { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index ce2f0a921..2fc1ac990 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -58,7 +58,6 @@ type TrackSender interface { // ID is the globally unique identifier for this Track. ID() string SubscriberID() livekit.ParticipantID - TrackInfoAvailable() HandleRTCPSenderReportData( payloadType webrtc.PayloadType, isSVC bool, @@ -382,8 +381,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId() d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ - MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change - IsFECEnabled: strings.Contains(strings.ToLower(codecs[0].SDPFmtpLine), "fec"), SenderProvider: d, Logger: d.params.Logger.WithValues("direction", "down"), }) @@ -480,11 +477,14 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, return } - if strings.EqualFold(matchedUpstreamCodec.MimeType, "audio/red") { + isFECEnabled := false + if strings.EqualFold(matchedUpstreamCodec.MimeType, MimeTypeAudioRed) { d.isRED = true for _, c := range d.upstreamCodecs { + isFECEnabled = strings.Contains(strings.ToLower(c.SDPFmtpLine), "useinbandfec=1") + // assume upstream primary codec is opus since we only support it for audio now - if strings.EqualFold(c.MimeType, "audio/opus") { + if strings.EqualFold(c.MimeType, webrtc.MimeTypeOpus) { d.upstreamPrimaryPT = uint8(c.PayloadType) break } @@ -498,12 +498,15 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.params.Logger.Errorw("failed to parse primary and secondary payload type for RED", err, "matchedCodec", codec) } d.primaryPT = uint8(primaryPT) + } else if strings.HasPrefix(strings.ToLower(matchedUpstreamCodec.MimeType), "audio/") { + isFECEnabled = strings.Contains(strings.ToLower(matchedUpstreamCodec.SDPFmtpLine), "fec") } logFields := []interface{}{ "codecs", d.upstreamCodecs, "matchCodec", codec, "ssrc", t.SSRC(), + "isFECEnabled", isFECEnabled, } if d.isRED { logFields = append(logFields, @@ -537,6 +540,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.bindLock.Unlock() d.forwarder.DetermineCodec(codec.RTPCodecCapability, d.params.Receiver.HeaderExtensions()) + d.connectionStats.Start(codec.MimeType, isFECEnabled) d.params.Logger.Debugw("downtrack bound") } @@ -593,14 +597,6 @@ func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error { return nil } -func (d *DownTrack) TrackInfoAvailable() { - ti := d.params.Receiver.TrackInfo() - if ti == nil { - return - } - d.connectionStats.Start(ti) -} - func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) { d.streamAllocatorLock.Lock() d.streamAllocatorListener = listener @@ -1534,13 +1530,13 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan var getBlankFrame func(bool) ([]byte, error) switch d.mime { - case "audio/opus": + case strings.ToLower(webrtc.MimeTypeOpus): getBlankFrame = d.getOpusBlankFrame - case "audio/red": + case strings.ToLower(MimeTypeAudioRed): getBlankFrame = d.getOpusRedBlankFrame - case "video/vp8": + case strings.ToLower(webrtc.MimeTypeVP8): getBlankFrame = d.getVP8BlankFrame - case "video/h264": + case strings.ToLower(webrtc.MimeTypeH264): getBlankFrame = d.getH264BlankFrame default: close(done) @@ -1548,7 +1544,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan } frameRate := uint32(30) - if d.mime == "audio/opus" || d.mime == "audio/red" { + if d.mime == strings.ToLower(webrtc.MimeTypeOpus) || d.mime == strings.ToLower(MimeTypeAudioRed) { frameRate = 50 } @@ -2121,7 +2117,7 @@ func (d *DownTrack) sendPaddingOnMute() { if d.kind == webrtc.RTPCodecTypeVideo { d.sendPaddingOnMuteForVideo() - } else if d.mime == "audio/opus" { + } else if d.mime == strings.ToLower(webrtc.MimeTypeOpus) { d.sendSilentFrameOnMuteForOpus() } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 4c410ca22..c7abc8ed2 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -219,8 +219,6 @@ func NewWebRTCReceiver( }) w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ - MimeType: w.codec.MimeType, - IsFECEnabled: strings.EqualFold(w.codec.MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "fec"), ReceiverProvider: w, Logger: w.logger.WithValues("direction", "up"), }) @@ -229,7 +227,11 @@ func NewWebRTCReceiver( w.onStatsUpdate(w, stat) } }) - w.connectionStats.Start(trackInfo) + w.connectionStats.Start( + w.codec.MimeType, + // TODO: technically not correct to declare FEC on when RED. Need the primary codec's fmtp line to check. + strings.EqualFold(w.codec.MimeType, MimeTypeAudioRed) || strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "useinbandfec=1"), + ) w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) w.streamTrackerManager.SetListener(w) @@ -417,7 +419,6 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { w.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } - track.TrackInfoAvailable() track.UpTrackMaxPublishedLayerChange(w.streamTrackerManager.GetMaxPublishedLayer()) track.UpTrackMaxTemporalLayerSeenChange(w.streamTrackerManager.GetMaxTemporalLayerSeen()) diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index e04d56f82..c6ded2a0e 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -107,8 +107,6 @@ func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error { r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } - track.TrackInfoAvailable() - r.downTrackSpreader.Store(track) r.logger.Debugw("red primary receiver downtrack added", "subscriberID", track.SubscriberID()) return nil diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 4af4fb5f5..81121fb2e 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -98,8 +98,6 @@ func (r *RedReceiver) AddDownTrack(track TrackSender) error { r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } - track.TrackInfoAvailable() - r.downTrackSpreader.Store(track) r.logger.Debugw("red receiver downtrack added", "subscriberID", track.SubscriberID()) return nil