From fe4da84ee9b2f7a8a9141f0ac9b8a69751aff557 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 4 Jan 2023 16:01:04 +0800 Subject: [PATCH] send padding packets of muted uptrack for go sdk (#1283) --- pkg/rtc/clientinfo.go | 9 +++++ pkg/rtc/participant.go | 4 +++ pkg/sfu/downtrack.go | 67 +++++++++++++++++++++++++++++++------- pkg/sfu/streamallocator.go | 2 +- 4 files changed, 70 insertions(+), 12 deletions(-) diff --git a/pkg/rtc/clientinfo.go b/pkg/rtc/clientinfo.go index 1c3b367ff..04cdc26bb 100644 --- a/pkg/rtc/clientinfo.go +++ b/pkg/rtc/clientinfo.go @@ -19,6 +19,10 @@ func (c ClientInfo) isSafari() bool { return c.ClientInfo != nil && strings.EqualFold(c.ClientInfo.Browser, "safari") } +func (c ClientInfo) isGo() bool { + return c.ClientInfo != nil && c.ClientInfo.Sdk == livekit.ClientInfo_GO +} + func (c ClientInfo) SupportsAudioRED() bool { return !c.isFirefox() && !c.isSafari() } @@ -27,6 +31,11 @@ func (c ClientInfo) SupportPrflxOverRelay() bool { return !c.isFirefox() } +// GoSDK(pion) relies on rtp packets to fire ontrack event, browsers and native (libwebrtc) rely on sdp +func (c ClientInfo) FireTrackByRTPPacket() bool { + return c.isGo() +} + // CompareVersion compares two semver versions // returning 1 if current version is greater than version // 0 if they are the same, and -1 if it's an earlier version diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 953071f34..c3110f103 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -970,6 +970,10 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack, sou settings := p.subscribedTracksSettings[subTrack.ID()] p.lock.Unlock() + if p.params.ClientInfo.FireTrackByRTPPacket() { + subTrack.DownTrack().SetActivePaddingOnMuteUpTrack() + } + subTrack.OnBind(func() { if p.TransportManager.HasSubscriberEverConnected() { subTrack.DownTrack().SetConnected() diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 42c664ed2..5aabf2e69 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -51,6 +51,10 @@ const ( flushTimeout = 1 * time.Second maxPadding = 2000 + + waitBeforeSendPaddingOnMute = 100 * time.Millisecond + paddingOnMuteInterval = 100 * time.Millisecond + maxPaddingOnMute = 50 ) var ( @@ -159,6 +163,7 @@ type DownTrack struct { listenerLock sync.RWMutex isClosed atomic.Bool connected atomic.Bool + bindAndConnectedOnce atomic.Bool rtpStats *buffer.RTPStats @@ -178,6 +183,8 @@ type DownTrack struct { isNACKThrottled atomic.Bool + activePaddingOnMuteUpTrack atomic.Bool + // RTCP callbacks onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC) @@ -319,15 +326,16 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.sequencer = newSequencer(d.maxTrack, maxPadding, d.logger) } + d.bound.Store(true) d.codec = codec.RTPCodecCapability d.forwarder.DetermineCodec(d.codec) if d.onBind != nil { d.onBind() } - d.bound.Store(true) d.bindLock.Unlock() d.logger.Debugw("downtrack bound") + d.onBindAndConnected() return codec, nil } @@ -419,6 +427,9 @@ func (d *DownTrack) stopKeyFrameRequester() { } func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { + if d.IsClosed() { + return + } interval := 2 * d.rtpStats.GetRtt() if interval < keyFrameIntervalMin { interval = keyFrameIntervalMin @@ -542,8 +553,8 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack -func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { - if !d.rtpStats.IsActive() { +func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int { + if !d.rtpStats.IsActive() && !paddingOnMute { return 0 } @@ -563,7 +574,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { // can be sent only on frame boundaries, writing on disabled tracks // will give more options. // LK-TODO-END - if d.forwarder.IsMuted() { + if d.forwarder.IsMuted() && !paddingOnMute { return 0 } @@ -618,7 +629,10 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { for _, f := range d.onPaddingSent { f(d, size) } - d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano()) + + if !paddingOnMute { + d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano()) + } // // Register with sequencer with invalid layer so that NACKs for these can be filtered out. @@ -1216,15 +1230,17 @@ func (d *DownTrack) handleRTCP(bytes []byte) { func (d *DownTrack) SetConnected() { if !d.connected.Swap(true) { - if d.bound.Load() && d.kind == webrtc.RTPCodecTypeVideo { - targetLayers := d.forwarder.TargetLayers() - if targetLayers != InvalidLayers { - d.receiver.SendPLI(targetLayers.Spatial, true) - } - } + d.onBindAndConnected() } } +// SetActivePaddingOnMuteUpTrack will enable padding on the track when its uptrack is muted. +// Pion will not fire OnTrack event until it receives packet for the track, +// so we send padding packets to help pion client (go-sdk) to fire the event. +func (d *DownTrack) SetActivePaddingOnMuteUpTrack() { + d.activePaddingOnMuteUpTrack.Store(true) +} + func (d *DownTrack) retransmitPackets(nacks []uint16) { if d.sequencer == nil { return @@ -1478,3 +1494,32 @@ func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint return } + +func (d *DownTrack) onBindAndConnected() { + if d.connected.Load() && d.bound.Load() && d.kind == webrtc.RTPCodecTypeVideo && !d.bindAndConnectedOnce.Swap(true) { + targetLayers := d.forwarder.TargetLayers() + if targetLayers != InvalidLayers { + d.receiver.SendPLI(targetLayers.Spatial, true) + } + + if d.activePaddingOnMuteUpTrack.Load() { + go d.sendPaddingOnMute() + } + } +} + +func (d *DownTrack) sendPaddingOnMute() { + d.logger.Debugw("sending padding on mute") + // let uptrack have chance to send packet before we send padding + time.Sleep(waitBeforeSendPaddingOnMute) + + for i := 0; i < maxPaddingOnMute; i++ { + if d.rtpStats.IsActive() || d.IsClosed() { + return + } + + d.WritePaddingRTP(20, true) + + time.Sleep(paddingOnMuteInterval) + } +} diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index dc9063be6..f4a995fab 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -1351,7 +1351,7 @@ func (t *Track) SetMaxLayers(layers VideoLayers) bool { } func (t *Track) WritePaddingRTP(bytesToSend int) int { - return t.downTrack.WritePaddingRTP(bytesToSend) + return t.downTrack.WritePaddingRTP(bytesToSend, false) } func (t *Track) AllocateOptimal(allowOvershoot bool) VideoAllocation {