diff --git a/pkg/config/config.go b/pkg/config/config.go index f263d4dc6..260f9de04 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -38,7 +38,7 @@ type Config struct { Video VideoConfig `yaml:"video,omitempty"` Room RoomConfig `yaml:"room,omitempty"` TURN TURNConfig `yaml:"turn,omitempty"` - Ingress IngressConfig `yaml:ingress,omitempty"` + Ingress IngressConfig `yaml:"ingress,omitempty"` WebHook WebHookConfig `yaml:"webhook,omitempty"` NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"` KeyFile string `yaml:"key_file,omitempty"` @@ -74,6 +74,9 @@ type RTCConfig struct { // for testing, disable UDP ForceTCP bool `yaml:"force_tcp,omitempty"` + + // allow TCP fallback + AllowTCPFallback bool `yaml:"allow_tcp_fallback,omitempty"` } type TURNServer struct { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6d9c14dc4..8f15b139b 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -84,6 +84,7 @@ type ParticipantParams struct { Region string Migration bool AdaptiveStream bool + AllowTCPFallback bool } type ParticipantImpl struct { @@ -546,17 +547,24 @@ func (p *ParticipantImpl) OnClaimsChanged(callback func(types.LocalParticipant)) } // HandleOffer an offer from remote participant, used when clients make the initial connection -func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) error { +func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) error { p.lock.Lock() if p.MigrateState() == types.MigrateStateInit { - p.pendingOffer = &sdp + p.pendingOffer = &offer p.lock.Unlock() return nil } p.lock.Unlock() - p.params.Logger.Infow("received pub offer", "state", p.State().String(), "sdp", sdp.SDP) - if err := p.publisher.SetRemoteDescription(sdp); err != nil { + // filter before setting remote description so that pion does not see filtered remote candidates + if p.iceConfig.PreferPubTcp { + p.publisher.Logger().Infow("remote offer (unfiltered)", "sdp", offer.SDP) + } + modifiedOffer := p.publisher.FilterCandidates(offer) + if p.iceConfig.PreferPubTcp { + p.publisher.Logger().Infow("remote offer (filtered)", "sdp", modifiedOffer.SDP) + } + if err := p.publisher.SetRemoteDescription(modifiedOffer); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1) return err } @@ -576,17 +584,24 @@ func (p *ParticipantImpl) createPublsiherAnswerAndSend() error { return errors.Wrap(err, "could not create answer") } - p.params.Logger.Infow("sending pub answer (unfiltered)", "state", p.State().String(), "sdp", answer.SDP) - answer = p.publisher.FilterCandidates(answer) - p.params.Logger.Infow("sending pub answer (filtered)", "state", p.State().String(), "sdp", answer.SDP) - + if p.iceConfig.PreferPubTcp { + p.publisher.Logger().Infow("local answer (unfiltered)", "sdp", answer.SDP) + } if err = p.publisher.pc.SetLocalDescription(answer); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1) return errors.Wrap(err, "could not set local description") } - p.params.Logger.Debugw("sending answer to client") - + // + // Filter after setting local description as pion expects the answer + // to match between CreateAnswer and SetLocalDescription. + // Filtered answer is sent to remote so that remote does not + // see filtered candidates. + // + answer = p.publisher.FilterCandidates(answer) + if p.iceConfig.PreferPubTcp { + p.publisher.Logger().Infow("local answer (filtered)", "sdp", answer.SDP) + } err = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ Answer: ToProtoSessionDescription(answer), @@ -682,13 +697,20 @@ func (p *ParticipantImpl) SetMigrateInfo(previousAnswer *webrtc.SessionDescripti // HandleAnswer handles a client answer response, with subscriber PC, server initiates the // offer and client answers -func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { - if sdp.Type != webrtc.SDPTypeAnswer { +func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) error { + if answer.Type != webrtc.SDPTypeAnswer { return ErrUnexpectedOffer } - p.params.Logger.Infow("setting subPC answer", "answer", sdp) - if err := p.subscriber.SetRemoteDescription(sdp); err != nil { + // filter before setting remote description so that pion does not see filtered remote candidates + if p.iceConfig.PreferSubTcp { + p.subscriber.Logger().Infow("remote answer (unfiltered)", "sdp", answer.SDP) + } + modifiedAnswer := p.subscriber.FilterCandidates(answer) + if p.iceConfig.PreferSubTcp { + p.subscriber.Logger().Infow("remote answer (filtered)", "sdp", modifiedAnswer.SDP) + } + if err := p.subscriber.SetRemoteDescription(modifiedAnswer); err != nil { return errors.Wrap(err, "could not set remote description") } @@ -698,20 +720,22 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { // AddICECandidate adds candidates for remote peer func (p *ParticipantImpl) AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { var filterOut bool + var pcTransport *PCTransport p.lock.RLock() if target == livekit.SignalTarget_SUBSCRIBER { if p.iceConfig.PreferSubTcp && !strings.Contains(candidate.Candidate, "tcp") { filterOut = true - p.subscriber.Logger().Infow("filtering out candidate", "candidate", candidate.Candidate) + pcTransport = p.subscriber } } else if target == livekit.SignalTarget_PUBLISHER { if p.iceConfig.PreferPubTcp && !strings.Contains(candidate.Candidate, "tcp") { filterOut = true - p.publisher.Logger().Infow("filtering out candidate", "candidate", candidate.Candidate) + pcTransport = p.publisher } } p.lock.RUnlock() if filterOut { + pcTransport.Logger().Infow("filtering out remote candidate", "candidate", candidate.Candidate) return nil } @@ -1225,8 +1249,6 @@ func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) { return } - p.params.Logger.Infow("sending server offer to participant", "offer", offer) - err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ Offer: ToProtoSessionDescription(offer), @@ -1361,11 +1383,13 @@ func (p *ParticipantImpl) handleConnectionFailed(isPrimary bool) { } else { pcTransport.Logger().Infow("short ICE connection", "pair", pair, "duration", duration) } - pcTransport.Logger().Infow("restricting transport to TCP on both peer connections") - p.SetICEConfig(types.IceConfig{ - PreferPubTcp: true, - PreferSubTcp: true, - }) + if p.params.AllowTCPFallback { + pcTransport.Logger().Infow("restricting transport to TCP on both peer connections") + p.SetICEConfig(types.IceConfig{ + PreferPubTcp: true, + PreferSubTcp: true, + }) + } } } diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 2038dae3d..ea0090286 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -183,20 +183,22 @@ func (p *ParticipantImpl) SendRefreshToken(token string) error { func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) { var filterOut bool + var pcTransport *PCTransport p.lock.RLock() if target == livekit.SignalTarget_SUBSCRIBER { if p.iceConfig.PreferSubTcp && c.Protocol != webrtc.ICEProtocolTCP { filterOut = true - p.subscriber.Logger().Infow("filtering out candidate", "candidate", c) + pcTransport = p.subscriber } } else if target == livekit.SignalTarget_PUBLISHER { if p.iceConfig.PreferPubTcp && c.Protocol != webrtc.ICEProtocolTCP { filterOut = true - p.publisher.Logger().Infow("filtering out candidate", "candidate", c) + pcTransport = p.publisher } } p.lock.RUnlock() if filterOut { + pcTransport.Logger().Infow("filtering out local candidate", "candidate", c) return } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 946702118..0059c49ee 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -37,7 +37,7 @@ const ( iceFailedTimeout = 25 * time.Second // pion's default iceKeepaliveInterval = 2 * time.Second // pion's default - shortConnectionThreshold = 2 * time.Minute + shortConnectionThreshold = 90 * time.Second ) var ( @@ -228,7 +228,13 @@ func (t *PCTransport) Logger() logger.Logger { func (t *PCTransport) SetICEConnectedAt(at time.Time) { t.lock.Lock() - t.iceConnectedAt = at + if t.iceConnectedAt.IsZero() { + // + // Record initial connection time. + // This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`. + // + t.iceConnectedAt = at + } t.lock.Unlock() } @@ -563,10 +569,9 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { return err } - t.params.Logger.Infow("local offer (unfiltered)", "sdp", offer.SDP) - offer = t.filterCandidates(offer) - t.params.Logger.Infow("local offer (filtered)", "sdp", offer.SDP) - + if t.preferTCP { + t.params.Logger.Infow("local offer (unfiltered)", "sdp", offer.SDP) + } err = t.pc.SetLocalDescription(offer) if err != nil { prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1) @@ -574,6 +579,17 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { return err } + // + // Filter after setting local description as pion expects the offer + // to match between CreateOffer and SetLocalDescription. + // Filtered offer is sent to remote so that remote does not + // see filtered candidates. + // + offer = t.filterCandidates(offer) + if t.preferTCP { + t.params.Logger.Infow("local offer (filtered)", "sdp", offer.SDP) + } + // indicate waiting for client t.negotiationState = negotiationStateClient t.restartAfterGathering = false diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index e29c6a355..d2c8d4b30 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -25,7 +25,7 @@ const ( roomPurgeSeconds = 24 * 60 * 60 tokenRefreshInterval = 5 * time.Minute tokenDefaultTTL = 10 * time.Minute - iceConfigTTL = 60 * time.Minute + iceConfigTTL = 5 * time.Minute ) type iceConfigCacheEntry struct { @@ -270,6 +270,7 @@ func (r *RoomManager) StartSession( ClientConf: clientConf, Region: pi.Region, AdaptiveStream: pi.AdaptiveStream, + AllowTCPFallback: r.config.RTC.AllowTCPFallback, }) if err != nil { return err