diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index a492750a7..787a6b4ac 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -161,11 +161,20 @@ func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error { } state := t.negotiationState.Load().(int) + // when there's an ongoing negotiation, let it finish and not disrupt its state if state == negotiationStateClient { - logger.Debugw("skipping negotiation, trying again later") - t.negotiationState.Store(negotiationRetry) - return nil + currentSD := t.pc.CurrentRemoteDescription() + if options != nil && options.ICERestart && currentSD != nil { + logger.Debugw("recovering from client negotiation state") + if err := t.pc.SetRemoteDescription(*currentSD); err != nil { + return err + } + } else { + logger.Debugw("skipping negotiation, trying again later") + t.negotiationState.Store(negotiationRetry) + return nil + } } offer, err := t.pc.CreateOffer(options) diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go new file mode 100644 index 000000000..ab016b5e6 --- /dev/null +++ b/pkg/rtc/transport_test.go @@ -0,0 +1,82 @@ +package rtc + +import ( + "testing" + "time" + + livekit "github.com/livekit/livekit-server/proto" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" +) + +func TestMissingAnswerDuringICERestart(t *testing.T) { + params := TransportParams{ + Target: livekit.SignalTarget_PUBLISHER, + Config: &WebRTCConfig{}, + Stats: nil, + } + transportA, err := NewPCTransport(params) + require.NoError(t, err) + _, err = transportA.pc.CreateDataChannel("test", nil) + require.NoError(t, err) + transportB, err := NewPCTransport(params) + require.NoError(t, err) + + // exchange ICE + transportA.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + if candidate == nil { + return + } + t.Logf("got ICE candidate from A: %v", candidate) + require.NoError(t, transportB.AddICECandidate(candidate.ToJSON())) + }) + transportB.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + if candidate == nil { + return + } + t.Logf("got ICE candidate from B: %v", candidate) + require.NoError(t, transportA.AddICECandidate(candidate.ToJSON())) + }) + // set offer/answer + handleOffer := handleOfferFunc(t, transportA, transportB) + transportA.OnOffer(handleOffer) + + // first establish connection + require.NoError(t, transportA.CreateAndSendOffer(nil)) + + // ensure we are connected the first time + time.Sleep(10 * time.Millisecond) + require.Equal(t, webrtc.ICEConnectionStateConnected, transportA.pc.ICEConnectionState()) + require.Equal(t, webrtc.ICEConnectionStateConnected, transportB.pc.ICEConnectionState()) + + // offer again, but missed + transportA.OnOffer(func(sd webrtc.SessionDescription) {}) + require.NoError(t, transportA.CreateAndSendOffer(nil)) + require.Equal(t, webrtc.SignalingStateHaveLocalOffer, transportA.pc.SignalingState()) + require.Equal(t, negotiationStateClient, transportA.negotiationState.Load().(int)) + + // now restart ICE + t.Logf("creating offer with ICE restart") + transportA.OnOffer(handleOffer) + require.NoError(t, transportA.CreateAndSendOffer(&webrtc.OfferOptions{ + ICERestart: true, + })) + + time.Sleep(50 * time.Millisecond) + require.Equal(t, webrtc.ICEConnectionStateConnected, transportA.pc.ICEConnectionState()) + require.Equal(t, webrtc.ICEConnectionStateConnected, transportB.pc.ICEConnectionState()) +} + +func handleOfferFunc(t *testing.T, current, other *PCTransport) func(sd webrtc.SessionDescription) { + return func(sd webrtc.SessionDescription) { + t.Logf("handling offer") + t.Logf("setting other remote description") + require.NoError(t, other.SetRemoteDescription(sd)) + answer, err := other.pc.CreateAnswer(nil) + require.NoError(t, err) + require.NoError(t, other.pc.SetLocalDescription(answer)) + + t.Logf("setting answer on current") + require.NoError(t, current.SetRemoteDescription(answer)) + } +}