diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 7cb9504b1..d8cf2daee 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -43,6 +43,9 @@ const ( iceFailedTimeout = 25 * time.Second // pion's default iceKeepaliveInterval = 2 * time.Second // pion's default + minTcpICEConnectTimeout = 5 * time.Second + maxTcpICEConnectTimeout = 12 * time.Second // js-sdk has a default 15s timeout for first connection, let server detect failure earlier before that + maxConnectTimeoutAfterICE = 20 * time.Second // max duration for waiting pc to connect after ICE is connected shortConnectionThreshold = 90 * time.Second @@ -159,8 +162,10 @@ type PCTransport struct { iceConnectedAt time.Time firstConnectedAt time.Time connectedAt time.Time + tcpICETimer *time.Timer connectAfterICETimer *time.Timer // timer to wait for pc to connect after ice connected resetShortConnOnICERestart atomic.Bool + signalingRTT atomic.Uint32 // milliseconds onFullyEstablished func() @@ -403,10 +408,34 @@ func (t *PCTransport) createPeerConnection() error { return nil } +func (t *PCTransport) SetSignalingRTT(rtt uint32) { + t.signalingRTT.Store(rtt) +} + func (t *PCTransport) setICEStartedAt(at time.Time) { t.lock.Lock() if t.iceStartedAt.IsZero() { t.iceStartedAt = at + + // set failure timer for tcp ice connection based on signaling RTT + if t.preferTCP.Load() { + signalingRTT := t.signalingRTT.Load() + if signalingRTT < 1000 { + tcpICETimeout := time.Duration(signalingRTT*8) * time.Millisecond + if tcpICETimeout < minTcpICEConnectTimeout { + tcpICETimeout = minTcpICEConnectTimeout + } else if tcpICETimeout > maxTcpICEConnectTimeout { + tcpICETimeout = maxTcpICEConnectTimeout + } + t.params.Logger.Debugw("set tcp ice connect timer", "timeout", tcpICETimeout, "signalRTT", signalingRTT) + t.tcpICETimer = time.AfterFunc(tcpICETimeout, func() { + if t.pc.ICEConnectionState() == webrtc.ICEConnectionStateChecking { + t.params.Logger.Infow("tcp ice connect timeout", "timeout", tcpICETimeout, "signalRTT", signalingRTT) + t.handleConnectionFailed(true) + } + }) + } + } } t.lock.Unlock() } @@ -438,9 +467,15 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) { if iceState == webrtc.ICEConnectionStateConnected && state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed && !t.isFullyEstablished() { t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration) - t.handleConnectionFailed() + t.handleConnectionFailed(false) } }) + + // clear tcp ice connect timer + if t.tcpICETimer != nil { + t.tcpICETimer.Stop() + t.tcpICETimer = nil + } } t.lock.Unlock() } @@ -455,6 +490,10 @@ func (t *PCTransport) resetShortConn() { t.connectAfterICETimer.Stop() t.connectAfterICETimer = nil } + if t.tcpICETimer != nil { + t.tcpICETimer.Stop() + t.tcpICETimer = nil + } t.lock.Unlock() } @@ -527,15 +566,21 @@ func (t *PCTransport) onICECandidateTrickle(c *webrtc.ICECandidate) { }) } -func (t *PCTransport) handleConnectionFailed() { - isShort, duration := t.isShortConnection(time.Now()) - if isShort { - pair, err := t.getSelectedPair() - if err != nil { - t.params.Logger.Errorw("short ICE connection", err, "duration", duration) - } else { - t.params.Logger.Infow("short ICE connection", "pair", pair, "duration", duration) +func (t *PCTransport) handleConnectionFailed(forceShortConn bool) { + isShort := forceShortConn + if !isShort { + var duration time.Duration + isShort, duration = t.isShortConnection(time.Now()) + if isShort { + pair, err := t.getSelectedPair() + if err != nil { + t.params.Logger.Errorw("short ICE connection", err, "duration", duration) + } else { + t.params.Logger.Infow("short ICE connection", "pair", pair, "duration", duration) + } } + } else { + t.params.Logger.Infow("force short ICE connection") } if onFailed := t.getOnFailed(); onFailed != nil { @@ -563,7 +608,7 @@ func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionSta t.params.Logger.Debugw("peer connection state change", "state", state.String()) switch state { case webrtc.PeerConnectionStateConnected: - t.clearConnTimerAfterICE() + t.clearConnTimer() isInitialConnection := t.setConnectedAt(time.Now()) if isInitialConnection { if onInitialConnected := t.getOnInitialConnected(); onInitialConnected != nil { @@ -574,9 +619,9 @@ func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionSta } case webrtc.PeerConnectionStateFailed: t.params.Logger.Infow("peer connection failed") - t.clearConnTimerAfterICE() + t.clearConnTimer() t.logICECandidates() - t.handleConnectionFailed() + t.handleConnectionFailed(false) } } @@ -853,16 +898,20 @@ func (t *PCTransport) Close() { _ = t.pc.Close() - t.clearConnTimerAfterICE() + t.clearConnTimer() } -func (t *PCTransport) clearConnTimerAfterICE() { +func (t *PCTransport) clearConnTimer() { t.lock.Lock() defer t.lock.Unlock() if t.connectAfterICETimer != nil { t.connectAfterICETimer.Stop() t.connectAfterICETimer = nil } + if t.tcpICETimer != nil { + t.tcpICETimer.Stop() + t.tcpICETimer = nil + } } func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) { diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 44d3cdd83..381b8224a 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -698,6 +698,8 @@ func (t *TransportManager) onMediaLossUpdate(loss uint8) { func (t *TransportManager) UpdateSignalingRTT(rtt uint32) { t.signalingRTT = rtt + t.publisher.SetSignalingRTT(rtt) + t.subscriber.SetSignalingRTT(rtt) // TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established // within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp.