From 6489237e3399e93016eb487cae92bcd5255f2423 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 14 Sep 2025 09:41:40 +0530 Subject: [PATCH] Simulcast audio fixes (#3925) * Simulcast audio fixes * clean up --- pkg/rtc/mediaengine.go | 6 ++--- pkg/rtc/mediatrackreceiver.go | 8 ++++-- pkg/rtc/mediatracksubscriptions.go | 2 +- pkg/rtc/participant.go | 2 +- pkg/rtc/participant_internal_test.go | 38 ++++++++++++++++++++-------- pkg/rtc/participant_sdp.go | 5 ++-- pkg/rtc/subscribedtrack.go | 10 +++++--- pkg/rtc/utils.go | 8 ++++++ pkg/rtc/wrappedreceiver.go | 36 ++++++++------------------ pkg/sfu/buffer/buffer.go | 2 +- pkg/sfu/mime/mimetype.go | 10 +++----- 11 files changed, 70 insertions(+), 57 deletions(-) diff --git a/pkg/rtc/mediaengine.go b/pkg/rtc/mediaengine.go index 3cf4f3ed5..e1e7180a2 100644 --- a/pkg/rtc/mediaengine.go +++ b/pkg/rtc/mediaengine.go @@ -45,7 +45,7 @@ var ( PayloadType: 63, } - pcmuCodecParameters = webrtc.RTPCodecParameters{ + PCMUCodecParameters = webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ MimeType: mime.MimeTypePCMU.String(), ClockRate: 8000, @@ -53,7 +53,7 @@ var ( PayloadType: 0, } - pcmaCodecParameters = webrtc.RTPCodecParameters{ + PCMACodecParameters = webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ MimeType: mime.MimeTypePCMA.String(), ClockRate: 8000, @@ -166,7 +166,7 @@ func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedbac } } - for _, codec := range []webrtc.RTPCodecParameters{pcmuCodecParameters, pcmaCodecParameters} { + for _, codec := range []webrtc.RTPCodecParameters{PCMUCodecParameters, PCMACodecParameters} { if !IsCodecEnabled(codecs, codec.RTPCodecCapability) { continue } diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 0281cee99..b43e3d9c7 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -581,7 +581,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su StreamId: streamId, UpstreamCodecs: potentialCodecs, Logger: tLogger, - DisableRed: t.TrackInfo().GetDisableRed() || !t.params.AudioConfig.ActiveREDEncoding, + DisableRed: !IsRedEnabled(t.TrackInfo()) || !t.params.AudioConfig.ActiveREDEncoding, IsEncrypted: t.IsEncrypted(), }) subID := sub.ID() @@ -598,7 +598,11 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su t.lock.RUnlock() if remove { - t.params.Logger.Debugw("removing subscriber on a not-open track", "subscriberID", subID, "isExpectedToResume", isExpectedToResume) + t.params.Logger.Debugw( + "removing subscriber on a not-open track", + "subscriberID", subID, + "isExpectedToResume", isExpectedToResume, + ) _ = t.MediaTrackSubscriptions.RemoveSubscriber(subID, isExpectedToResume) return nil, ErrNotOpen } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index de1781c53..f350596c2 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -207,7 +207,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * info := t.params.MediaTrack.ToProto() addTrackParams := types.AddTrackParams{ Stereo: slices.Contains(info.AudioFeatures, livekit.AudioTrackFeature_TF_STEREO), - Red: !info.DisableRed, + Red: IsRedEnabled(info), } codecs := wr.Codecs() if addTrackParams.Red && (len(codecs) == 1 && mime.IsMimeTypeStringOpus(codecs[0].MimeType)) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index bb4f4e9bc..edb217ece 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2930,7 +2930,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l mimeType = altCodec } if videoLayerMode == livekit.VideoLayer_MODE_UNUSED { - if mime.IsMimeTypeStringSVC(mimeType) { + if mime.IsMimeTypeStringSVCCapable(mimeType) { videoLayerMode = livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM } else { if p.params.ClientInfo.isOBS() { diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 7bf05045e..5cd20565d 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -637,14 +637,29 @@ func TestPreferAudioCodecForRed(t *testing.T) { require.NoError(t, err) defer pc.Close() - for i, disableRed := range []bool{false, true} { + for idx, disableRed := range []bool{false, true, false, true} { t.Run(fmt.Sprintf("disableRed=%v", disableRed), func(t *testing.T) { - trackCid := fmt.Sprintf("audiotrack%d", i) - participant.AddTrack(&livekit.AddTrackRequest{ - Type: livekit.TrackType_AUDIO, - DisableRed: disableRed, - Cid: trackCid, - }) + trackCid := fmt.Sprintf("audiotrack%d", idx) + req := &livekit.AddTrackRequest{ + Type: livekit.TrackType_AUDIO, + Cid: trackCid, + } + if idx < 2 { + req.DisableRed = disableRed + } else { + codec := "red" + if disableRed { + codec = "opus" + } + req.SimulcastCodecs = []*livekit.SimulcastCodec{ + { + Codec: codec, + Cid: trackCid, + }, + } + } + participant.AddTrack(req) + track, err := webrtc.NewTrackLocalStaticRTP( webrtc.RTPCodecCapability{MimeType: "audio/opus"}, trackCid, @@ -659,8 +674,10 @@ func TestPreferAudioCodecForRed(t *testing.T) { require.NoError(t, err) codecs := transceiver.Sender().GetParameters().Codecs for i, c := range codecs { - if c.MimeType == "audio/opus" && i != 0 { - codecs[0], codecs[i] = codecs[i], codecs[0] + if c.MimeType == "audio/opus" { + if i != 0 { + codecs[0], codecs[i] = codecs[i], codecs[0] + } break } } @@ -694,7 +711,6 @@ func TestPreferAudioCodecForRed(t *testing.T) { Sdp: sdp.SDP, Id: offerId, }) - require.Eventually( t, func() bool { @@ -710,7 +726,7 @@ func TestPreferAudioCodecForRed(t *testing.T) { var audioSectionIndex int for _, m := range parsed.MediaDescriptions { if m.MediaName.Media == "audio" { - if audioSectionIndex == i { + if audioSectionIndex == idx { codecs, err := lksdp.CodecsFromMediaDescription(m) require.NoError(t, err) // nack is always enabled. if red is preferred, server will not generate nack request diff --git a/pkg/rtc/participant_sdp.go b/pkg/rtc/participant_sdp.go index 8dd5a4725..6dc20f605 100644 --- a/pkg/rtc/participant_sdp.go +++ b/pkg/rtc/participant_sdp.go @@ -303,10 +303,11 @@ func (p *ParticipantImpl) setCodecPreferencesOpusRedForPublisher( continue } - // if RED is disabled for this track, don't prefer RED codec in offer + preferRED := IsRedEnabled(ti) + // if RED is enabled for this track, prefer RED codec in offer for _, codec := range codecs { // codec contain opus/red - if !ti.DisableRed && + if preferRED && mime.IsMimeTypeCodecStringRED(codec.Name) && strings.Contains(codec.Fmtp, strconv.FormatInt(int64(opusPayload), 10)) { configureReceiverCodecs(transceiver, "audio/red", true) diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index ac1c60e93..0aeda7199 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -433,7 +433,7 @@ func (t *SubscribedTrack) OnRttUpdate(rtt uint32) { } func (t *SubscribedTrack) OnCodecNegotiated(codec webrtc.RTPCodecCapability) { - if t.params.WrappedReceiver.DetermineReceiver(codec) { + if !t.params.WrappedReceiver.DetermineReceiver(codec) { return } @@ -447,10 +447,14 @@ func (t *SubscribedTrack) OnCodecNegotiated(codec webrtc.RTPCodecCapability) { livekit.VideoQuality_HIGH, t.params.MediaTrack.ToProto(), ) - t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial) + if t.params.OnSubscriberMaxQualityChange != nil { + t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial) + } case livekit.TrackType_AUDIO: - t.params.OnSubscriberAudioCodecChange(t.downTrack.SubscriberID(), mimeType, true) + if t.params.OnSubscriberAudioCodecChange != nil { + t.params.OnSubscriberAudioCodecChange(t.downTrack.SubscriberID(), mimeType, true) + } } }() } diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index d190a8ac8..8a70c6e9f 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -192,3 +192,11 @@ func ChunkProtoBatch[T proto.Message](batch []T, target int) [][]T { } return chunks } + +func IsRedEnabled(ti *livekit.TrackInfo) bool { + if len(ti.Codecs) != 0 && ti.Codecs[0].MimeType != "" { + return mime.IsMimeTypeStringRED(ti.Codecs[0].MimeType) + } + + return !ti.GetDisableRed() +} diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index d3a2aeb95..fc8edbb45 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -88,7 +88,7 @@ func (r *WrappedReceiver) StreamID() string { return r.params.StreamId } -// DetermineReceiver determines the receiver of negotiated codec and return ready state of the receiver +// DetermineReceiver determines the receiver of negotiated codec and return if there is a match func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) bool { r.lock.Lock() @@ -113,33 +113,21 @@ func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) boo } } if trackReceiver == nil { - r.params.Logger.Errorw("can't determine receiver for codec", nil, "codec", codec.MimeType) - if len(r.receivers) > 0 { - trackReceiver = r.receivers[0] - } + r.lock.Unlock() + r.params.Logger.Warnw("can't determine receiver for codec", nil, "codec", codec.MimeType) + return false } r.TrackReceiver = trackReceiver - var onReadyCallbacks []func() - if trackReceiver != nil { - onReadyCallbacks = r.onReadyCallbacks - r.onReadyCallbacks = nil - } + onReadyCallbacks := r.onReadyCallbacks + r.onReadyCallbacks = nil r.lock.Unlock() - if trackReceiver != nil { - for _, f := range onReadyCallbacks { - trackReceiver.AddOnReady(f) - } - - if s, ok := trackReceiver.(*simulcastReceiver); ok { - if d, ok := s.TrackReceiver.(*DummyReceiver); ok { - return d.IsReady() - } - } - return true + for _, f := range onReadyCallbacks { + trackReceiver.AddOnReady(f) } - return false + + return true } func (r *WrappedReceiver) Codecs() []webrtc.RTPCodecParameters { @@ -451,10 +439,6 @@ func (d *DummyReceiver) AddOnReady(f func()) { } } -func (d *DummyReceiver) IsReady() bool { - return d.receiver.Load() != nil -} - func (d *DummyReceiver) AddOnCodecStateChange(f func(codec webrtc.RTPCodecParameters, state sfu.ReceiverCodecState)) { var receiver sfu.TrackReceiver d.downTrackLock.Lock() diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 587b315bd..4805fde5f 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -331,7 +331,7 @@ func (b *Buffer) OnCodecChange(fn func(webrtc.RTPCodecParameters)) { } func (b *Buffer) createDDParserAndFrameRateCalculator() { - if mime.IsMimeTypeSVC(b.mime) || b.mime == mime.MimeTypeVP8 { + if mime.IsMimeTypeSVCCapable(b.mime) || b.mime == mime.MimeTypeVP8 { frc := NewFrameRateCalculatorDD(b.clockRate, b.logger) for i := range b.frameRateCalculator { b.frameRateCalculator[i] = frc.GetFrameRateCalculatorForSpatial(int32(i)) diff --git a/pkg/sfu/mime/mimetype.go b/pkg/sfu/mime/mimetype.go index cada587ae..d336bccf0 100644 --- a/pkg/sfu/mime/mimetype.go +++ b/pkg/sfu/mime/mimetype.go @@ -320,11 +320,7 @@ func IsMimeTypeVideo(mimeType MimeType) bool { return strings.HasPrefix(mimeType.String(), MimeTypePrefixVideo) } -// SVC-TODO: Have to use more conditions to differentiate between -// SVC-TODO: SVC and non-SVC (could be single layer or simulcast). -// SVC-TODO: May only need to differentiate between simulcast and non-simulcast -// SVC-TODO: i. e. may be possible to treat single layer as SVC to get proper/intended functionality. -func IsMimeTypeSVC(mimeType MimeType) bool { +func IsMimeTypeSVCCapable(mimeType MimeType) bool { switch mimeType { case MimeTypeAV1, MimeTypeVP9: return true @@ -332,8 +328,8 @@ func IsMimeTypeSVC(mimeType MimeType) bool { return false } -func IsMimeTypeStringSVC(mime string) bool { - return IsMimeTypeSVC(NormalizeMimeType(mime)) +func IsMimeTypeStringSVCCapable(mime string) bool { + return IsMimeTypeSVCCapable(NormalizeMimeType(mime)) } func IsMimeTypeStringRED(mime string) bool {