From 2c9d7fbbcb0d96b694628c047ff3eadd015666cb Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 28 Dec 2022 15:38:08 +0800 Subject: [PATCH] WIP --- pkg/rtc/mediatracksubscriptions.go | 21 ++- pkg/rtc/participant.go | 124 +++++++++++++++ pkg/rtc/types/interfaces.go | 2 + pkg/sfu/audioselection/forwarder.go | 211 +++++++++++++++++++++++-- pkg/sfu/audioselection/nullreceiver.go | 72 +++++++++ pkg/sfu/downtrack.go | 37 +++-- 6 files changed, 443 insertions(+), 24 deletions(-) create mode 100644 pkg/sfu/audioselection/nullreceiver.go diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index de6a94f5b..8e24b48ee 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -94,6 +94,24 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * } t.subscribedTracksMu.Unlock() + if t.params.MediaTrack.Kind() == livekit.TrackType_AUDIO /*&& audioselection.AudioCodecCanbeMux(*t.params.MediaTrack.ToProto(), wr.codecs) */ { + wr.DetermineReceiver(opusCodecCapability) + sub.AddMuxAudioTrack(trackID, wr) + subTrack := NewSubscribedTrack(SubscribedTrackParams{ + PublisherID: t.params.MediaTrack.PublisherID(), + PublisherIdentity: t.params.MediaTrack.PublisherIdentity(), + PublisherVersion: t.params.MediaTrack.PublisherVersion(), + Subscriber: sub, + MediaTrack: t.params.MediaTrack, + DownTrack: nil, + AdaptiveStream: sub.GetAdaptiveStream(), + }) + t.subscribedTracksMu.Lock() + t.subscribedTracks[subscriberID] = subTrack + t.subscribedTracksMu.Unlock() + return nil + } + var rtcpFeedback []webrtc.RTCPFeedback switch t.params.MediaTrack.Kind() { case livekit.TrackType_AUDIO: @@ -285,7 +303,9 @@ func (t *MediaTrackSubscriptions) RemoveSubscriber(subscriberID livekit.Particip func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.SubscribedTrack, willBeResumed bool) { dt := subTrack.DownTrack() + sub := subTrack.Subscriber() if dt == nil { + sub.RemoveMuxAudioTrack(t.params.MediaTrack.ID()) return } @@ -294,7 +314,6 @@ func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.Subscribed if willBeResumed { tr := dt.GetTransceiver() if tr != nil { - sub := subTrack.Subscriber() sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState()) } } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 765f22d99..bd45eb546 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -21,6 +21,8 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/supervisor" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/audio" + "github.com/livekit/livekit-server/pkg/sfu/audioselection" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/telemetry" @@ -37,6 +39,8 @@ const ( disconnectCleanupDuration = 15 * time.Second migrationWaitDuration = 3 * time.Second + + muxAudioTracks = 3 ) type pendingTrackInfo struct { @@ -163,6 +167,8 @@ type ParticipantImpl struct { trackPublisherVersion map[livekit.TrackID]uint32 supervisor *supervisor.ParticipantSupervisor + + audioForwarder *audioselection.SelectionForwarder } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -194,6 +200,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { params.SID, params.Telemetry), supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), + audioForwarder: audioselection.NewSelectionForwarder(audioselection.SelectionForwarderParams{ + ActiveDowntracks: 3, + Logger: params.Logger, + ActiveLevelThreshold: audio.ConvertAudioLevel(35), + }), } p.version.Store(params.InitialVersion) p.migrateState.Store(types.MigrateStateInit) @@ -216,6 +227,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.setupUpTrackManager() + err = p.setupAudioForwarder() + if err != nil { + return nil, err + } + return p, nil } @@ -652,6 +668,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.TransportManager.Close() }() + p.audioForwarder.Stop() p.dataChannelStats.Report() return nil @@ -963,6 +980,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { if !isAlreadySubscribed && onSubscribedTo != nil { onSubscribedTo(p, publisherID) } + } // RemoveSubscribedTrack removes a track to the participant's subscribed list @@ -1297,6 +1315,7 @@ func (p *ParticipantImpl) onPrimaryTransportInitialConnected() { func (p *ParticipantImpl) onPrimaryTransportFullyEstablished() { p.updateState(livekit.ParticipantInfo_ACTIVE) + p.audioForwarder.Start() } func (p *ParticipantImpl) clearDisconnectTimer() { @@ -2163,3 +2182,108 @@ func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err e return out, nil } + +func (p *ParticipantImpl) AddMuxAudioTrack(trackID livekit.TrackID, r sfu.TrackReceiver) { + p.audioForwarder.AddSource(trackID, r) +} + +func (p *ParticipantImpl) RemoveMuxAudioTrack(trackID livekit.TrackID) { + p.audioForwarder.RemoveSource(trackID) + +} + +func (p *ParticipantImpl) setupAudioForwarder() error { + // 1. add downtracks + for i := 0; i < muxAudioTracks; i++ { + if err := p.addDowntrack(); err != nil { + p.params.Logger.Errorw("error adding downtrack", err) + return err + } + } + + // 2. setup audio forwarder callbacks + p.audioForwarder.OnForwardMappingChanged(func(forwardMapping map[livekit.TrackID]livekit.TrackID) { + p.params.Logger.Debugw("forward mapping changed", "mappings", forwardMapping) + }) + return nil +} + +func (p *ParticipantImpl) addDowntrack() error { + codecs := []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: opusCodecCapability, + }, + } + trackID := livekit.TrackID(utils.NewGuid(utils.TrackPrefix + "AX")) + streamID := PackStreamID(p.ID(), trackID) + downTrack, err := sfu.NewDownTrack( + codecs, + audioselection.NewNullReceiver(streamID, trackID), + p.GetBufferFactory(), + p.ID(), + p.params.Config.Receiver.PacketBufferSize, + LoggerWithTrack(p.GetLogger(), trackID, false), + ) + if err != nil { + return err + } + + // subTrack := NewSubscribedTrack(SubscribedTrackParams{ + // PublisherID: t.params.MediaTrack.PublisherID(), + // PublisherIdentity: t.params.MediaTrack.PublisherIdentity(), + // PublisherVersion: t.params.MediaTrack.PublisherVersion(), + // Subscriber: sub, + // MediaTrack: t.params.MediaTrack, + // DownTrack: downTrack, + // AdaptiveStream: sub.GetAdaptiveStream(), + // }) + + // Bind callback can happen from replaceTrack, so set it up early + downTrack.OnBind(func() { + p.audioForwarder.AddDownTrack(downTrack) + // wr.DetermineReceiver(downTrack.Codec()) + // if reusingTransceiver.Load() { + // downTrack.SeedState(dtState) + // } + // if err = wr.AddDownTrack(downTrack); err != nil && err != sfu.ErrReceiverClosed { + // sub.GetLogger().Errorw( + // "could not add down track", err, + // "publisher", subTrack.PublisherIdentity(), + // "publisherID", subTrack.PublisherID(), + // "trackID", trackID, + // ) + // } + + // go subTrack.Bound() + + // subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted()) + }) + + downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) { + // t.params.Telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, stat) + }) + + downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) { + go p.UpdateRTT(rtt) + }) + + downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { + p.OnReceiverReport(dt, report) + }) + + sender, transceiver, err := p.AddTransceiverFromTrackToSubscriber(downTrack, types.AddTrackParams{}) + if err != nil { + return err + } + + sendParameters := sender.GetParameters() + downTrack.SetRTPHeaderExtensions(sendParameters.HeaderExtensions) + + downTrack.SetTransceiver(transceiver) + + downTrack.OnCloseHandler(func(willBeResumed bool) { + p.audioForwarder.RemoveDownTrack(downTrack) + }) + + return nil +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index f498718d3..283f90923 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -267,6 +267,8 @@ type LocalParticipant interface { UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error GetSubscribedTracks() []SubscribedTrack VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) + AddMuxAudioTrack(trackID livekit.TrackID, r sfu.TrackReceiver) + RemoveMuxAudioTrack(trackID livekit.TrackID) // returns list of participant identities that the current participant is subscribed to GetSubscribedParticipants() []livekit.ParticipantID diff --git a/pkg/sfu/audioselection/forwarder.go b/pkg/sfu/audioselection/forwarder.go index b24545999..6c25dc419 100644 --- a/pkg/sfu/audioselection/forwarder.go +++ b/pkg/sfu/audioselection/forwarder.go @@ -1,26 +1,215 @@ package audioselection -type AudioSource interface { - Activate() - Deactivate() - GetAudioLevel() uint8 +import ( + "sort" + "sync" + "time" + + "github.com/pion/webrtc/v3" + + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +const ( + updateInterval = 100 * time.Millisecond +) + +// type sfu.Receiver interface { +// Activate() +// Deactivate() +// GetAudioLevel() uint8 +// ID() livekit.TrackID +// } + +func AudioCodecCanbeMux(ti livekit.TrackInfo, codecs []webrtc.RTPCodecParameters) bool { + if len(codecs) == 0 || ti.Stereo { + return false + } + + c := codecs[0] + return c.MimeType == webrtc.MimeTypeOpus } -type SelectionForworderParams struct { +type SelectionForwarderParams struct { + ActiveDowntracks int + FadeDowntracks int + ActiveLevelThreshold float64 + FadeoutTime time.Duration + FadeinTime time.Duration + Logger logger.Logger + RequestDownTrack func(sfu.TrackReceiver) *sfu.DownTrack } -type SelectionForworder struct { +type sourceInfo struct { + trackID livekit.TrackID + receiver sfu.TrackReceiver + vad bool + active bool + audioLevel float64 + downtrack *sfu.DownTrack } -func NewSelectionForworder() *SelectionForworder { - return &SelectionForworder{} +type SelectionForwarder struct { + lock sync.RWMutex + params SelectionForwarderParams + sources []*sourceInfo + idleDowntracks []*sfu.DownTrack + downtracks []*sfu.DownTrack + close chan struct{} + + onForwardMappingChanged func(forwardMapping map[livekit.TrackID]livekit.TrackID) } -func (f *SelectionForworder) AddDownTrack() { +func NewSelectionForwarder(params SelectionForwarderParams) *SelectionForwarder { + return &SelectionForwarder{ + params: params, + close: make(chan struct{}), + } } -func (f *SelectionForworder) RemoveDownTrack() { +func (f *SelectionForwarder) Start() { + go f.process() } -func (f *SelectionForworder) OnRequestDowntrack() { +func (f *SelectionForwarder) Stop() { + close(f.close) +} + +func (f *SelectionForwarder) AddDownTrack(dt *sfu.DownTrack) { + f.lock.Lock() + f.downtracks = append(f.downtracks, dt) + f.idleDowntracks = append(f.idleDowntracks, dt) + f.lock.Unlock() +} + +func (f *SelectionForwarder) RemoveDownTrack(dt *sfu.DownTrack) { +} + +// OnForwardMappingChanged is called when the forward mapping is changed, used to update the relationship between downtracks and sources +func (f *SelectionForwarder) OnForwardMappingChanged(h func(forwardMapping map[livekit.TrackID]livekit.TrackID)) { + f.onForwardMappingChanged = h +} + +func (f *SelectionForwarder) AddSource(trackID livekit.TrackID, source sfu.TrackReceiver) { + f.params.Logger.Debugw("adding source", "trackID", trackID) + f.lock.Lock() + f.sources = append(f.sources, &sourceInfo{trackID: trackID, receiver: source}) + f.lock.Unlock() +} + +func (f *SelectionForwarder) RemoveSource(trackID livekit.TrackID) { + f.lock.Lock() + for i, s := range f.sources { + if s.trackID == trackID { + if s.active { + f.deactiveSource(s) + } + f.sources[i] = f.sources[len(f.sources)-1] + f.sources = f.sources[:len(f.sources)-1] + break + } + } + f.lock.Unlock() +} + +func (f *SelectionForwarder) MuteSource(source sfu.TrackReceiver, mute bool) { +} + +func (f *SelectionForwarder) process() { + ticker := time.NewTicker(updateInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + f.updateForward() + + case <-f.close: + return + } + } +} + +func (f *SelectionForwarder) updateForward() { + f.lock.Lock() + defer f.lock.Unlock() + if len(f.sources) == 0 { + return + } + + for _, source := range f.sources { + source.audioLevel, _ = source.receiver.GetAudioLevel() + } + + sort.Slice(f.sources, func(i, j int) bool { + return f.sources[i].audioLevel > f.sources[j].audioLevel + }) + + var activeteSources, idleSources []*sourceInfo + for i, source := range f.sources { + if i >= f.params.ActiveDowntracks && source.active { + idleSources = append(idleSources, source) + } else { + if source.audioLevel > f.params.ActiveLevelThreshold && !source.active { + activeteSources = append(activeteSources, source) + } else if source.audioLevel <= f.params.ActiveLevelThreshold && source.active { + idleSources = append(idleSources, source) + } + } + } + + var forwardChanged bool + for _, source := range activeteSources { + if len(f.idleDowntracks) == 0 && len(idleSources) > 0 { + f.deactiveSource(idleSources[0]) + idleSources = idleSources[1:] + } + if f.activeSource(source) { + forwardChanged = true + } + } + + if forwardChanged && f.onForwardMappingChanged != nil { + forwardMapping := make(map[livekit.TrackID]livekit.TrackID) + for _, source := range f.sources { + if source.active && source.downtrack != nil { + forwardMapping[source.receiver.TrackID()] = livekit.TrackID(source.downtrack.ID()) + } + } + f.onForwardMappingChanged(forwardMapping) + } +} + +func (f *SelectionForwarder) activeSource(source *sourceInfo) bool { + f.params.Logger.Debugw("activating source", "trackID", source.receiver.TrackID()) + if len(f.idleDowntracks) == 0 { + if len(f.downtracks) < f.params.ActiveDowntracks { + dt := f.params.RequestDownTrack(source.receiver) + f.downtracks = append(f.downtracks, dt) + f.idleDowntracks = append(f.idleDowntracks, dt) + } else { + f.params.Logger.Warnw("no idle downtracks for active source", nil, "trackID", source.receiver.TrackID()) + return false + } + } + source.active = true + source.downtrack = f.idleDowntracks[0] + f.idleDowntracks = f.idleDowntracks[1:] + source.downtrack.ResetReceiver(source.receiver) + source.receiver.AddDownTrack(source.downtrack) + return true +} + +func (f *SelectionForwarder) deactiveSource(source *sourceInfo) { + f.params.Logger.Debugw("deactivate source", "trackID", source.receiver.TrackID()) + source.active = false + dt := source.downtrack + source.downtrack = nil + if dt != nil { + dt.ResetReceiver(&NullReceiver{}) + source.receiver.DeleteDownTrack(dt.SubscriberID()) + f.idleDowntracks = append(f.idleDowntracks, dt) + } } diff --git a/pkg/sfu/audioselection/nullreceiver.go b/pkg/sfu/audioselection/nullreceiver.go new file mode 100644 index 000000000..32166ec8c --- /dev/null +++ b/pkg/sfu/audioselection/nullreceiver.go @@ -0,0 +1,72 @@ +package audioselection + +import ( + "github.com/pion/webrtc/v3" + + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/protocol/livekit" +) + +type NullReceiver struct { + streamID string + trackID livekit.TrackID +} + +func NewNullReceiver(streamID string, trackID livekit.TrackID) *NullReceiver { + return &NullReceiver{ + streamID: streamID, + trackID: trackID, + } +} + +func (r *NullReceiver) TrackID() livekit.TrackID { + return r.trackID +} +func (r *NullReceiver) StreamID() string { + return r.streamID +} + +func (r *NullReceiver) Codec() webrtc.RTPCodecParameters { + return webrtc.RTPCodecParameters{} +} +func (r *NullReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter { + return nil +} +func (r *NullReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { + return 0, nil +} +func (r *NullReceiver) GetLayeredBitrate() sfu.Bitrates { + return sfu.Bitrates{} +} +func (r *NullReceiver) GetAudioLevel() (float64, bool) { + return 0, false +} +func (r *NullReceiver) SendPLI(layer int32, force bool) { +} +func (r *NullReceiver) SetUpTrackPaused(paused bool) { +} +func (r *NullReceiver) SetMaxExpectedSpatialLayer(layer int32) { +} +func (r *NullReceiver) AddDownTrack(track sfu.TrackSender) error { + return nil +} +func (r *NullReceiver) DeleteDownTrack(participantID livekit.ParticipantID) { +} +func (r *NullReceiver) DebugInfo() map[string]interface{} { + return nil +} +func (r *NullReceiver) GetLayerDimension(layer int32) (uint32, uint32) { + return 0, 0 +} +func (r *NullReceiver) TrackInfo() *livekit.TrackInfo { + return &livekit.TrackInfo{} +} +func (r *NullReceiver) GetPrimaryReceiverForRed() sfu.TrackReceiver { + return r +} +func (r *NullReceiver) GetRedReceiver() sfu.TrackReceiver { + return r +} +func (r *NullReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { + return nil +} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 8da91bed4..3a24f03e6 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -236,11 +236,11 @@ func NewDownTrack( maxTrack: mt, streamID: r.StreamID(), bufferFactory: bf, - receiver: r, upstreamCodecs: codecs, kind: kind, codec: codecs[0].RTPCodecCapability, } + d.receiver = r d.forwarder = NewForwarder(d.kind, d.logger) d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ @@ -271,6 +271,19 @@ func NewDownTrack( return d, nil } +func (d *DownTrack) ResetReceiver(r TrackReceiver) { + d.bindLock.Lock() + d.receiver = r + d.bindLock.Unlock() + // TODO: log stats +} + +func (d *DownTrack) getReceiver() TrackReceiver { + d.bindLock.Lock() + defer d.bindLock.Unlock() + return d.receiver +} + // Bind is called by the PeerConnection after negotiation is complete // This asserts that the code requested is supported by the remote peer. // If so it sets up all the state (SSRC and PayloadType) to have a call @@ -340,7 +353,7 @@ func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error { } func (d *DownTrack) TrackInfoAvailable() { - d.connectionStats.Start(d.receiver.TrackInfo()) + d.connectionStats.Start(d.getReceiver().TrackInfo()) } // ID is the unique identifier for this Track. This should be unique for the @@ -431,7 +444,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { for { if d.connected.Load() { d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer) - d.receiver.SendPLI(layer, false) + d.getReceiver().SendPLI(layer, false) d.rtpStats.UpdateLayerLockPliAndTime(1) } @@ -873,7 +886,7 @@ func (d *DownTrack) IsDeficient() bool { } func (d *DownTrack) BandwidthRequested() int64 { - return d.forwarder.BandwidthRequested(d.receiver.GetLayeredBitrate()) + return d.forwarder.BandwidthRequested(d.getReceiver().GetLayeredBitrate()) } func (d *DownTrack) DistanceToDesired() int32 { @@ -881,13 +894,13 @@ func (d *DownTrack) DistanceToDesired() int32 { } func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation { - allocation := d.forwarder.AllocateOptimal(d.receiver.GetLayeredBitrate(), allowOvershoot) + allocation := d.forwarder.AllocateOptimal(d.getReceiver().GetLayeredBitrate(), allowOvershoot) d.maybeStartKeyFrameRequester() return allocation } func (d *DownTrack) ProvisionalAllocatePrepare() { - d.forwarder.ProvisionalAllocatePrepare(d.receiver.GetLayeredBitrate()) + d.forwarder.ProvisionalAllocatePrepare(d.getReceiver().GetLayeredBitrate()) } func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, allowOvershoot bool) int64 { @@ -913,19 +926,19 @@ func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation { } func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) { - allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetLayeredBitrate(), allowOvershoot) + allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.getReceiver().GetLayeredBitrate(), allowOvershoot) d.maybeStartKeyFrameRequester() return allocation, available } func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) { - transition, available := d.forwarder.GetNextHigherTransition(d.receiver.GetLayeredBitrate(), allowOvershoot) + transition, available := d.forwarder.GetNextHigherTransition(d.getReceiver().GetLayeredBitrate(), allowOvershoot) d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available) return transition, available } func (d *DownTrack) Pause() VideoAllocation { - allocation := d.forwarder.Pause(d.receiver.GetLayeredBitrate()) + allocation := d.forwarder.Pause(d.getReceiver().GetLayeredBitrate()) d.maybeStartKeyFrameRequester() return allocation } @@ -1128,7 +1141,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { targetLayers := d.forwarder.TargetLayers() if targetLayers != InvalidLayers { d.logger.Debugw("sending PLI RTCP", "layer", targetLayers.Spatial) - d.receiver.SendPLI(targetLayers.Spatial, false) + d.getReceiver().SendPLI(targetLayers.Spatial, false) d.isNACKThrottled.Store(true) d.rtpStats.UpdatePliTime() pliOnce = false @@ -1219,7 +1232,7 @@ func (d *DownTrack) SetConnected() { if d.bound.Load() && d.kind == webrtc.RTPCodecTypeVideo { targetLayers := d.forwarder.TargetLayers() if targetLayers != InvalidLayers { - d.receiver.SendPLI(targetLayers.Spatial, true) + d.getReceiver().SendPLI(targetLayers.Spatial, true) } } } @@ -1266,7 +1279,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { } pktBuff := *src - n, err := d.receiver.ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo) + n, err := d.getReceiver().ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo) if err != nil { if err == io.EOF { break