diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index b4f29cc65..d786a2185 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2750,31 +2750,6 @@ func (p *ParticipantImpl) SupportsTransceiverReuse() bool { return p.ProtocolVersion().SupportsTransceiverReuse() && !p.SupportsSyncStreamID() } -func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) { - s := &sdp.SessionDescription{ - MediaDescriptions: []*sdp.MediaDescription{m}, - } - - for _, payloadStr := range m.MediaName.Formats { - payloadType, err := strconv.ParseUint(payloadStr, 10, 8) - if err != nil { - return nil, err - } - - codec, err := s.GetCodecForPayloadType(uint8(payloadType)) - if err != nil { - if payloadType == 0 { - continue - } - return nil, err - } - - out = append(out, codec) - } - - return out, nil -} - func (p *ParticipantImpl) SendDataPacket(kind livekit.DataPacket_Kind, encoded []byte) error { if p.State() != livekit.ParticipantInfo_ACTIVE { return ErrDataChannelUnavailable @@ -2928,3 +2903,30 @@ func (p *ParticipantImpl) HandleMetrics(senderParticipantID livekit.ParticipantI return p.TransportManager.SendDataPacket(livekit.DataPacket_RELIABLE, dpData) } + +// ---------------------------------------------- + +func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) { + s := &sdp.SessionDescription{ + MediaDescriptions: []*sdp.MediaDescription{m}, + } + + for _, payloadStr := range m.MediaName.Formats { + payloadType, err := strconv.ParseUint(payloadStr, 10, 8) + if err != nil { + return nil, err + } + + codec, err := s.GetCodecForPayloadType(uint8(payloadType)) + if err != nil { + if payloadType == 0 { + continue + } + return nil, err + } + + out = append(out, codec) + } + + return out, nil +} diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index cfde44114..6db1a7d80 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -22,6 +22,7 @@ import ( "github.com/pion/webrtc/v3" "go.uber.org/atomic" + "golang.org/x/exp/slices" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -126,9 +127,7 @@ func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) boo } func (r *WrappedReceiver) Codecs() []webrtc.RTPCodecParameters { - codecs := make([]webrtc.RTPCodecParameters, len(r.codecs)) - copy(codecs, r.codecs) - return codecs + return slices.Clone(r.codecs) } func (r *WrappedReceiver) DeleteDownTrack(participantID livekit.ParticipantID) { diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index d2dba70d8..aab576451 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -49,8 +49,6 @@ type ConnectionStatsSenderProvider interface { type ConnectionStatsParams struct { UpdateInterval time.Duration - MimeType string - IsFECEnabled bool IncludeRTT bool IncludeJitter bool EnableBitrateScore bool @@ -62,6 +60,8 @@ type ConnectionStatsParams struct { type ConnectionStats struct { params ConnectionStatsParams + codecMimeType atomic.String + isStarted atomic.Bool isVideo atomic.Bool @@ -80,7 +80,6 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { return &ConnectionStats{ params: params, scorer: newQualityScorer(qualityScorerParams{ - PacketLossWeight: getPacketLossWeight(params.MimeType, params.IsFECEnabled), // LK-TODO: have to notify codec change? IncludeRTT: params.IncludeRTT, IncludeJitter: params.IncludeJitter, EnableBitrateScore: params.EnableBitrateScore, @@ -89,27 +88,20 @@ func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { } } -func (cs *ConnectionStats) start(trackInfo *livekit.TrackInfo) { - cs.isVideo.Store(trackInfo.Type == livekit.TrackType_VIDEO) +func (cs *ConnectionStats) StartAt(codecMimeType string, isFECEnabled bool, at time.Time) { + if cs.isStarted.Swap(true) { + return + } + + cs.isVideo.Store(strings.HasPrefix(strings.ToLower(codecMimeType), "video/")) + cs.codecMimeType.Store(codecMimeType) + cs.scorer.StartAt(getPacketLossWeight(codecMimeType, isFECEnabled), at) + go cs.updateStatsWorker() } -func (cs *ConnectionStats) StartAt(trackInfo *livekit.TrackInfo, at time.Time) { - if cs.isStarted.Swap(true) { - return - } - - cs.scorer.StartAt(at) - cs.start(trackInfo) -} - -func (cs *ConnectionStats) Start(trackInfo *livekit.TrackInfo) { - if cs.isStarted.Swap(true) { - return - } - - cs.scorer.Start() - cs.start(trackInfo) +func (cs *ConnectionStats) Start(codecMimeType string, isFECEnabled bool) { + cs.StartAt(codecMimeType, isFECEnabled, time.Now()) } func (cs *ConnectionStats) Close() { @@ -348,7 +340,7 @@ func (cs *ConnectionStats) getStat() { cs.onStatsUpdate(cs, &livekit.AnalyticsStat{ Score: score, Streams: analyticsStreams, - Mime: cs.params.MimeType, + Mime: cs.codecMimeType.Load(), }) } } diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 90a4ac015..fa95fbdca 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -60,8 +61,6 @@ func TestConnectionQuality(t *testing.T) { trp := newTestReceiverProvider() t.Run("quality scorer operation", func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "audio/opus", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: true, EnableBitrateScore: true, @@ -71,7 +70,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration)) cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // no data and not enough unmute time should return default state which is EXCELLENT quality @@ -477,8 +476,6 @@ func TestConnectionQuality(t *testing.T) { t.Run("quality scorer dependent rtt", func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "audio/opus", - IsFECEnabled: false, IncludeRTT: false, IncludeJitter: true, ReceiverProvider: trp, @@ -487,7 +484,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration)) cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // RTT does not knock quality down because it is dependent and hence not taken into account @@ -512,8 +509,6 @@ func TestConnectionQuality(t *testing.T) { t.Run("quality scorer dependent jitter", func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "audio/opus", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: false, ReceiverProvider: trp, @@ -522,7 +517,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(webrtc.MimeTypeOpus, false, now.Add(-duration)) cs.UpdateMuteAt(false, now.Add(-1*time.Second)) // Jitter does not knock quality down because it is dependent and hence not taken into account @@ -684,8 +679,6 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: tc.mimeType, - IsFECEnabled: tc.isFECEnabled, IncludeRTT: true, IncludeJitter: true, ReceiverProvider: trp, @@ -694,7 +687,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration)) + cs.StartAt(tc.mimeType, tc.isFECEnabled, now.Add(-duration)) for _, eq := range tc.expectedQualities { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ @@ -784,8 +777,6 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "video/vp8", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: true, EnableBitrateScore: true, @@ -795,7 +786,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) + cs.StartAt(webrtc.MimeTypeVP8, false, now) for _, tr := range tc.transitions { cs.AddBitrateTransitionAt(tr.bitrate, now.Add(tr.offset)) @@ -879,8 +870,6 @@ func TestConnectionQuality(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cs := NewConnectionStats(ConnectionStatsParams{ - MimeType: "video/vp8", - IsFECEnabled: false, IncludeRTT: true, IncludeJitter: true, ReceiverProvider: trp, @@ -889,7 +878,7 @@ func TestConnectionQuality(t *testing.T) { duration := 5 * time.Second now := time.Now() - cs.StartAt(&livekit.TrackInfo{Type: livekit.TrackType_VIDEO}, now) + cs.StartAt(webrtc.MimeTypeVP8, false, now) for _, tr := range tc.transitions { cs.AddLayerTransitionAt(tr.distance, now.Add(tr.offset)) diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index f911e6d53..4e834fc94 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -69,7 +69,7 @@ type windowStat struct { lastRTCPAt time.Time } -func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJitter bool) float64 { +func (w *windowStat) calculatePacketScore(aplw float64, includeRTT bool, includeJitter bool) float64 { // this is based on simplified E-model based on packet loss, rtt, jitter as // outlined at https://www.pingman.com/kb/article/how-is-mos-calculated-in-pingplotter-pro-50.html. effectiveDelay := 0.0 @@ -119,7 +119,7 @@ func (w *windowStat) calculatePacketScore(plw float64, includeRTT bool, includeJ if w.packets+w.packetsPadding > 0 { lossEffect = float64(actualLost) * 100.0 / float64(w.packets+w.packetsPadding) } - lossEffect *= plw + lossEffect *= aplw score := cMaxScore - delayEffect - lossEffect if score < 0.0 { @@ -190,7 +190,6 @@ func (w *windowStat) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------ type qualityScorerParams struct { - PacketLossWeight float64 IncludeRTT bool IncludeJitter bool EnableBitrateScore bool @@ -203,6 +202,8 @@ type qualityScorer struct { lock sync.RWMutex lastUpdateAt time.Time + packetLossWeight float64 + score float64 stat windowStat @@ -238,25 +239,22 @@ func newQualityScorer(params qualityScorerParams) *qualityScorer { } } -func (q *qualityScorer) startAtLocked(at time.Time) { +func (q *qualityScorer) StartAt(packetLossWeight float64, at time.Time) { + q.lock.Lock() + defer q.lock.Unlock() + + q.packetLossWeight = packetLossWeight q.lastUpdateAt = at } -func (q *qualityScorer) StartAt(at time.Time) { +func (q *qualityScorer) Start(packetLossWeight float64) { + q.StartAt(packetLossWeight, time.Now()) +} + +func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.startAtLocked(at) -} - -func (q *qualityScorer) Start() { - q.lock.Lock() - defer q.lock.Unlock() - - q.startAtLocked(time.Now()) -} - -func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { if isMuted { q.mutedAt = at // muting when LOST should not push quality to EXCELLENT @@ -268,39 +266,25 @@ func (q *qualityScorer) updateMuteAtLocked(isMuted bool, at time.Time) { } } -func (q *qualityScorer) UpdateMuteAt(isMuted bool, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateMuteAtLocked(isMuted, at) -} - func (q *qualityScorer) UpdateMute(isMuted bool) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateMuteAtLocked(isMuted, time.Now()) -} - -func (q *qualityScorer) addBitrateTransitionAtLocked(bitrate int64, at time.Time) { - q.aggregateBitrate.AddSampleAt(bitrate, at) + q.UpdateMuteAt(isMuted, time.Now()) } func (q *qualityScorer) AddBitrateTransitionAt(bitrate int64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addBitrateTransitionAtLocked(bitrate, at) + q.aggregateBitrate.AddSampleAt(bitrate, at) } func (q *qualityScorer) AddBitrateTransition(bitrate int64) { + q.AddBitrateTransitionAt(bitrate, time.Now()) +} + +func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addBitrateTransitionAtLocked(bitrate, time.Now()) -} - -func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { if isMuted { if !q.isLayerMuted() { q.aggregateBitrate.Reset() @@ -316,21 +300,14 @@ func (q *qualityScorer) updateLayerMuteAtLocked(isMuted bool, at time.Time) { } } -func (q *qualityScorer) UpdateLayerMuteAt(isMuted bool, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateLayerMuteAtLocked(isMuted, at) -} - func (q *qualityScorer) UpdateLayerMute(isMuted bool) { + q.UpdateLayerMuteAt(isMuted, time.Now()) +} + +func (q *qualityScorer) UpdatePauseAt(isPaused bool, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.updateLayerMuteAtLocked(isMuted, time.Now()) -} - -func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { if isPaused { if !q.isPaused() { q.aggregateBitrate.Reset() @@ -346,39 +323,25 @@ func (q *qualityScorer) updatePauseAtLocked(isPaused bool, at time.Time) { } } -func (q *qualityScorer) UpdatePauseAt(isPaused bool, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updatePauseAtLocked(isPaused, at) -} - func (q *qualityScorer) UpdatePause(isPaused bool) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updatePauseAtLocked(isPaused, time.Now()) -} - -func (q *qualityScorer) addLayerTransitionAtLocked(distance float64, at time.Time) { - q.layerDistance.AddSampleAt(distance, at) + q.UpdatePauseAt(isPaused, time.Now()) } func (q *qualityScorer) AddLayerTransitionAt(distance float64, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addLayerTransitionAtLocked(distance, at) + q.layerDistance.AddSampleAt(distance, at) } func (q *qualityScorer) AddLayerTransition(distance float64) { + q.AddLayerTransitionAt(distance, time.Now()) +} + +func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) { q.lock.Lock() defer q.lock.Unlock() - q.addLayerTransitionAtLocked(distance, time.Now()) -} - -func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { // always update transitions expectedBits, _, err := q.aggregateBitrate.GetAggregateAndRestartAt(at) if err != nil { @@ -403,7 +366,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { return } - plw := q.getPacketLossWeight(stat) + aplw := q.getAdjustedPacketLossWeight(stat) reason := "none" var score, packetScore, bitrateScore, layerScore float64 if stat.packets+stat.packetsPadding == 0 { @@ -415,7 +378,7 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { score = qualityTransitionScore[livekit.ConnectionQuality_POOR] } } else { - packetScore = stat.calculatePacketScore(plw, q.params.IncludeRTT, q.params.IncludeJitter) + packetScore = stat.calculatePacketScore(aplw, q.params.IncludeRTT, q.params.IncludeJitter) bitrateScore = stat.calculateBitrateScore(expectedBits, q.params.EnableBitrateScore) layerScore = math.Max(math.Min(cMaxScore, cMaxScore-(expectedDistance*cDistanceWeight)), 0.0) @@ -462,7 +425,8 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { "bitrateScore", bitrateScore, "quality", currCQ, "stat", stat, - "packetLossWeight", plw, + "packetLossWeight", q.packetLossWeight, + "adjustedPacketLossWeight", aplw, "modePPS", q.ppsMode*int(cPPSQuantization), "expectedBits", expectedBits, "expectedDistance", expectedDistance, @@ -484,18 +448,8 @@ func (q *qualityScorer) updateAtLocked(stat *windowStat, at time.Time) { q.lastUpdateAt = at } -func (q *qualityScorer) UpdateAt(stat *windowStat, at time.Time) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateAtLocked(stat, at) -} - func (q *qualityScorer) Update(stat *windowStat) { - q.lock.Lock() - defer q.lock.Unlock() - - q.updateAtLocked(stat, time.Now()) + q.UpdateAt(stat, time.Now()) } func (q *qualityScorer) isMuted() bool { @@ -535,9 +489,9 @@ func (q *qualityScorer) isPaused() bool { return !q.pausedAt.IsZero() && (q.resumedAt.IsZero() || q.pausedAt.After(q.resumedAt)) } -func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { +func (q *qualityScorer) getAdjustedPacketLossWeight(stat *windowStat) float64 { if stat == nil || stat.duration <= 0 { - return q.params.PacketLossWeight + return q.packetLossWeight } // packet loss is weighted by comparing against mode of packet rate seen. @@ -569,14 +523,14 @@ func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { } if q.ppsMode == 0 || q.ppsMode == len(q.ppsHistogram)-1 { - return q.params.PacketLossWeight + return q.packetLossWeight } packetRatio := pps / (float64(q.ppsMode) * cPPSQuantization) if packetRatio > 1.0 { packetRatio = 1.0 } - return math.Sqrt(packetRatio) * q.params.PacketLossWeight + return math.Sqrt(packetRatio) * q.packetLossWeight } func (q *qualityScorer) GetScoreAndQuality() (float32, livekit.ConnectionQuality) { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index ce2f0a921..2fc1ac990 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -58,7 +58,6 @@ type TrackSender interface { // ID is the globally unique identifier for this Track. ID() string SubscriberID() livekit.ParticipantID - TrackInfoAvailable() HandleRTCPSenderReportData( payloadType webrtc.PayloadType, isSVC bool, @@ -382,8 +381,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId() d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ - MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change - IsFECEnabled: strings.Contains(strings.ToLower(codecs[0].SDPFmtpLine), "fec"), SenderProvider: d, Logger: d.params.Logger.WithValues("direction", "down"), }) @@ -480,11 +477,14 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, return } - if strings.EqualFold(matchedUpstreamCodec.MimeType, "audio/red") { + isFECEnabled := false + if strings.EqualFold(matchedUpstreamCodec.MimeType, MimeTypeAudioRed) { d.isRED = true for _, c := range d.upstreamCodecs { + isFECEnabled = strings.Contains(strings.ToLower(c.SDPFmtpLine), "useinbandfec=1") + // assume upstream primary codec is opus since we only support it for audio now - if strings.EqualFold(c.MimeType, "audio/opus") { + if strings.EqualFold(c.MimeType, webrtc.MimeTypeOpus) { d.upstreamPrimaryPT = uint8(c.PayloadType) break } @@ -498,12 +498,15 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.params.Logger.Errorw("failed to parse primary and secondary payload type for RED", err, "matchedCodec", codec) } d.primaryPT = uint8(primaryPT) + } else if strings.HasPrefix(strings.ToLower(matchedUpstreamCodec.MimeType), "audio/") { + isFECEnabled = strings.Contains(strings.ToLower(matchedUpstreamCodec.SDPFmtpLine), "fec") } logFields := []interface{}{ "codecs", d.upstreamCodecs, "matchCodec", codec, "ssrc", t.SSRC(), + "isFECEnabled", isFECEnabled, } if d.isRED { logFields = append(logFields, @@ -537,6 +540,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.bindLock.Unlock() d.forwarder.DetermineCodec(codec.RTPCodecCapability, d.params.Receiver.HeaderExtensions()) + d.connectionStats.Start(codec.MimeType, isFECEnabled) d.params.Logger.Debugw("downtrack bound") } @@ -593,14 +597,6 @@ func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error { return nil } -func (d *DownTrack) TrackInfoAvailable() { - ti := d.params.Receiver.TrackInfo() - if ti == nil { - return - } - d.connectionStats.Start(ti) -} - func (d *DownTrack) SetStreamAllocatorListener(listener DownTrackStreamAllocatorListener) { d.streamAllocatorLock.Lock() d.streamAllocatorListener = listener @@ -1534,13 +1530,13 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan var getBlankFrame func(bool) ([]byte, error) switch d.mime { - case "audio/opus": + case strings.ToLower(webrtc.MimeTypeOpus): getBlankFrame = d.getOpusBlankFrame - case "audio/red": + case strings.ToLower(MimeTypeAudioRed): getBlankFrame = d.getOpusRedBlankFrame - case "video/vp8": + case strings.ToLower(webrtc.MimeTypeVP8): getBlankFrame = d.getVP8BlankFrame - case "video/h264": + case strings.ToLower(webrtc.MimeTypeH264): getBlankFrame = d.getH264BlankFrame default: close(done) @@ -1548,7 +1544,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan } frameRate := uint32(30) - if d.mime == "audio/opus" || d.mime == "audio/red" { + if d.mime == strings.ToLower(webrtc.MimeTypeOpus) || d.mime == strings.ToLower(MimeTypeAudioRed) { frameRate = 50 } @@ -2121,7 +2117,7 @@ func (d *DownTrack) sendPaddingOnMute() { if d.kind == webrtc.RTPCodecTypeVideo { d.sendPaddingOnMuteForVideo() - } else if d.mime == "audio/opus" { + } else if d.mime == strings.ToLower(webrtc.MimeTypeOpus) { d.sendSilentFrameOnMuteForOpus() } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 4c410ca22..c7abc8ed2 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -219,8 +219,6 @@ func NewWebRTCReceiver( }) w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ - MimeType: w.codec.MimeType, - IsFECEnabled: strings.EqualFold(w.codec.MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "fec"), ReceiverProvider: w, Logger: w.logger.WithValues("direction", "up"), }) @@ -229,7 +227,11 @@ func NewWebRTCReceiver( w.onStatsUpdate(w, stat) } }) - w.connectionStats.Start(trackInfo) + w.connectionStats.Start( + w.codec.MimeType, + // TODO: technically not correct to declare FEC on when RED. Need the primary codec's fmtp line to check. + strings.EqualFold(w.codec.MimeType, MimeTypeAudioRed) || strings.Contains(strings.ToLower(w.codec.SDPFmtpLine), "useinbandfec=1"), + ) w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) w.streamTrackerManager.SetListener(w) @@ -417,7 +419,6 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { w.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } - track.TrackInfoAvailable() track.UpTrackMaxPublishedLayerChange(w.streamTrackerManager.GetMaxPublishedLayer()) track.UpTrackMaxTemporalLayerSeenChange(w.streamTrackerManager.GetMaxTemporalLayerSeen()) diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index e04d56f82..c6ded2a0e 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -107,8 +107,6 @@ func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error { r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } - track.TrackInfoAvailable() - r.downTrackSpreader.Store(track) r.logger.Debugw("red primary receiver downtrack added", "subscriberID", track.SubscriberID()) return nil diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 4af4fb5f5..81121fb2e 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -98,8 +98,6 @@ func (r *RedReceiver) AddDownTrack(track TrackSender) error { r.logger.Infow("subscriberID already exists, replacing downtrack", "subscriberID", track.SubscriberID()) } - track.TrackInfoAvailable() - r.downTrackSpreader.Store(track) r.logger.Debugw("red receiver downtrack added", "subscriberID", track.SubscriberID()) return nil