experiment fallback to tcp when udp unstable (#1119)

* fallback to tcp when udp unstable
This commit is contained in:
cnderrauber
2022-10-31 09:40:20 +08:00
committed by GitHub
parent 0aa8684f1c
commit 5edb42a9fd
5 changed files with 121 additions and 4 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors"
"sync"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/rtcerr"
"go.uber.org/atomic"
@@ -168,6 +169,10 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
go sub.UpdateRTT(rtt)
})
downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
sub.OnReceiverReport(dt, report)
})
var transceiver *webrtc.RTPTransceiver
var sender *webrtc.RTPSender

View File

@@ -412,6 +412,22 @@ func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) {
p.TransportManager.HandleOffer(offer, shouldPend)
}
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) {
p.params.Logger.Infow("received answer", "transport", livekit.SignalTarget_SUBSCRIBER)
/* from server received join request to client answer
* 1. server send join response & offer
* ... swap candidates
* 2. client send answer
*/
signalConnCost := time.Since(p.ConnectedAt()).Milliseconds()
p.TransportManager.UpdateRTT(uint32(signalConnCost), false)
p.TransportManager.HandleAnswer(answer)
}
func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error {
p.params.Logger.Infow("sending answer", "transport", livekit.SignalTarget_PUBLISHER)
answer = p.configurePublisherAnswer(answer)
@@ -990,6 +1006,7 @@ func (p *ParticipantImpl) UpdateRTT(rtt uint32) {
p.rttUpdatedAt = now
p.lastRTT = rtt
p.lock.Unlock()
p.TransportManager.UpdateRTT(rtt, true)
for _, pt := range p.GetPublishedTracks() {
pt.(types.LocalMediaTrack).SetRTT(rtt)

View File

@@ -1,6 +1,7 @@
package rtc
import (
"math/bits"
"strings"
"sync"
@@ -20,6 +21,12 @@ import (
const (
failureCountThreshold = 2
// when RR report loss percentage over this threshold, we consider it is a unstable event
udpLossFracUnstable = 25
// if in last 32 times RR, the unstable report count over this threshold, the connection is unstable
udpLossUnstableCountThreshold = 20
tcpGoodRTT = 80
)
type TransportManagerParams struct {
@@ -56,6 +63,10 @@ type TransportManager struct {
lastPublisherOffer atomic.Value
iceConfig types.IceConfig
mediaLossProxy *MediaLossProxy
udpLossUnstableCount uint32
tcpRTT, udpRTT uint32
onPublisherInitialConnected func()
onSubscriberInitialConnected func()
onPrimaryTransportInitialConnected func()
@@ -66,8 +77,10 @@ type TransportManager struct {
func NewTransportManager(params TransportManagerParams) (*TransportManager, error) {
t := &TransportManager{
params: params,
params: params,
mediaLossProxy: NewMediaLossProxy(MediaLossProxyParams{Logger: params.Logger}),
}
t.mediaLossProxy.OnMediaLossUpdate(t.onMediaLossUpdate)
enabledCodecs := make([]*livekit.Codec, 0, len(params.EnabledCodecs))
for _, c := range params.EnabledCodecs {
@@ -397,10 +410,7 @@ func (t *TransportManager) ProcessPendingPublisherOffer() {
}
}
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (t *TransportManager) HandleAnswer(answer webrtc.SessionDescription) {
t.params.Logger.Infow("received answer", "transport", livekit.SignalTarget_SUBSCRIBER)
t.subscriber.HandleRemoteDescription(answer)
}
@@ -453,6 +463,10 @@ func (t *TransportManager) configureICE(iceConfig types.IceConfig, reset bool) {
t.isTransportReconfigured = !reset
t.lock.Unlock()
if iceConfig.PreferSub != types.PreferNone {
t.mediaLossProxy.OnMediaLossUpdate(nil)
}
t.publisher.SetPreferTCP(iceConfig.PreferPub == types.PreferTcp)
t.subscriber.SetPreferTCP(iceConfig.PreferSub == types.PreferTcp)
@@ -609,3 +623,40 @@ func (t *TransportManager) ProcessPendingPublisherDataChannels() {
}
}
}
func (t *TransportManager) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
t.mediaLossProxy.HandleMaxLossFeedback(dt, report)
}
func (t *TransportManager) onMediaLossUpdate(loss uint8) {
t.lock.Lock()
t.udpLossUnstableCount <<= 1
if loss >= uint8(255*udpLossFracUnstable/100) {
t.udpLossUnstableCount |= 1
if bits.OnesCount32(t.udpLossUnstableCount) >= udpLossUnstableCountThreshold {
if t.udpRTT > 0 && t.tcpRTT < uint32(float32(t.udpRTT)*1.3) && t.tcpRTT < tcpGoodRTT {
t.udpLossUnstableCount = 0
t.lock.Unlock()
t.params.Logger.Infow("udp connection unstable, switch to tcp")
t.handleConnectionFailed(true)
if t.onAnyTransportFailed != nil {
t.onAnyTransportFailed()
}
return
}
}
}
t.lock.Unlock()
}
func (t *TransportManager) UpdateRTT(rtt uint32, isUDP bool) {
if isUDP {
if t.udpRTT == 0 {
t.udpRTT = rtt
} else {
t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2)
}
} else {
t.tcpRTT = rtt
}
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/livekit/protocol/auth"
@@ -303,6 +304,7 @@ type LocalParticipant interface {
OnSubscribedTo(callback func(LocalParticipant, livekit.ParticipantID))
OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))
OnClaimsChanged(callback func(LocalParticipant))
OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
// session migration
MaybeStartMigration(force bool, onStart func()) bool

View File

@@ -11,6 +11,7 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
webrtc "github.com/pion/webrtc/v3"
)
@@ -434,6 +435,12 @@ type FakeLocalParticipant struct {
onParticipantUpdateArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport)
onReceiverReportMutex sync.RWMutex
onReceiverReportArgsForCall []struct {
arg1 *sfu.DownTrack
arg2 *rtcp.ReceiverReport
}
OnStateChangeStub func(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State))
onStateChangeMutex sync.RWMutex
onStateChangeArgsForCall []struct {
@@ -2976,6 +2983,39 @@ func (fake *FakeLocalParticipant) OnParticipantUpdateArgsForCall(i int) func(typ
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) {
fake.onReceiverReportMutex.Lock()
fake.onReceiverReportArgsForCall = append(fake.onReceiverReportArgsForCall, struct {
arg1 *sfu.DownTrack
arg2 *rtcp.ReceiverReport
}{arg1, arg2})
stub := fake.OnReceiverReportStub
fake.recordInvocation("OnReceiverReport", []interface{}{arg1, arg2})
fake.onReceiverReportMutex.Unlock()
if stub != nil {
fake.OnReceiverReportStub(arg1, arg2)
}
}
func (fake *FakeLocalParticipant) OnReceiverReportCallCount() int {
fake.onReceiverReportMutex.RLock()
defer fake.onReceiverReportMutex.RUnlock()
return len(fake.onReceiverReportArgsForCall)
}
func (fake *FakeLocalParticipant) OnReceiverReportCalls(stub func(*sfu.DownTrack, *rtcp.ReceiverReport)) {
fake.onReceiverReportMutex.Lock()
defer fake.onReceiverReportMutex.Unlock()
fake.OnReceiverReportStub = stub
}
func (fake *FakeLocalParticipant) OnReceiverReportArgsForCall(i int) (*sfu.DownTrack, *rtcp.ReceiverReport) {
fake.onReceiverReportMutex.RLock()
defer fake.onReceiverReportMutex.RUnlock()
argsForCall := fake.onReceiverReportArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) {
fake.onStateChangeMutex.Lock()
fake.onStateChangeArgsForCall = append(fake.onStateChangeArgsForCall, struct {
@@ -4807,6 +4847,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onICEConfigChangedMutex.RUnlock()
fake.onParticipantUpdateMutex.RLock()
defer fake.onParticipantUpdateMutex.RUnlock()
fake.onReceiverReportMutex.RLock()
defer fake.onReceiverReportMutex.RUnlock()
fake.onStateChangeMutex.RLock()
defer fake.onStateChangeMutex.RUnlock()
fake.onSubscribedToMutex.RLock()