Fix filtering candidates (#901)

* Filter candidate after setting description

* comment

* Fix filtering candidates

- For offer/answer from remote, do filtering before setting remote
  description so that Pion does not see filtered candidates
- For offer/answer originating from server, do filtering after setting
  local description (comments in code) so that remote side does not
  see filtered candidates.
- Make logging a little consistent and use right context.

* Comment

* TCP fallback config and UT (broken now)

* log SDP only when preferring TCP

* Remove TCP fallback test attempt
This commit is contained in:
Raja Subramanian
2022-08-10 10:42:46 +05:30
committed by GitHub
parent 67d3f21122
commit 49cf15cdca
5 changed files with 79 additions and 33 deletions

View File

@@ -38,7 +38,7 @@ type Config struct {
Video VideoConfig `yaml:"video,omitempty"`
Room RoomConfig `yaml:"room,omitempty"`
TURN TURNConfig `yaml:"turn,omitempty"`
Ingress IngressConfig `yaml:ingress,omitempty"`
Ingress IngressConfig `yaml:"ingress,omitempty"`
WebHook WebHookConfig `yaml:"webhook,omitempty"`
NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"`
KeyFile string `yaml:"key_file,omitempty"`
@@ -74,6 +74,9 @@ type RTCConfig struct {
// for testing, disable UDP
ForceTCP bool `yaml:"force_tcp,omitempty"`
// allow TCP fallback
AllowTCPFallback bool `yaml:"allow_tcp_fallback,omitempty"`
}
type TURNServer struct {

View File

@@ -84,6 +84,7 @@ type ParticipantParams struct {
Region string
Migration bool
AdaptiveStream bool
AllowTCPFallback bool
}
type ParticipantImpl struct {
@@ -546,17 +547,24 @@ func (p *ParticipantImpl) OnClaimsChanged(callback func(types.LocalParticipant))
}
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) error {
func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) error {
p.lock.Lock()
if p.MigrateState() == types.MigrateStateInit {
p.pendingOffer = &sdp
p.pendingOffer = &offer
p.lock.Unlock()
return nil
}
p.lock.Unlock()
p.params.Logger.Infow("received pub offer", "state", p.State().String(), "sdp", sdp.SDP)
if err := p.publisher.SetRemoteDescription(sdp); err != nil {
// filter before setting remote description so that pion does not see filtered remote candidates
if p.iceConfig.PreferPubTcp {
p.publisher.Logger().Infow("remote offer (unfiltered)", "sdp", offer.SDP)
}
modifiedOffer := p.publisher.FilterCandidates(offer)
if p.iceConfig.PreferPubTcp {
p.publisher.Logger().Infow("remote offer (filtered)", "sdp", modifiedOffer.SDP)
}
if err := p.publisher.SetRemoteDescription(modifiedOffer); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1)
return err
}
@@ -576,17 +584,24 @@ func (p *ParticipantImpl) createPublsiherAnswerAndSend() error {
return errors.Wrap(err, "could not create answer")
}
p.params.Logger.Infow("sending pub answer (unfiltered)", "state", p.State().String(), "sdp", answer.SDP)
answer = p.publisher.FilterCandidates(answer)
p.params.Logger.Infow("sending pub answer (filtered)", "state", p.State().String(), "sdp", answer.SDP)
if p.iceConfig.PreferPubTcp {
p.publisher.Logger().Infow("local answer (unfiltered)", "sdp", answer.SDP)
}
if err = p.publisher.pc.SetLocalDescription(answer); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1)
return errors.Wrap(err, "could not set local description")
}
p.params.Logger.Debugw("sending answer to client")
//
// Filter after setting local description as pion expects the answer
// to match between CreateAnswer and SetLocalDescription.
// Filtered answer is sent to remote so that remote does not
// see filtered candidates.
//
answer = p.publisher.FilterCandidates(answer)
if p.iceConfig.PreferPubTcp {
p.publisher.Logger().Infow("local answer (filtered)", "sdp", answer.SDP)
}
err = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
@@ -682,13 +697,20 @@ func (p *ParticipantImpl) SetMigrateInfo(previousAnswer *webrtc.SessionDescripti
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
if sdp.Type != webrtc.SDPTypeAnswer {
func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer {
return ErrUnexpectedOffer
}
p.params.Logger.Infow("setting subPC answer", "answer", sdp)
if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
// filter before setting remote description so that pion does not see filtered remote candidates
if p.iceConfig.PreferSubTcp {
p.subscriber.Logger().Infow("remote answer (unfiltered)", "sdp", answer.SDP)
}
modifiedAnswer := p.subscriber.FilterCandidates(answer)
if p.iceConfig.PreferSubTcp {
p.subscriber.Logger().Infow("remote answer (filtered)", "sdp", modifiedAnswer.SDP)
}
if err := p.subscriber.SetRemoteDescription(modifiedAnswer); err != nil {
return errors.Wrap(err, "could not set remote description")
}
@@ -698,20 +720,22 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
// AddICECandidate adds candidates for remote peer
func (p *ParticipantImpl) AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error {
var filterOut bool
var pcTransport *PCTransport
p.lock.RLock()
if target == livekit.SignalTarget_SUBSCRIBER {
if p.iceConfig.PreferSubTcp && !strings.Contains(candidate.Candidate, "tcp") {
filterOut = true
p.subscriber.Logger().Infow("filtering out candidate", "candidate", candidate.Candidate)
pcTransport = p.subscriber
}
} else if target == livekit.SignalTarget_PUBLISHER {
if p.iceConfig.PreferPubTcp && !strings.Contains(candidate.Candidate, "tcp") {
filterOut = true
p.publisher.Logger().Infow("filtering out candidate", "candidate", candidate.Candidate)
pcTransport = p.publisher
}
}
p.lock.RUnlock()
if filterOut {
pcTransport.Logger().Infow("filtering out remote candidate", "candidate", candidate.Candidate)
return nil
}
@@ -1225,8 +1249,6 @@ func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
return
}
p.params.Logger.Infow("sending server offer to participant", "offer", offer)
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer),
@@ -1361,11 +1383,13 @@ func (p *ParticipantImpl) handleConnectionFailed(isPrimary bool) {
} else {
pcTransport.Logger().Infow("short ICE connection", "pair", pair, "duration", duration)
}
pcTransport.Logger().Infow("restricting transport to TCP on both peer connections")
p.SetICEConfig(types.IceConfig{
PreferPubTcp: true,
PreferSubTcp: true,
})
if p.params.AllowTCPFallback {
pcTransport.Logger().Infow("restricting transport to TCP on both peer connections")
p.SetICEConfig(types.IceConfig{
PreferPubTcp: true,
PreferSubTcp: true,
})
}
}
}

