This commit is contained in:
cnderrauber
2022-12-28 15:38:08 +08:00
parent 06975d901a
commit 2c9d7fbbcb
6 changed files with 443 additions and 24 deletions
+20 -1
View File
@@ -94,6 +94,24 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
}
t.subscribedTracksMu.Unlock()
if t.params.MediaTrack.Kind() == livekit.TrackType_AUDIO /*&& audioselection.AudioCodecCanbeMux(*t.params.MediaTrack.ToProto(), wr.codecs) */ {
wr.DetermineReceiver(opusCodecCapability)
sub.AddMuxAudioTrack(trackID, wr)
subTrack := NewSubscribedTrack(SubscribedTrackParams{
PublisherID: t.params.MediaTrack.PublisherID(),
PublisherIdentity: t.params.MediaTrack.PublisherIdentity(),
PublisherVersion: t.params.MediaTrack.PublisherVersion(),
Subscriber: sub,
MediaTrack: t.params.MediaTrack,
DownTrack: nil,
AdaptiveStream: sub.GetAdaptiveStream(),
})
t.subscribedTracksMu.Lock()
t.subscribedTracks[subscriberID] = subTrack
t.subscribedTracksMu.Unlock()
return nil
}
var rtcpFeedback []webrtc.RTCPFeedback
switch t.params.MediaTrack.Kind() {
case livekit.TrackType_AUDIO:
@@ -285,7 +303,9 @@ func (t *MediaTrackSubscriptions) RemoveSubscriber(subscriberID livekit.Particip
func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.SubscribedTrack, willBeResumed bool) {
dt := subTrack.DownTrack()
sub := subTrack.Subscriber()
if dt == nil {
sub.RemoveMuxAudioTrack(t.params.MediaTrack.ID())
return
}
@@ -294,7 +314,6 @@ func (t *MediaTrackSubscriptions) closeSubscribedTrack(subTrack types.Subscribed
if willBeResumed {
tr := dt.GetTransceiver()
if tr != nil {
sub := subTrack.Subscriber()
sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState())
}
}
+124
View File
@@ -21,6 +21,8 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/supervisor"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/audio"
"github.com/livekit/livekit-server/pkg/sfu/audioselection"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/telemetry"
@@ -37,6 +39,8 @@ const (
disconnectCleanupDuration = 15 * time.Second
migrationWaitDuration = 3 * time.Second
muxAudioTracks = 3
)
type pendingTrackInfo struct {
@@ -163,6 +167,8 @@ type ParticipantImpl struct {
trackPublisherVersion map[livekit.TrackID]uint32
supervisor *supervisor.ParticipantSupervisor
audioForwarder *audioselection.SelectionForwarder
}
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
@@ -194,6 +200,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
params.SID,
params.Telemetry),
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
audioForwarder: audioselection.NewSelectionForwarder(audioselection.SelectionForwarderParams{
ActiveDowntracks: 3,
Logger: params.Logger,
ActiveLevelThreshold: audio.ConvertAudioLevel(35),
}),
}
p.version.Store(params.InitialVersion)
p.migrateState.Store(types.MigrateStateInit)
@@ -216,6 +227,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.setupUpTrackManager()
err = p.setupAudioForwarder()
if err != nil {
return nil, err
}
return p, nil
}
@@ -652,6 +668,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.TransportManager.Close()
}()
p.audioForwarder.Stop()
p.dataChannelStats.Report()
return nil
@@ -963,6 +980,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
if !isAlreadySubscribed && onSubscribedTo != nil {
onSubscribedTo(p, publisherID)
}
}
// RemoveSubscribedTrack removes a track to the participant's subscribed list
@@ -1297,6 +1315,7 @@ func (p *ParticipantImpl) onPrimaryTransportInitialConnected() {
func (p *ParticipantImpl) onPrimaryTransportFullyEstablished() {
p.updateState(livekit.ParticipantInfo_ACTIVE)
p.audioForwarder.Start()
}
func (p *ParticipantImpl) clearDisconnectTimer() {
@@ -2163,3 +2182,108 @@ func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err e
return out, nil
}
func (p *ParticipantImpl) AddMuxAudioTrack(trackID livekit.TrackID, r sfu.TrackReceiver) {
p.audioForwarder.AddSource(trackID, r)
}
func (p *ParticipantImpl) RemoveMuxAudioTrack(trackID livekit.TrackID) {
p.audioForwarder.RemoveSource(trackID)
}
func (p *ParticipantImpl) setupAudioForwarder() error {
// 1. add downtracks
for i := 0; i < muxAudioTracks; i++ {
if err := p.addDowntrack(); err != nil {
p.params.Logger.Errorw("error adding downtrack", err)
return err
}
}
// 2. setup audio forwarder callbacks
p.audioForwarder.OnForwardMappingChanged(func(forwardMapping map[livekit.TrackID]livekit.TrackID) {
p.params.Logger.Debugw("forward mapping changed", "mappings", forwardMapping)
})
return nil
}
func (p *ParticipantImpl) addDowntrack() error {
codecs := []webrtc.RTPCodecParameters{
{
RTPCodecCapability: opusCodecCapability,
},
}
trackID := livekit.TrackID(utils.NewGuid(utils.TrackPrefix + "AX"))
streamID := PackStreamID(p.ID(), trackID)
downTrack, err := sfu.NewDownTrack(
codecs,
audioselection.NewNullReceiver(streamID, trackID),
p.GetBufferFactory(),
p.ID(),
p.params.Config.Receiver.PacketBufferSize,
LoggerWithTrack(p.GetLogger(), trackID, false),
)
if err != nil {
return err
}
// subTrack := NewSubscribedTrack(SubscribedTrackParams{
// PublisherID: t.params.MediaTrack.PublisherID(),
// PublisherIdentity: t.params.MediaTrack.PublisherIdentity(),
// PublisherVersion: t.params.MediaTrack.PublisherVersion(),
// Subscriber: sub,
// MediaTrack: t.params.MediaTrack,
// DownTrack: downTrack,
// AdaptiveStream: sub.GetAdaptiveStream(),
// })
// Bind callback can happen from replaceTrack, so set it up early
downTrack.OnBind(func() {
p.audioForwarder.AddDownTrack(downTrack)
// wr.DetermineReceiver(downTrack.Codec())
// if reusingTransceiver.Load() {
// downTrack.SeedState(dtState)
// }
// if err = wr.AddDownTrack(downTrack); err != nil && err != sfu.ErrReceiverClosed {
// sub.GetLogger().Errorw(
// "could not add down track", err,
// "publisher", subTrack.PublisherIdentity(),
// "publisherID", subTrack.PublisherID(),
// "trackID", trackID,
// )
// }
// go subTrack.Bound()
// subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted())
})
downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
// t.params.Telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, stat)
})
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {
go p.UpdateRTT(rtt)
})
downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
p.OnReceiverReport(dt, report)
})
sender, transceiver, err := p.AddTransceiverFromTrackToSubscriber(downTrack, types.AddTrackParams{})
if err != nil {
return err
}
sendParameters := sender.GetParameters()
downTrack.SetRTPHeaderExtensions(sendParameters.HeaderExtensions)
downTrack.SetTransceiver(transceiver)
downTrack.OnCloseHandler(func(willBeResumed bool) {
p.audioForwarder.RemoveDownTrack(downTrack)
})
return nil
}
+2
View File
@@ -267,6 +267,8 @@ type LocalParticipant interface {
UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error
GetSubscribedTracks() []SubscribedTrack
VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32)
AddMuxAudioTrack(trackID livekit.TrackID, r sfu.TrackReceiver)
RemoveMuxAudioTrack(trackID livekit.TrackID)
// returns list of participant identities that the current participant is subscribed to
GetSubscribedParticipants() []livekit.ParticipantID
+200 -11
View File
@@ -1,26 +1,215 @@
package audioselection
type AudioSource interface {
Activate()
Deactivate()
GetAudioLevel() uint8
import (
"sort"
"sync"
"time"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
const (
updateInterval = 100 * time.Millisecond
)
// type sfu.Receiver interface {
// Activate()
// Deactivate()
// GetAudioLevel() uint8
// ID() livekit.TrackID
// }
func AudioCodecCanbeMux(ti livekit.TrackInfo, codecs []webrtc.RTPCodecParameters) bool {
if len(codecs) == 0 || ti.Stereo {
return false
}
c := codecs[0]
return c.MimeType == webrtc.MimeTypeOpus
}
type SelectionForworderParams struct {
type SelectionForwarderParams struct {
ActiveDowntracks int
FadeDowntracks int
ActiveLevelThreshold float64
FadeoutTime time.Duration
FadeinTime time.Duration
Logger logger.Logger
RequestDownTrack func(sfu.TrackReceiver) *sfu.DownTrack
}
type SelectionForworder struct {
type sourceInfo struct {
trackID livekit.TrackID
receiver sfu.TrackReceiver
vad bool
active bool
audioLevel float64
downtrack *sfu.DownTrack
}
func NewSelectionForworder() *SelectionForworder {
return &SelectionForworder{}
type SelectionForwarder struct {
lock sync.RWMutex
params SelectionForwarderParams
sources []*sourceInfo
idleDowntracks []*sfu.DownTrack
downtracks []*sfu.DownTrack
close chan struct{}
onForwardMappingChanged func(forwardMapping map[livekit.TrackID]livekit.TrackID)
}
func (f *SelectionForworder) AddDownTrack() {
func NewSelectionForwarder(params SelectionForwarderParams) *SelectionForwarder {
return &SelectionForwarder{
params: params,
close: make(chan struct{}),
}
}
func (f *SelectionForworder) RemoveDownTrack() {
func (f *SelectionForwarder) Start() {
go f.process()
}
func (f *SelectionForworder) OnRequestDowntrack() {
func (f *SelectionForwarder) Stop() {
close(f.close)
}
func (f *SelectionForwarder) AddDownTrack(dt *sfu.DownTrack) {
f.lock.Lock()
f.downtracks = append(f.downtracks, dt)
f.idleDowntracks = append(f.idleDowntracks, dt)
f.lock.Unlock()
}
func (f *SelectionForwarder) RemoveDownTrack(dt *sfu.DownTrack) {
}
// OnForwardMappingChanged is called when the forward mapping is changed, used to update the relationship between downtracks and sources
func (f *SelectionForwarder) OnForwardMappingChanged(h func(forwardMapping map[livekit.TrackID]livekit.TrackID)) {
f.onForwardMappingChanged = h
}
func (f *SelectionForwarder) AddSource(trackID livekit.TrackID, source sfu.TrackReceiver) {
f.params.Logger.Debugw("adding source", "trackID", trackID)
f.lock.Lock()
f.sources = append(f.sources, &sourceInfo{trackID: trackID, receiver: source})
f.lock.Unlock()
}
func (f *SelectionForwarder) RemoveSource(trackID livekit.TrackID) {
f.lock.Lock()
for i, s := range f.sources {
if s.trackID == trackID {
if s.active {
f.deactiveSource(s)
}
f.sources[i] = f.sources[len(f.sources)-1]
f.sources = f.sources[:len(f.sources)-1]
break
}
}
f.lock.Unlock()
}
func (f *SelectionForwarder) MuteSource(source sfu.TrackReceiver, mute bool) {
}
func (f *SelectionForwarder) process() {
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
f.updateForward()
case <-f.close:
return
}
}
}
func (f *SelectionForwarder) updateForward() {
f.lock.Lock()
defer f.lock.Unlock()
if len(f.sources) == 0 {
return
}
for _, source := range f.sources {
source.audioLevel, _ = source.receiver.GetAudioLevel()
}
sort.Slice(f.sources, func(i, j int) bool {
return f.sources[i].audioLevel > f.sources[j].audioLevel
})
var activeteSources, idleSources []*sourceInfo
for i, source := range f.sources {
if i >= f.params.ActiveDowntracks && source.active {
idleSources = append(idleSources, source)
} else {
if source.audioLevel > f.params.ActiveLevelThreshold && !source.active {
activeteSources = append(activeteSources, source)
} else if source.audioLevel <= f.params.ActiveLevelThreshold && source.active {
idleSources = append(idleSources, source)
}
}
}
var forwardChanged bool
for _, source := range activeteSources {
if len(f.idleDowntracks) == 0 && len(idleSources) > 0 {
f.deactiveSource(idleSources[0])
idleSources = idleSources[1:]
}
if f.activeSource(source) {
forwardChanged = true
}
}
if forwardChanged && f.onForwardMappingChanged != nil {
forwardMapping := make(map[livekit.TrackID]livekit.TrackID)
for _, source := range f.sources {
if source.active && source.downtrack != nil {
forwardMapping[source.receiver.TrackID()] = livekit.TrackID(source.downtrack.ID())
}
}
f.onForwardMappingChanged(forwardMapping)
}
}
func (f *SelectionForwarder) activeSource(source *sourceInfo) bool {
f.params.Logger.Debugw("activating source", "trackID", source.receiver.TrackID())
if len(f.idleDowntracks) == 0 {
if len(f.downtracks) < f.params.ActiveDowntracks {
dt := f.params.RequestDownTrack(source.receiver)
f.downtracks = append(f.downtracks, dt)
f.idleDowntracks = append(f.idleDowntracks, dt)
} else {
f.params.Logger.Warnw("no idle downtracks for active source", nil, "trackID", source.receiver.TrackID())
return false
}
}
source.active = true
source.downtrack = f.idleDowntracks[0]
f.idleDowntracks = f.idleDowntracks[1:]
source.downtrack.ResetReceiver(source.receiver)
source.receiver.AddDownTrack(source.downtrack)
return true
}
func (f *SelectionForwarder) deactiveSource(source *sourceInfo) {
f.params.Logger.Debugw("deactivate source", "trackID", source.receiver.TrackID())
source.active = false
dt := source.downtrack
source.downtrack = nil
if dt != nil {
dt.ResetReceiver(&NullReceiver{})
source.receiver.DeleteDownTrack(dt.SubscriberID())
f.idleDowntracks = append(f.idleDowntracks, dt)
}
}
+72
View File
@@ -0,0 +1,72 @@
package audioselection
import (
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/protocol/livekit"
)
type NullReceiver struct {
streamID string
trackID livekit.TrackID
}
func NewNullReceiver(streamID string, trackID livekit.TrackID) *NullReceiver {
return &NullReceiver{
streamID: streamID,
trackID: trackID,
}
}
func (r *NullReceiver) TrackID() livekit.TrackID {
return r.trackID
}
func (r *NullReceiver) StreamID() string {
return r.streamID
}
func (r *NullReceiver) Codec() webrtc.RTPCodecParameters {
return webrtc.RTPCodecParameters{}
}
func (r *NullReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter {
return nil
}
func (r *NullReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
return 0, nil
}
func (r *NullReceiver) GetLayeredBitrate() sfu.Bitrates {
return sfu.Bitrates{}
}
func (r *NullReceiver) GetAudioLevel() (float64, bool) {
return 0, false
}
func (r *NullReceiver) SendPLI(layer int32, force bool) {
}
func (r *NullReceiver) SetUpTrackPaused(paused bool) {
}
func (r *NullReceiver) SetMaxExpectedSpatialLayer(layer int32) {
}
func (r *NullReceiver) AddDownTrack(track sfu.TrackSender) error {
return nil
}
func (r *NullReceiver) DeleteDownTrack(participantID livekit.ParticipantID) {
}
func (r *NullReceiver) DebugInfo() map[string]interface{} {
return nil
}
func (r *NullReceiver) GetLayerDimension(layer int32) (uint32, uint32) {
return 0, 0
}
func (r *NullReceiver) TrackInfo() *livekit.TrackInfo {
return &livekit.TrackInfo{}
}
func (r *NullReceiver) GetPrimaryReceiverForRed() sfu.TrackReceiver {
return r
}
func (r *NullReceiver) GetRedReceiver() sfu.TrackReceiver {
return r
}
func (r *NullReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
return nil
}
+25 -12
View File
@@ -236,11 +236,11 @@ func NewDownTrack(
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
}
d.receiver = r
d.forwarder = NewForwarder(d.kind, d.logger)
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
@@ -271,6 +271,19 @@ func NewDownTrack(
return d, nil
}
func (d *DownTrack) ResetReceiver(r TrackReceiver) {
d.bindLock.Lock()
d.receiver = r
d.bindLock.Unlock()
// TODO: log stats
}
func (d *DownTrack) getReceiver() TrackReceiver {
d.bindLock.Lock()
defer d.bindLock.Unlock()
return d.receiver
}
// Bind is called by the PeerConnection after negotiation is complete
// This asserts that the code requested is supported by the remote peer.
// If so it sets up all the state (SSRC and PayloadType) to have a call
@@ -340,7 +353,7 @@ func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error {
}
func (d *DownTrack) TrackInfoAvailable() {
d.connectionStats.Start(d.receiver.TrackInfo())
d.connectionStats.Start(d.getReceiver().TrackInfo())
}
// ID is the unique identifier for this Track. This should be unique for the
@@ -431,7 +444,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
for {
if d.connected.Load() {
d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer)
d.receiver.SendPLI(layer, false)
d.getReceiver().SendPLI(layer, false)
d.rtpStats.UpdateLayerLockPliAndTime(1)
}
@@ -873,7 +886,7 @@ func (d *DownTrack) IsDeficient() bool {
}
func (d *DownTrack) BandwidthRequested() int64 {
return d.forwarder.BandwidthRequested(d.receiver.GetLayeredBitrate())
return d.forwarder.BandwidthRequested(d.getReceiver().GetLayeredBitrate())
}
func (d *DownTrack) DistanceToDesired() int32 {
@@ -881,13 +894,13 @@ func (d *DownTrack) DistanceToDesired() int32 {
}
func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation {
allocation := d.forwarder.AllocateOptimal(d.receiver.GetLayeredBitrate(), allowOvershoot)
allocation := d.forwarder.AllocateOptimal(d.getReceiver().GetLayeredBitrate(), allowOvershoot)
d.maybeStartKeyFrameRequester()
return allocation
}
func (d *DownTrack) ProvisionalAllocatePrepare() {
d.forwarder.ProvisionalAllocatePrepare(d.receiver.GetLayeredBitrate())
d.forwarder.ProvisionalAllocatePrepare(d.getReceiver().GetLayeredBitrate())
}
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, allowOvershoot bool) int64 {
@@ -913,19 +926,19 @@ func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation {
}
func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool) {
allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.receiver.GetLayeredBitrate(), allowOvershoot)
allocation, available := d.forwarder.AllocateNextHigher(availableChannelCapacity, d.getReceiver().GetLayeredBitrate(), allowOvershoot)
d.maybeStartKeyFrameRequester()
return allocation, available
}
func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool) {
transition, available := d.forwarder.GetNextHigherTransition(d.receiver.GetLayeredBitrate(), allowOvershoot)
transition, available := d.forwarder.GetNextHigherTransition(d.getReceiver().GetLayeredBitrate(), allowOvershoot)
d.logger.Debugw("stream: get next higher layer", "transition", transition, "available", available)
return transition, available
}
func (d *DownTrack) Pause() VideoAllocation {
allocation := d.forwarder.Pause(d.receiver.GetLayeredBitrate())
allocation := d.forwarder.Pause(d.getReceiver().GetLayeredBitrate())
d.maybeStartKeyFrameRequester()
return allocation
}
@@ -1128,7 +1141,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
targetLayers := d.forwarder.TargetLayers()
if targetLayers != InvalidLayers {
d.logger.Debugw("sending PLI RTCP", "layer", targetLayers.Spatial)
d.receiver.SendPLI(targetLayers.Spatial, false)
d.getReceiver().SendPLI(targetLayers.Spatial, false)
d.isNACKThrottled.Store(true)
d.rtpStats.UpdatePliTime()
pliOnce = false
@@ -1219,7 +1232,7 @@ func (d *DownTrack) SetConnected() {
if d.bound.Load() && d.kind == webrtc.RTPCodecTypeVideo {
targetLayers := d.forwarder.TargetLayers()
if targetLayers != InvalidLayers {
d.receiver.SendPLI(targetLayers.Spatial, true)
d.getReceiver().SendPLI(targetLayers.Spatial, true)
}
}
}
@@ -1266,7 +1279,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
}
pktBuff := *src
n, err := d.receiver.ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo)
n, err := d.getReceiver().ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo)
if err != nil {
if err == io.EOF {
break