Set FEC enabled properly in connection stats module. (#3098)

* Set FEC enabled properly in connection stats module.

With RED, the FEC indication is in primary codec.

Also, clean up some bits that were not necessary (TrackInfoAvailable is
not needed)

TODO: There are still a couple of things to figure out
- If codec is RED, Opus is added as second codec synthetically using
  33098337fc/pkg/rtc/mediaengine.go (L31)
  which hard codecs FEC enabled. Ideally, we should get the primary
  codec parameters from SDP offer.
- The WebRTCReceiver does not have information about primary codec. For
  now, just setting FEC to true when RED is enabled. It is okay as it
  just affects when we declare quality drops, but ideally the primary
  codec should be retrieved from SDP offer.

* clean up and comment

* full prop check
This commit is contained in:
Raja Subramanian
2024-10-15 17:39:42 +05:30
committed by GitHub
parent 33098337fc
commit 8b604df32a
9 changed files with 109 additions and 180 deletions

View File

@@ -2750,31 +2750,6 @@ func (p *ParticipantImpl) SupportsTransceiverReuse() bool {
return p.ProtocolVersion().SupportsTransceiverReuse() && !p.SupportsSyncStreamID()
}
func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) {
s := &sdp.SessionDescription{
MediaDescriptions: []*sdp.MediaDescription{m},
}
for _, payloadStr := range m.MediaName.Formats {
payloadType, err := strconv.ParseUint(payloadStr, 10, 8)
if err != nil {
return nil, err
}
codec, err := s.GetCodecForPayloadType(uint8(payloadType))
if err != nil {
if payloadType == 0 {
continue
}
return nil, err
}
out = append(out, codec)
}
return out, nil
}
func (p *ParticipantImpl) SendDataPacket(kind livekit.DataPacket_Kind, encoded []byte) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
@@ -2928,3 +2903,30 @@ func (p *ParticipantImpl) HandleMetrics(senderParticipantID livekit.ParticipantI
return p.TransportManager.SendDataPacket(livekit.DataPacket_RELIABLE, dpData)
}
// ----------------------------------------------
func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) {
s := &sdp.SessionDescription{
MediaDescriptions: []*sdp.MediaDescription{m},
}
for _, payloadStr := range m.MediaName.Formats {
payloadType, err := strconv.ParseUint(payloadStr, 10, 8)
if err != nil {
return nil, err
}
codec, err := s.GetCodecForPayloadType(uint8(payloadType))
if err != nil {
if payloadType == 0 {
continue
}
return nil, err
}
out = append(out, codec)
}
return out, nil
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -126,9 +127,7 @@ func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) boo
}
func (r *WrappedReceiver) Codecs() []webrtc.RTPCodecParameters {
codecs := make([]webrtc.RTPCodecParameters, len(r.codecs))
copy(codecs, r.codecs)
return codecs
return slices.Clone(r.codecs)
}
func (r *WrappedReceiver) DeleteDownTrack(participantID livekit.ParticipantID) {

View File

@@ -49,8 +49,6 @@ type ConnectionStatsSenderProvider interface {
type ConnectionStatsParams struct {
UpdateInterval time.Duration
MimeType string
IsFECEnabled bool
IncludeRTT bool
IncludeJitter bool
EnableBitrateScore bool
@@ -62,6 +60,8 @@ type ConnectionStatsParams struct {
type ConnectionStats struct {
params ConnectionStatsParams
codecMimeType atomic.String
isStarted atomic.Bool
isVideo atomic.Bool
@@ -80,7 +80,6 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats {
return &ConnectionStats{
params: params,
scorer: newQualityScorer(qualityScorerParams{
PacketLossWeight: getPacketLossWeight(params.MimeType, params.IsFECEnabled), // LK-TODO: have to notify codec change?
IncludeRTT: params.IncludeRTT,
IncludeJitter: params.IncludeJitter,
EnableBitrateScore: params.EnableBitrateScore,
@@ -89,27 +88,20 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats {
}
}
func (cs *ConnectionStats) start(trackInfo *livekit.TrackInfo) {
cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO)
func (cs *ConnectionStats) StartAt(codecMimeType string, isFECEnabled bool, at time.Time) {
if cs.isStarted.Swap(true) {
return
}
cs.isVideo.Store(strings.HasPrefix(strings.ToLower(codecMimeType), "video/"))
cs.codecMimeType.Store(codecMimeType)
cs.scorer.StartAt(getPacketLossWeight(codecMimeType, isFECEnabled), at)
go cs.updateStatsWorker()
}
func (cs *ConnectionStats) StartAt(trackInfo *livekit.TrackInfo, at time.Time) {
if cs.isStarted.Swap(true) {
return
}
cs.scorer.StartAt(at)
cs.start(trackInfo)
}
func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo) {
if cs.isStarted.Swap(true) {
return
}
cs.scorer.Start()
cs.start(trackInfo)
func (cs *ConnectionStats) Start(codecMimeType string, isFECEnabled bool) {
cs.StartAt(codecMimeType, isFECEnabled, time.Now())
}
func (cs *ConnectionStats) Close() {
@@ -348,7 +340,7 @@ func (cs *ConnectionStats) getStat() {
cs.onStatsUpdate(cs, &livekit.AnalyticsStat{
Score: score,
Streams: analyticsStreams,
Mime: cs.params.MimeType,
Mime: cs.codecMimeType.Load(),
})
}
}

View File

@@ -19,6 +19,7 @@ import (
"testing"
"time"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -60,8 +61,6 @@ func TestConnectionQuality(t *testing.T) {
trp := newTestReceiverProvider()
t.Run("quality scorer operation", func(t *testing.T) {
cs := NewConnectionStats(ConnectionStatsParams{
MimeType: "audio/opus",
IsFECEnabled: false,
IncludeRTT: true,
IncludeJitter: true,
EnableBitrateScore: true,
@@ -71,7 +70,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration))
cs.UpdateMuteAt(false, now.Add(-1*time.Second))
// no data and not enough unmute time should return default state which is EXCELLENT quality
@@ -477,8 +476,6 @@ func TestConnectionQuality(t *testing.T) {
t.Run("quality scorer dependent rtt", func(t *testing.T) {
cs := NewConnectionStats(ConnectionStatsParams{
MimeType: "audio/opus",
IsFECEnabled: false,
IncludeRTT: false,
IncludeJitter: true,
ReceiverProvider: trp,
@@ -487,7 +484,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration))
cs.UpdateMuteAt(false, now.Add(-1*time.Second))
// RTT does not knock quality down because it is dependent and hence not taken into account
@@ -512,8 +509,6 @@ func TestConnectionQuality(t *testing.T) {
t.Run("quality scorer dependent jitter", func(t *testing.T) {
cs := NewConnectionStats(ConnectionStatsParams{
MimeType: "audio/opus",
IsFECEnabled: false,
IncludeRTT: true,
IncludeJitter: false,
ReceiverProvider: trp,
@@ -522,7 +517,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration))
cs.UpdateMuteAt(false, now.Add(-1*time.Second))
// Jitter does not knock quality down because it is dependent and hence not taken into account
@@ -684,8 +679,6 @@ func TestConnectionQuality(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := NewConnectionStats(ConnectionStatsParams{
MimeType: tc.mimeType,
IsFECEnabled: tc.isFECEnabled,
IncludeRTT: true,
IncludeJitter: true,
ReceiverProvider: trp,
@@ -694,7 +687,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
cs.StartAt(tc.mimeType, tc.isFECEnabled, now.Add(-duration))
for _, eq := range tc.expectedQualities {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
@@ -784,8 +777,6 @@ func TestConnectionQuality(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := NewConnectionStats(ConnectionStatsParams{
MimeType: "video/vp8",
IsFECEnabled: false,
IncludeRTT: true,
IncludeJitter: true,
EnableBitrateScore: true,
@@ -795,7 +786,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now)
cs.StartAt(webrtc.MimeTypeVP8, false, now)
for _, tr := range tc.transitions {
cs.AddBitrateTransitionAt(tr.bitrate, now.Add(tr.offset))
@@ -879,8 +870,6 @@ func TestConnectionQuality(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := NewConnectionStats(ConnectionStatsParams{
MimeType: "video/vp8",
IsFECEnabled: false,
IncludeRTT: true,
IncludeJitter: true,
ReceiverProvider: trp,
@@ -889,7 +878,7 @@ func TestConnectionQuality(t *testing.T) {
duration := 5 * time.Second
now := time.Now()
cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now)
cs.StartAt(webrtc.MimeTypeVP8, false, now)
for _, tr := range tc.transitions {
cs.AddLayerTransitionAt(tr.distance, now.Add(tr.offset))

View File

@@ -69,7 +69,7 @@ type windowStat struct {
lastRTCPAt time.Time
}
func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJitter bool) float64 {
func (w *windowStat) calculatePacketScore(aplw float64, includeRTT bool, includeJitter bool) float64 {
// this is based on simplified E-model based on packet loss, rtt, jitter as
// outlined at https://www.pingman.com/kb/article/how-is-mos-calculated-in-pingplotter-pro-50.html.
effectiveDelay := 0.0
@@ -119,7 +119,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ
if w.packets+w.packetsPadding > 0 {
lossEffect = float64(actualLost) * 100.0 / float64(w.packets+w.packetsPadding)
}
lossEffect *= plw
lossEffect *= aplw
score := cMaxScore - delayEffect - lossEffect
if score < 0.0 {
@@ -190,7 +190,6 @@ func (w *windowStat) MarshalLogObject(e zapcore.ObjectEncoder) error {
// ------------------------------------------
type qualityScorerParams struct {
PacketLossWeight float64
IncludeRTT bool
IncludeJitter bool
EnableBitrateScore bool
@@ -203,6 +202,8 @@ type qualityScorer struct {
lock sync.RWMutex
lastUpdateAt time.Time
packetLossWeight float64
score float64
stat windowStat
@@ -238,25 +239,22 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer {
}
}
func (q *qualityScorer) startAtLocked(at time.Time) {
func (q *qualityScorer) StartAt(packetLossWeight float64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.packetLossWeight = packetLossWeight
q.lastUpdateAt = at
}
func (q *qualityScorer) StartAt(at time.Time) {
func (q *qualityScorer) Start(packetLossWeight float64) {
q.StartAt(packetLossWeight, time.Now())
}
func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.startAtLocked(at)
}
func (q *qualityScorer) Start() {
q.lock.Lock()
defer q.lock.Unlock()
q.startAtLocked(time.Now())
}
func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) {
if isMuted {
q.mutedAt = at
// muting when LOST should not push quality to EXCELLENT
@@ -268,39 +266,25 @@ func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) {
}
}
func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateMuteAtLocked(isMuted, at)
}
func (q *qualityScorer) UpdateMute(isMuted bool) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateMuteAtLocked(isMuted, time.Now())
}
func (q *qualityScorer) addBitrateTransitionAtLocked(bitrate int64, at time.Time) {
q.aggregateBitrate.AddSampleAt(bitrate, at)
q.UpdateMuteAt(isMuted, time.Now())
}
func (q *qualityScorer) AddBitrateTransitionAt(bitrate int64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.addBitrateTransitionAtLocked(bitrate, at)
q.aggregateBitrate.AddSampleAt(bitrate, at)
}
func (q *qualityScorer) AddBitrateTransition(bitrate int64) {
q.AddBitrateTransitionAt(bitrate, time.Now())
}
func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.addBitrateTransitionAtLocked(bitrate, time.Now())
}
func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) {
if isMuted {
if !q.isLayerMuted() {
q.aggregateBitrate.Reset()
@@ -316,21 +300,14 @@ func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) {
}
}
func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateLayerMuteAtLocked(isMuted, at)
}
func (q *qualityScorer) UpdateLayerMute(isMuted bool) {
q.UpdateLayerMuteAt(isMuted, time.Now())
}
func (q *qualityScorer) UpdatePauseAt(isPaused bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateLayerMuteAtLocked(isMuted, time.Now())
}
func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) {
if isPaused {
if !q.isPaused() {
q.aggregateBitrate.Reset()
@@ -346,39 +323,25 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) {
}
}
func (q *qualityScorer) UpdatePauseAt(isPaused bool, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updatePauseAtLocked(isPaused, at)
}
func (q *qualityScorer) UpdatePause(isPaused bool) {
q.lock.Lock()
defer q.lock.Unlock()
q.updatePauseAtLocked(isPaused, time.Now())
}
func (q *qualityScorer) addLayerTransitionAtLocked(distance float64, at time.Time) {
q.layerDistance.AddSampleAt(distance, at)
q.UpdatePauseAt(isPaused, time.Now())
}
func (q *qualityScorer) AddLayerTransitionAt(distance float64, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.addLayerTransitionAtLocked(distance, at)
q.layerDistance.AddSampleAt(distance, at)
}
func (q *qualityScorer) AddLayerTransition(distance float64) {
q.AddLayerTransitionAt(distance, time.Now())
}
func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.addLayerTransitionAtLocked(distance, time.Now())
}
func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
// always update transitions
expectedBits, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at)
if err != nil {
@@ -403,7 +366,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
return
}
plw := q.getPacketLossWeight(stat)
aplw := q.getAdjustedPacketLossWeight(stat)
reason := "none"
var score, packetScore, bitrateScore, layerScore float64
if stat.packets+stat.packetsPadding == 0 {
@@ -415,7 +378,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
score = qualityTransitionScore[livekit.ConnectionQuality_POOR]
}
} else {
packetScore = stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter)
packetScore = stat.calculatePacketScore(aplw, q.params.IncludeRTT, q.params.IncludeJitter)
bitrateScore = stat.calculateBitrateScore(expectedBits, q.params.EnableBitrateScore)
layerScore = math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*cDistanceWeight)), 0.0)
@@ -462,7 +425,8 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
"bitrateScore", bitrateScore,
"quality", currCQ,
"stat", stat,
"packetLossWeight", plw,
"packetLossWeight", q.packetLossWeight,
"adjustedPacketLossWeight", aplw,
"modePPS", q.ppsMode*int(cPPSQuantization),
"expectedBits", expectedBits,
"expectedDistance", expectedDistance,
@@ -484,18 +448,8 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) {
q.lastUpdateAt = at
}
func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateAtLocked(stat, at)
}
func (q *qualityScorer) Update(stat *windowStat) {
q.lock.Lock()
defer q.lock.Unlock()
q.updateAtLocked(stat, time.Now())
q.UpdateAt(stat, time.Now())
}
func (q *qualityScorer) isMuted() bool {
@@ -535,9 +489,9 @@ func (q *qualityScorer) isPaused() bool {
return !q.pausedAt.IsZero() && (q.resumedAt.IsZero() || q.pausedAt.After(q.resumedAt))
}
func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 {
func (q *qualityScorer) getAdjustedPacketLossWeight(stat *windowStat) float64 {
if stat == nil || stat.duration <= 0 {
return q.params.PacketLossWeight
return q.packetLossWeight
}
// packet loss is weighted by comparing against mode of packet rate seen.
@@ -569,14 +523,14 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 {
}
if q.ppsMode == 0 || q.ppsMode == len(q.ppsHistogram)-1 {
return q.params.PacketLossWeight
return q.packetLossWeight
}
packetRatio := pps / (float64(q.ppsMode) * cPPSQuantization)
if packetRatio > 1.0 {
packetRatio = 1.0
}
return math.Sqrt(packetRatio) * q.params.PacketLossWeight
return math.Sqrt(packetRatio) * q.packetLossWeight
}
func (q *qualityScorer) GetScoreAndQuality() (float32, livekit.ConnectionQuality) {

View File

@@ -58,7 +58,6 @@ type TrackSender interface {
// ID is the globally unique identifier for this Track.
ID() string
SubscriberID() livekit.ParticipantID
TrackInfoAvailable()
HandleRTCPSenderReportData(
payloadType webrtc.PayloadType,
isSVC bool,
@@ -382,8 +381,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId()
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change
IsFECEnabled: strings.Contains(strings.ToLower(codecs[0].SDPFmtpLine), "fec"),
SenderProvider: d,
Logger: d.params.Logger.WithValues("direction", "down"),
})
@@ -480,11 +477,14 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
return
}
if strings.EqualFold(matchedUpstreamCodec.MimeType, "audio/red") {
isFECEnabled := false
if strings.EqualFold(matchedUpstreamCodec.MimeType, MimeTypeAudioRed) {
d.isRED = true
for _, c := range d.upstreamCodecs {
isFECEnabled = strings.Contains(strings.ToLower(c.SDPFmtpLine), "useinbandfec=1")
// assume upstream primary codec is opus since we only support it for audio now
if strings.EqualFold(c.MimeType, "audio/opus") {
if strings.EqualFold(c.MimeType, webrtc.MimeTypeOpus) {
d.upstreamPrimaryPT = uint8(c.PayloadType)
break
}
@@ -498,12 +498,15 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.params.Logger.Errorw("failed to parse primary and secondary payload type for RED", err, "matchedCodec", codec)
}
d.primaryPT = uint8(primaryPT)
} else if strings.HasPrefix(strings.ToLower(matchedUpstreamCodec.MimeType), "audio/") {
isFECEnabled = strings.Contains(strings.ToLower(matchedUpstreamCodec.SDPFmtpLine), "fec")
}
logFields := []interface{}{
"codecs", d.upstreamCodecs,
"matchCodec", codec,
"ssrc", t.SSRC(),
"isFECEnabled", isFECEnabled,
}
if d.isRED {
logFields = append(logFields,
@@ -537,6 +540,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.bindLock.Unlock()
d.forwarder.DetermineCodec(codec.RTPCodecCapability, d.params.Receiver.HeaderExtensions())
d.connectionStats.Start(codec.MimeType, isFECEnabled)
d.params.Logger.Debugw("downtrack bound")
}
@@ -593,14 +597,6 @@ func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error {
return nil
}
func (d *DownTrack) TrackInfoAvailable() {
ti := d.params.Receiver.TrackInfo()
if ti == nil {
return
}
d.connectionStats.Start(ti)
}
func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) {
d.streamAllocatorLock.Lock()
d.streamAllocatorListener = listener
@@ -1534,13 +1530,13 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
var getBlankFrame func(bool) ([]byte, error)
switch d.mime {
case "audio/opus":
case strings.ToLower(webrtc.MimeTypeOpus):
getBlankFrame = d.getOpusBlankFrame
case "audio/red":
case strings.ToLower(MimeTypeAudioRed):
getBlankFrame = d.getOpusRedBlankFrame
case "video/vp8":
case strings.ToLower(webrtc.MimeTypeVP8):
getBlankFrame = d.getVP8BlankFrame
case "video/h264":
case strings.ToLower(webrtc.MimeTypeH264):
getBlankFrame = d.getH264BlankFrame
default:
close(done)
@@ -1548,7 +1544,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
}
frameRate := uint32(30)
if d.mime == "audio/opus" || d.mime == "audio/red" {
if d.mime == strings.ToLower(webrtc.MimeTypeOpus) || d.mime == strings.ToLower(MimeTypeAudioRed) {
frameRate = 50
}
@@ -2121,7 +2117,7 @@ func (d *DownTrack) sendPaddingOnMute() {
if d.kind == webrtc.RTPCodecTypeVideo {
d.sendPaddingOnMuteForVideo()
} else if d.mime == "audio/opus" {
} else if d.mime == strings.ToLower(webrtc.MimeTypeOpus) {
d.sendSilentFrameOnMuteForOpus()
}
}

View File

@@ -219,8 +219,6 @@ func NewWebRTCReceiver(
})
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
MimeType: w.codec.MimeType,
IsFECEnabled: strings.EqualFold(w.codec.MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "fec"),
ReceiverProvider: w,
Logger: w.logger.WithValues("direction", "up"),
})
@@ -229,7 +227,11 @@ func NewWebRTCReceiver(
w.onStatsUpdate(w, stat)
}
})
w.connectionStats.Start(trackInfo)
w.connectionStats.Start(
w.codec.MimeType,
// TODO: technically not correct to declare FEC on when RED. Need the primary codec's fmtp line to check.
strings.EqualFold(w.codec.MimeType, MimeTypeAudioRed) || strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "useinbandfec=1"),
)
w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig)
w.streamTrackerManager.SetListener(w)
@@ -417,7 +419,6 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
w.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID())
}
track.TrackInfoAvailable()
track.UpTrackMaxPublishedLayerChange(w.streamTrackerManager.GetMaxPublishedLayer())
track.UpTrackMaxTemporalLayerSeenChange(w.streamTrackerManager.GetMaxTemporalLayerSeen())

View File

@@ -107,8 +107,6 @@ func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error {
r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID())
}
track.TrackInfoAvailable()
r.downTrackSpreader.Store(track)
r.logger.Debugw("red primary receiver downtrack added", "subscriberID", track.SubscriberID())
return nil

View File

@@ -98,8 +98,6 @@ func (r *RedReceiver) AddDownTrack(track TrackSender) error {
r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID())
}
track.TrackInfoAvailable()
r.downTrackSpreader.Store(track)
r.logger.Debugw("red receiver downtrack added", "subscriberID", track.SubscriberID())
return nil