From 5edb42a9fd4e95e01bb004b07d687a1d7cc926ef Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 31 Oct 2022 09:40:20 +0800 Subject: [PATCH] experiment fallback to tcp when udp unstable (#1119) * fallback to tcp when udp unstable --- pkg/rtc/mediatracksubscriptions.go | 5 ++ pkg/rtc/participant.go | 17 ++++++ pkg/rtc/transportmanager.go | 59 +++++++++++++++++-- pkg/rtc/types/interfaces.go | 2 + .../typesfakes/fake_local_participant.go | 42 +++++++++++++ 5 files changed, 121 insertions(+), 4 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 7b665b760..0b797a294 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/rtcerr" "go.uber.org/atomic" @@ -168,6 +169,10 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * go sub.UpdateRTT(rtt) }) + downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { + sub.OnReceiverReport(dt, report) + }) + var transceiver *webrtc.RTPTransceiver var sender *webrtc.RTPSender diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 2b660055c..376ddc0c9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -412,6 +412,22 @@ func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) { p.TransportManager.HandleOffer(offer, shouldPend) } +// HandleAnswer handles a client answer response, with subscriber PC, server initiates the +// offer and client answers +func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) { + p.params.Logger.Infow("received answer", "transport", livekit.SignalTarget_SUBSCRIBER) + + /* from server received join request to client answer + * 1. server send join response & offer + * ... swap candidates + * 2. client send answer + */ + signalConnCost := time.Since(p.ConnectedAt()).Milliseconds() + p.TransportManager.UpdateRTT(uint32(signalConnCost), false) + + p.TransportManager.HandleAnswer(answer) +} + func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error { p.params.Logger.Infow("sending answer", "transport", livekit.SignalTarget_PUBLISHER) answer = p.configurePublisherAnswer(answer) @@ -990,6 +1006,7 @@ func (p *ParticipantImpl) UpdateRTT(rtt uint32) { p.rttUpdatedAt = now p.lastRTT = rtt p.lock.Unlock() + p.TransportManager.UpdateRTT(rtt, true) for _, pt := range p.GetPublishedTracks() { pt.(types.LocalMediaTrack).SetRTT(rtt) diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 5446fc7e5..19effc63c 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -1,6 +1,7 @@ package rtc import ( + "math/bits" "strings" "sync" @@ -20,6 +21,12 @@ import ( const ( failureCountThreshold = 2 + + // when RR report loss percentage over this threshold, we consider it is a unstable event + udpLossFracUnstable = 25 + // if in last 32 times RR, the unstable report count over this threshold, the connection is unstable + udpLossUnstableCountThreshold = 20 + tcpGoodRTT = 80 ) type TransportManagerParams struct { @@ -56,6 +63,10 @@ type TransportManager struct { lastPublisherOffer atomic.Value iceConfig types.IceConfig + mediaLossProxy *MediaLossProxy + udpLossUnstableCount uint32 + tcpRTT, udpRTT uint32 + onPublisherInitialConnected func() onSubscriberInitialConnected func() onPrimaryTransportInitialConnected func() @@ -66,8 +77,10 @@ type TransportManager struct { func NewTransportManager(params TransportManagerParams) (*TransportManager, error) { t := &TransportManager{ - params: params, + params: params, + mediaLossProxy: NewMediaLossProxy(MediaLossProxyParams{Logger: params.Logger}), } + t.mediaLossProxy.OnMediaLossUpdate(t.onMediaLossUpdate) enabledCodecs := make([]*livekit.Codec, 0, len(params.EnabledCodecs)) for _, c := range params.EnabledCodecs { @@ -397,10 +410,7 @@ func (t *TransportManager) ProcessPendingPublisherOffer() { } } -// HandleAnswer handles a client answer response, with subscriber PC, server initiates the -// offer and client answers func (t *TransportManager) HandleAnswer(answer webrtc.SessionDescription) { - t.params.Logger.Infow("received answer", "transport", livekit.SignalTarget_SUBSCRIBER) t.subscriber.HandleRemoteDescription(answer) } @@ -453,6 +463,10 @@ func (t *TransportManager) configureICE(iceConfig types.IceConfig, reset bool) { t.isTransportReconfigured = !reset t.lock.Unlock() + if iceConfig.PreferSub != types.PreferNone { + t.mediaLossProxy.OnMediaLossUpdate(nil) + } + t.publisher.SetPreferTCP(iceConfig.PreferPub == types.PreferTcp) t.subscriber.SetPreferTCP(iceConfig.PreferSub == types.PreferTcp) @@ -609,3 +623,40 @@ func (t *TransportManager) ProcessPendingPublisherDataChannels() { } } } + +func (t *TransportManager) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { + t.mediaLossProxy.HandleMaxLossFeedback(dt, report) +} + +func (t *TransportManager) onMediaLossUpdate(loss uint8) { + t.lock.Lock() + t.udpLossUnstableCount <<= 1 + if loss >= uint8(255*udpLossFracUnstable/100) { + t.udpLossUnstableCount |= 1 + if bits.OnesCount32(t.udpLossUnstableCount) >= udpLossUnstableCountThreshold { + if t.udpRTT > 0 && t.tcpRTT < uint32(float32(t.udpRTT)*1.3) && t.tcpRTT < tcpGoodRTT { + t.udpLossUnstableCount = 0 + t.lock.Unlock() + t.params.Logger.Infow("udp connection unstable, switch to tcp") + t.handleConnectionFailed(true) + if t.onAnyTransportFailed != nil { + t.onAnyTransportFailed() + } + return + } + } + } + t.lock.Unlock() +} + +func (t *TransportManager) UpdateRTT(rtt uint32, isUDP bool) { + if isUDP { + if t.udpRTT == 0 { + t.udpRTT = rtt + } else { + t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2) + } + } else { + t.tcpRTT = rtt + } +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 92a1d605b..79e9649b1 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/livekit/protocol/auth" @@ -303,6 +304,7 @@ type LocalParticipant interface { OnSubscribedTo(callback func(LocalParticipant, livekit.ParticipantID)) OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) OnClaimsChanged(callback func(LocalParticipant)) + OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) // session migration MaybeStartMigration(force bool, onStart func()) bool diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index a963fe020..33d8e9b63 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -11,6 +11,7 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/pion/rtcp" webrtc "github.com/pion/webrtc/v3" ) @@ -434,6 +435,12 @@ type FakeLocalParticipant struct { onParticipantUpdateArgsForCall []struct { arg1 func(types.LocalParticipant) } + OnReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport) + onReceiverReportMutex sync.RWMutex + onReceiverReportArgsForCall []struct { + arg1 *sfu.DownTrack + arg2 *rtcp.ReceiverReport + } OnStateChangeStub func(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) onStateChangeMutex sync.RWMutex onStateChangeArgsForCall []struct { @@ -2976,6 +2983,39 @@ func (fake *FakeLocalParticipant) OnParticipantUpdateArgsForCall(i int) func(typ return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) { + fake.onReceiverReportMutex.Lock() + fake.onReceiverReportArgsForCall = append(fake.onReceiverReportArgsForCall, struct { + arg1 *sfu.DownTrack + arg2 *rtcp.ReceiverReport + }{arg1, arg2}) + stub := fake.OnReceiverReportStub + fake.recordInvocation("OnReceiverReport", []interface{}{arg1, arg2}) + fake.onReceiverReportMutex.Unlock() + if stub != nil { + fake.OnReceiverReportStub(arg1, arg2) + } +} + +func (fake *FakeLocalParticipant) OnReceiverReportCallCount() int { + fake.onReceiverReportMutex.RLock() + defer fake.onReceiverReportMutex.RUnlock() + return len(fake.onReceiverReportArgsForCall) +} + +func (fake *FakeLocalParticipant) OnReceiverReportCalls(stub func(*sfu.DownTrack, *rtcp.ReceiverReport)) { + fake.onReceiverReportMutex.Lock() + defer fake.onReceiverReportMutex.Unlock() + fake.OnReceiverReportStub = stub +} + +func (fake *FakeLocalParticipant) OnReceiverReportArgsForCall(i int) (*sfu.DownTrack, *rtcp.ReceiverReport) { + fake.onReceiverReportMutex.RLock() + defer fake.onReceiverReportMutex.RUnlock() + argsForCall := fake.onReceiverReportArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) { fake.onStateChangeMutex.Lock() fake.onStateChangeArgsForCall = append(fake.onStateChangeArgsForCall, struct { @@ -4807,6 +4847,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onICEConfigChangedMutex.RUnlock() fake.onParticipantUpdateMutex.RLock() defer fake.onParticipantUpdateMutex.RUnlock() + fake.onReceiverReportMutex.RLock() + defer fake.onReceiverReportMutex.RUnlock() fake.onStateChangeMutex.RLock() defer fake.onStateChangeMutex.RUnlock() fake.onSubscribedToMutex.RLock()