View File

@@ -183,20 +183,22 @@ func (p *ParticipantImpl) SendRefreshToken(token string) error {
func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) {
var filterOut bool
var pcTransport *PCTransport
p.lock.RLock()
if target == livekit.SignalTarget_SUBSCRIBER {
if p.iceConfig.PreferSubTcp && c.Protocol != webrtc.ICEProtocolTCP {
filterOut = true
p.subscriber.Logger().Infow("filtering out candidate", "candidate", c)
pcTransport = p.subscriber
}
} else if target == livekit.SignalTarget_PUBLISHER {
if p.iceConfig.PreferPubTcp && c.Protocol != webrtc.ICEProtocolTCP {
filterOut = true
p.publisher.Logger().Infow("filtering out candidate", "candidate", c)
pcTransport = p.publisher
}
}
p.lock.RUnlock()
if filterOut {
pcTransport.Logger().Infow("filtering out local candidate", "candidate", c)
return
}

View File

@@ -37,7 +37,7 @@ const (
iceFailedTimeout = 25 * time.Second // pion's default
iceKeepaliveInterval = 2 * time.Second // pion's default
shortConnectionThreshold = 2 * time.Minute
shortConnectionThreshold = 90 * time.Second
)
var (
@@ -228,7 +228,13 @@ func (t *PCTransport) Logger() logger.Logger {
func (t *PCTransport) SetICEConnectedAt(at time.Time) {
t.lock.Lock()
t.iceConnectedAt = at
if t.iceConnectedAt.IsZero() {
//
// Record initial connection time.
// This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`.
//
t.iceConnectedAt = at
}
t.lock.Unlock()
}
@@ -563,10 +569,9 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
return err
}
t.params.Logger.Infow("local offer (unfiltered)", "sdp", offer.SDP)
offer = t.filterCandidates(offer)
t.params.Logger.Infow("local offer (filtered)", "sdp", offer.SDP)
if t.preferTCP {
t.params.Logger.Infow("local offer (unfiltered)", "sdp", offer.SDP)
}
err = t.pc.SetLocalDescription(offer)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
@@ -574,6 +579,17 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
return err
}
//
// Filter after setting local description as pion expects the offer
// to match between CreateOffer and SetLocalDescription.
// Filtered offer is sent to remote so that remote does not
// see filtered candidates.
//
offer = t.filterCandidates(offer)
if t.preferTCP {
t.params.Logger.Infow("local offer (filtered)", "sdp", offer.SDP)
}
// indicate waiting for client
t.negotiationState = negotiationStateClient
t.restartAfterGathering = false

View File

@@ -25,7 +25,7 @@ const (
roomPurgeSeconds = 24 * 60 * 60
tokenRefreshInterval = 5 * time.Minute
tokenDefaultTTL = 10 * time.Minute
iceConfigTTL = 60 * time.Minute
iceConfigTTL = 5 * time.Minute
)
type iceConfigCacheEntry struct {
@@ -270,6 +270,7 @@ func (r *RoomManager) StartSession(
ClientConf: clientConf,
Region: pi.Region,
AdaptiveStream: pi.AdaptiveStream,
AllowTCPFallback: r.config.RTC.AllowTCPFallback,
})
if err != nil {
return err