From 0cecd81906bdc5897e8b2efa83cf5eb55fb15817 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 9 Jun 2021 17:50:12 -0700 Subject: [PATCH] fix negotiation timing issue, potential deadlock in ICE restart --- pkg/rtc/transport.go | 27 +++++++++++++++++---------- pkg/rtc/transport_test.go | 20 +++++++++++++++++++- test/scenarios.go | 5 +---- version/version.go | 2 +- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 12e8e980d..ca5f4bccb 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -96,7 +96,8 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { t.lock.Lock() defer t.lock.Unlock() if t.restartAfterGathering { - if err := t.CreateAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil { + logger.Debugw("restarting ICE after ICE gathering") + if err := t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil { logger.Warnw("could not restart ICE", err) } } @@ -133,6 +134,10 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error { return err } + // negotiated, reset flag + lastState := t.negotiationState + t.negotiationState = negotiationStateNone + for _, c := range t.pendingCandidates { if err := t.pc.AddICECandidate(c); err != nil { return err @@ -140,16 +145,13 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error { } t.pendingCandidates = nil - // negotiated, reset flag - state := t.negotiationState - t.negotiationState = negotiationStateNone - if state == negotiationRetry { - // need to Negotiate again, do it immediately - if err := t.CreateAndSendOffer(nil); err != nil { + // only initiate when we are the offerer + if lastState == negotiationRetry && sd.Type == webrtc.SDPTypeAnswer { + logger.Debugw("re-negotiate after answering") + if err := t.createAndSendOffer(nil); err != nil { logger.Errorw("could not negotiate", err) } } - return nil } @@ -167,6 +169,13 @@ func (t *PCTransport) Negotiate() { } func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error { + t.lock.Lock() + defer t.lock.Unlock() + return t.createAndSendOffer(options) +} + +// creates and sends offer assuming lock has been acquired +func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { if t.onOffer == nil { return nil } @@ -174,8 +183,6 @@ func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error { return nil } - t.lock.Lock() - defer t.lock.Unlock() iceRestart := options != nil && options.ICERestart // if restart is requested, and we are not ready, then continue afterwards diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index 343de8521..34e839fb9 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -3,6 +3,7 @@ package rtc import ( "sync/atomic" "testing" + "time" "github.com/livekit/livekit-server/pkg/testutils" livekit "github.com/livekit/livekit-server/proto" @@ -76,7 +77,7 @@ func TestNegotiationTiming(t *testing.T) { handleICEExchange(t, transportA, transportB) offer := atomic.Value{} transportA.OnOffer(func(sd webrtc.SessionDescription) { - offer.Store(sd) + offer.Store(&sd) }) // initial offer @@ -90,6 +91,23 @@ func TestNegotiationTiming(t *testing.T) { // third try, should've stayed at retry require.NoError(t, transportA.CreateAndSendOffer(nil)) require.Equal(t, negotiationRetry, transportA.negotiationState) + + time.Sleep(5 * time.Millisecond) + actualOffer, ok := offer.Load().(*webrtc.SessionDescription) + + require.True(t, ok) + require.NoError(t, transportB.SetRemoteDescription(*actualOffer)) + answer, err := transportB.pc.CreateAnswer(nil) + require.NoError(t, err) + require.NoError(t, transportB.pc.SetLocalDescription(answer)) + require.NoError(t, transportA.SetRemoteDescription(answer)) + time.Sleep(5 * time.Millisecond) + + // it should still be negotiating again + require.Equal(t, negotiationStateClient, transportA.negotiationState) + offer2, ok := offer.Load().(*webrtc.SessionDescription) + require.True(t, ok) + require.False(t, offer2 == actualOffer) } func handleOfferFunc(t *testing.T, current, other *PCTransport) func(sd webrtc.SessionDescription) { diff --git a/test/scenarios.go b/test/scenarios.go index d76c544e7..bf20b5ef7 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -72,7 +72,7 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) { writers = publishTracksForClients(t, c2) defer stopWriters(writers...) - success = testutils.WithTimeout(t, "new c2 tracks should be published again", func() bool { + testutils.WithTimeout(t, "new c2 tracks should be published again", func() bool { tracks := c3.SubscribedTracks() if len(tracks[c2.ID()]) != 2 { return false @@ -82,9 +82,6 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) { } return true }) - if !success { - t.FailNow() - } } func scenarioReceiveBeforePublish(t *testing.T) { diff --git a/version/version.go b/version/version.go index 438835cf0..f204f9aef 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -const Version = "0.9.9" +const Version = "0.9.10-SNAPSHOT"