diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 8ab9b5ed5..145d8cc59 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -103,7 +103,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * t.subscribedTracksMu.Unlock() sub.VerifySubscribeParticipantInfo(subTrack.PublisherID(), subTrack.PublisherVersion()) sub.AddMuxAudioTrack(subTrack.PublisherID(), trackID, wr) - return nil + return subTrack, nil } var rtcpFeedback []webrtc.RTCPFeedback diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6a795ec26..51bca7c9a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -844,8 +844,9 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo subscribedTracks := p.SubscriptionManager.GetSubscribedTracks() subscriberScores := make(map[livekit.TrackID]float32, len(subscribedTracks)) + // TODO-mux: calculate score for muxed tracks for _, subTrack := range subscribedTracks { - if subTrack.IsMuted() || subTrack.MediaTrack().IsMuted() { + if subTrack.IsMuted() || subTrack.MediaTrack().IsMuted() || subTrack.DownTrack() == nil { continue } score := subTrack.DownTrack().GetConnectionScore() @@ -1286,10 +1287,18 @@ func (p *ParticipantImpl) subscriberRTCPWorker() { var srs []rtcp.Packet var sd []rtcp.SourceDescriptionChunk subscribedTracks := p.SubscriptionManager.GetSubscribedTracks() + downtracks := p.audioForwarder.GetDowntracks() p.lock.RLock() for _, subTrack := range subscribedTracks { - sr := subTrack.DownTrack().CreateSenderReport() - chunks := subTrack.DownTrack().CreateSourceDescriptionChunks() + if subTrack.DownTrack() == nil { + continue + } + downtracks = append(downtracks, subTrack.DownTrack()) + } + + for _, dt := range downtracks { + sr := dt.CreateSenderReport() + chunks := dt.CreateSourceDescriptionChunks() if sr == nil || chunks == nil { continue } @@ -2133,7 +2142,7 @@ func (p *ParticipantImpl) addDowntrack() error { // }) // Bind callback can happen from replaceTrack, so set it up early - downTrack.OnBind(func() { + downTrack.OnBinding(func() { p.audioForwarder.AddDownTrack(downTrack) // wr.DetermineReceiver(downTrack.Codec()) // if reusingTransceiver.Load() { @@ -2158,7 +2167,7 @@ func (p *ParticipantImpl) addDowntrack() error { }) downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) { - go p.UpdateRTT(rtt) + go p.UpdateMediaRTT(rtt) }) downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 3a08c731f..c3e6cee92 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -51,7 +51,7 @@ func NewSubscribedTrack(params SubscribedTrackParams) *SubscribedTrack { s := &SubscribedTrack{ params: params, logger: params.Subscriber.GetLogger().WithValues( - "trackID", params.DownTrack.ID(), + "trackID", params.MediaTrack.ID(), "publisherID", params.PublisherID, "publisher", params.PublisherIdentity, ), diff --git a/pkg/sfu/audioselection/forwarder.go b/pkg/sfu/audioselection/forwarder.go index 5a684623e..d3a7f5212 100644 --- a/pkg/sfu/audioselection/forwarder.go +++ b/pkg/sfu/audioselection/forwarder.go @@ -90,9 +90,18 @@ func (f *SelectionForwarder) AddDownTrack(dt *sfu.DownTrack) { f.lock.Unlock() } +// TODO-mux: impelement func (f *SelectionForwarder) RemoveDownTrack(dt *sfu.DownTrack) { } +func (f *SelectionForwarder) GetDowntracks() []*sfu.DownTrack { + f.lock.RLock() + defer f.lock.RUnlock() + tracks := make([]*sfu.DownTrack, len(f.downtracks)) + copy(tracks, f.downtracks) + return tracks +} + // OnForwardMappingChanged is called when the forward mapping is changed, used to update the relationship between downtracks and sources func (f *SelectionForwarder) OnForwardMappingChanged(h func(muxInfo []*livekit.AudioTrackMuxInfo)) { f.onForwardMappingChanged = h diff --git a/pkg/sfu/audioselection/nullreceiver.go b/pkg/sfu/audioselection/nullreceiver.go index 24c1e7520..2f6e3ed18 100644 --- a/pkg/sfu/audioselection/nullreceiver.go +++ b/pkg/sfu/audioselection/nullreceiver.go @@ -4,6 +4,7 @@ import ( "github.com/pion/webrtc/v3" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/livekit" ) @@ -35,8 +36,8 @@ func (r *NullReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter { func (r *NullReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { return 0, nil } -func (r *NullReceiver) GetLayeredBitrate() sfu.Bitrates { - return sfu.Bitrates{} +func (r *NullReceiver) GetLayeredBitrate() ([]int32, sfu.Bitrates) { + return nil, sfu.Bitrates{} } func (r *NullReceiver) GetAudioLevel() (smooth, loudest float64, active bool) { return 0, 0, false @@ -70,3 +71,11 @@ func (r *NullReceiver) GetRedReceiver() sfu.TrackReceiver { func (r *NullReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { return nil } + +func (r *NullReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { + return nil +} + +func (r *NullReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { + return ts, nil +} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 4f7ef920e..ea3d96250 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -259,7 +259,9 @@ func NewDownTrack( d.receiverLock.Lock() d.receiver = r d.receiverLock.Unlock() - d.forwarder = NewForwarder(d.kind, d.logger, d.receiver.GetReferenceLayerRTPTimestamp) + d.forwarder = NewForwarder(d.kind, d.logger, func(ts uint32, layer, referenceLayer int32) (uint32, error) { + return d.getReceiver().GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) + }) d.forwarder.OnParkedLayersExpired(func() { if d.onSubscriptionChanged != nil { d.onSubscriptionChanged(d)