From d8cf54399fe69b1bbb8ddea818f2c7e1b0735387 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 22 Apr 2025 11:42:05 +0530 Subject: [PATCH] Determine TURN connection type and no fallback for TURN/TLS. (#3612) --- pkg/rtc/room.go | 2 +- pkg/rtc/transport.go | 4 ++ pkg/rtc/transportmanager.go | 45 +++++++++++++++--- pkg/rtc/types/ice.go | 95 ++++++++++++++++++++++++++++++------- 4 files changed, 122 insertions(+), 24 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 06618d858..9797a0b07 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -463,7 +463,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me infos := p.GetICEConnectionInfo() for _, info := range infos { if info.Type != types.ICEConnectionTypeUnknown { - meta.ConnectionType = string(info.Type) + meta.ConnectionType = info.Type.String() break } } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index bb7e4b32c..9ec024dc3 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1135,6 +1135,10 @@ func (t *PCTransport) GetICEConnectionInfo() *types.ICEConnectionInfo { return t.connectionDetails.GetInfo() } +func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType { + return t.connectionDetails.GetConnectionType() +} + func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error { return t.pc.WriteRTCP(pkts) } diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 68cc05b60..b6a0cc7ef 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -623,6 +623,14 @@ func (t *TransportManager) getTransport(isPrimary bool) *PCTransport { return pcTransport } +func (t *TransportManager) getLowestPriorityConnectionType() types.ICEConnectionType { + ctype := t.publisher.GetICEConnectionType() + if stype := t.subscriber.GetICEConnectionType(); stype > ctype { + ctype = stype + } + return ctype +} + func (t *TransportManager) handleConnectionFailed(isShortLived bool) { if !t.params.AllowTCPFallback || t.params.UseOneShotSignallingMode { return @@ -650,6 +658,8 @@ func (t *TransportManager) handleConnectionFailed(isShortLived bool) { return } + lowestPriorityConnectionType := t.getLowestPriorityConnectionType() + // // Checking only `PreferenceSubscriber` field although any connection failure (PUBLISHER OR SUBSCRIBER) will // flow through here. @@ -657,13 +667,36 @@ func (t *TransportManager) handleConnectionFailed(isShortLived bool) { // As both transports are switched to the same type on any failure, checking just subscriber should be fine. // getNext := func(ic *livekit.ICEConfig) livekit.ICECandidateType { - if ic.PreferenceSubscriber == livekit.ICECandidateType_ICT_NONE && t.params.ClientInfo.SupportsICETCP() && t.canUseICETCP() { - return livekit.ICECandidateType_ICT_TCP - } else if ic.PreferenceSubscriber != livekit.ICECandidateType_ICT_TLS && t.params.TURNSEnabled { - return livekit.ICECandidateType_ICT_TLS - } else { - return livekit.ICECandidateType_ICT_NONE + switch lowestPriorityConnectionType { + case types.ICEConnectionTypeUDP: + // try ICE/TCP if ICE/UDP failed + if ic.PreferenceSubscriber == livekit.ICECandidateType_ICT_NONE { + if t.params.ClientInfo.SupportsICETCP() && t.canUseICETCP() { + return livekit.ICECandidateType_ICT_TCP + } else if t.params.TURNSEnabled { + // fallback to TURN/TLS if TCP is not supported + return livekit.ICECandidateType_ICT_TLS + } + } + + case types.ICEConnectionTypeTCP: + // try TURN/TLS if ICE/TCP failed, + // the configuration could have been ICT_NONE or ICT_TCP, + // in either case, fallback to TURN/TLS + if t.params.TURNSEnabled { + return livekit.ICECandidateType_ICT_TLS + } else { + // keep the current config + return ic.PreferenceSubscriber + } + + case types.ICEConnectionTypeTURN: + // TURN/TLS is the most permissive option, if that fails there is nowhere to go to + // the configuration could have been ICT_NONE or ICT_TLS, + // keep the current config + return ic.PreferenceSubscriber } + return livekit.ICECandidateType_ICT_NONE } var preferNext livekit.ICECandidateType diff --git a/pkg/rtc/types/ice.go b/pkg/rtc/types/ice.go index 3deede93a..27ea3f1d1 100644 --- a/pkg/rtc/types/ice.go +++ b/pkg/rtc/types/ice.go @@ -29,15 +29,38 @@ import ( "github.com/livekit/protocol/logger" ) -type ICEConnectionType string +type ICEConnectionType int const ( - ICEConnectionTypeUDP ICEConnectionType = "udp" - ICEConnectionTypeTCP ICEConnectionType = "tcp" - ICEConnectionTypeTURN ICEConnectionType = "turn" - ICEConnectionTypeUnknown ICEConnectionType = "unknown" + // this is in ICE priority highest -> lowest ordering + // WARNING: Keep this ordering as it is used to find lowest priority connection type. + ICEConnectionTypeUnknown ICEConnectionType = iota + ICEConnectionTypeUDP + ICEConnectionTypeTCP + ICEConnectionTypeTURN ) +func (i ICEConnectionType) String() string { + switch i { + case ICEConnectionTypeUnknown: + return "unknown" + + case ICEConnectionTypeUDP: + return "udp" + + case ICEConnectionTypeTCP: + return "tcp" + + case ICEConnectionTypeTURN: + return "turn" + + default: + return "unknown" + } +} + +// -------------------------------------------- + type ICECandidateExtended struct { // only one of local or remote is set. This is due to type foo in Pion Local *webrtc.ICECandidate @@ -108,6 +131,13 @@ func (d *ICEConnectionDetails) GetInfo() *ICEConnectionInfo { return info } +func (d *ICEConnectionDetails) GetConnectionType() ICEConnectionType { + d.lock.Lock() + defer d.lock.Unlock() + + return d.Type +} + func (d *ICEConnectionDetails) AddLocalCandidate(c *webrtc.ICECandidate, filtered, trickle bool) { d.lock.Lock() defer d.lock.Unlock() @@ -166,6 +196,7 @@ func (d *ICEConnectionDetails) AddRemoteICECandidate(candidate ice.Candidate, fi Filtered: filtered, Trickle: trickle, }) + d.updateConnectionTypeLocked() } func (d *ICEConnectionDetails) Clear() { @@ -198,12 +229,12 @@ func (d *ICEConnectionDetails) SetSelectedPair(pair *webrtc.ICECandidatePair) { d.Remote = append(d.Remote, &ICECandidateExtended{ Remote: candidate, Filtered: false, - Trickle: true, + Trickle: false, }) remoteIdx = len(d.Remote) - 1 } - remote := d.Remote[remoteIdx] - remote.SelectedOrder = d.selectedCount + d.Remote[remoteIdx].SelectedOrder = d.selectedCount + d.updateConnectionTypeLocked() localIdx := slices.IndexFunc(d.Local, func(e *ICECandidateExtended) bool { return isCandidateEqualTo(e.Local, pair.Local) @@ -213,26 +244,56 @@ func (d *ICEConnectionDetails) SetSelectedPair(pair *webrtc.ICECandidatePair) { // should not happen return } - local := d.Local[localIdx] - local.SelectedOrder = d.selectedCount + d.Local[localIdx].SelectedOrder = d.selectedCount +} - d.Type = ICEConnectionTypeUDP - if pair.Remote.Protocol == webrtc.ICEProtocolTCP { +func (d *ICEConnectionDetails) updateConnectionTypeLocked() { + highestSelectedOrder := -1 + var selectedRemoteCandidate *ICECandidateExtended + for _, remote := range d.Remote { + if remote.SelectedOrder == 0 { + continue + } + + if remote.SelectedOrder > highestSelectedOrder { + highestSelectedOrder = remote.SelectedOrder + selectedRemoteCandidate = remote + } + } + + if selectedRemoteCandidate == nil { + return + } + + remoteCandidate := selectedRemoteCandidate.Remote + switch remoteCandidate.NetworkType() { + case ice.NetworkTypeUDP4, ice.NetworkTypeUDP6: + d.Type = ICEConnectionTypeUDP + + case ice.NetworkTypeTCP4, ice.NetworkTypeTCP6: d.Type = ICEConnectionTypeTCP } - if pair.Remote.Typ == webrtc.ICECandidateTypeRelay { + + switch remoteCandidate.Type() { + case ice.CandidateTypeRelay: d.Type = ICEConnectionTypeTURN - } else if pair.Remote.Typ == webrtc.ICECandidateTypePrflx { + + case ice.CandidateTypePeerReflexive: // if the remote relay candidate pings us *before* we get a relay candidate, // Pion would have created a prflx candidate with the same address as the relay candidate. // to report an accurate connection type, we'll compare to see if existing relay candidates match for _, other := range d.Remote { or := other.Remote if or.Type() == ice.CandidateTypeRelay && - pair.Remote.Address == or.Address() && - pair.Remote.Port == uint16(or.Port()) && - pair.Remote.Protocol.String() == or.NetworkType().NetworkShort() { + remoteCandidate.Address() == or.Address() && + // NOTE: port is not compared as relayed address reported by TURN ALLOCATE from + // pion/turn server -> client and later sent from client -> server via ICE Trickle does not + // match port of `prflx` candidate learnt via TURN path. TODO-INVESTIGATE: how and why doesn't + // port match? + //remoteCanddiate.Port() == or.Port() && + remoteCandidate.NetworkType().NetworkShort() == or.NetworkType().NetworkShort() { d.Type = ICEConnectionTypeTURN + break } } }