mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
fix negotiation timing issue, potential deadlock in ICE restart
This commit is contained in:
@@ -96,7 +96,8 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if t.restartAfterGathering {
|
||||
if err := t.CreateAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil {
|
||||
logger.Debugw("restarting ICE after ICE gathering")
|
||||
if err := t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil {
|
||||
logger.Warnw("could not restart ICE", err)
|
||||
}
|
||||
}
|
||||
@@ -133,6 +134,10 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// negotiated, reset flag
|
||||
lastState := t.negotiationState
|
||||
t.negotiationState = negotiationStateNone
|
||||
|
||||
for _, c := range t.pendingCandidates {
|
||||
if err := t.pc.AddICECandidate(c); err != nil {
|
||||
return err
|
||||
@@ -140,16 +145,13 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
|
||||
}
|
||||
t.pendingCandidates = nil
|
||||
|
||||
// negotiated, reset flag
|
||||
state := t.negotiationState
|
||||
t.negotiationState = negotiationStateNone
|
||||
if state == negotiationRetry {
|
||||
// need to Negotiate again, do it immediately
|
||||
if err := t.CreateAndSendOffer(nil); err != nil {
|
||||
// only initiate when we are the offerer
|
||||
if lastState == negotiationRetry && sd.Type == webrtc.SDPTypeAnswer {
|
||||
logger.Debugw("re-negotiate after answering")
|
||||
if err := t.createAndSendOffer(nil); err != nil {
|
||||
logger.Errorw("could not negotiate", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -167,6 +169,13 @@ func (t *PCTransport) Negotiate() {
|
||||
}
|
||||
|
||||
func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
return t.createAndSendOffer(options)
|
||||
}
|
||||
|
||||
// creates and sends offer assuming lock has been acquired
|
||||
func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
|
||||
if t.onOffer == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -174,8 +183,6 @@ func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
iceRestart := options != nil && options.ICERestart
|
||||
|
||||
// if restart is requested, and we are not ready, then continue afterwards
|
||||
|
||||
@@ -3,6 +3,7 @@ package rtc
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/testutils"
|
||||
livekit "github.com/livekit/livekit-server/proto"
|
||||
@@ -76,7 +77,7 @@ func TestNegotiationTiming(t *testing.T) {
|
||||
handleICEExchange(t, transportA, transportB)
|
||||
offer := atomic.Value{}
|
||||
transportA.OnOffer(func(sd webrtc.SessionDescription) {
|
||||
offer.Store(sd)
|
||||
offer.Store(&sd)
|
||||
})
|
||||
|
||||
// initial offer
|
||||
@@ -90,6 +91,23 @@ func TestNegotiationTiming(t *testing.T) {
|
||||
// third try, should've stayed at retry
|
||||
require.NoError(t, transportA.CreateAndSendOffer(nil))
|
||||
require.Equal(t, negotiationRetry, transportA.negotiationState)
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
actualOffer, ok := offer.Load().(*webrtc.SessionDescription)
|
||||
|
||||
require.True(t, ok)
|
||||
require.NoError(t, transportB.SetRemoteDescription(*actualOffer))
|
||||
answer, err := transportB.pc.CreateAnswer(nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, transportB.pc.SetLocalDescription(answer))
|
||||
require.NoError(t, transportA.SetRemoteDescription(answer))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// it should still be negotiating again
|
||||
require.Equal(t, negotiationStateClient, transportA.negotiationState)
|
||||
offer2, ok := offer.Load().(*webrtc.SessionDescription)
|
||||
require.True(t, ok)
|
||||
require.False(t, offer2 == actualOffer)
|
||||
}
|
||||
|
||||
func handleOfferFunc(t *testing.T, current, other *PCTransport) func(sd webrtc.SessionDescription) {
|
||||
|
||||
@@ -72,7 +72,7 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) {
|
||||
writers = publishTracksForClients(t, c2)
|
||||
defer stopWriters(writers...)
|
||||
|
||||
success = testutils.WithTimeout(t, "new c2 tracks should be published again", func() bool {
|
||||
testutils.WithTimeout(t, "new c2 tracks should be published again", func() bool {
|
||||
tracks := c3.SubscribedTracks()
|
||||
if len(tracks[c2.ID()]) != 2 {
|
||||
return false
|
||||
@@ -82,9 +82,6 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) {
|
||||
}
|
||||
return true
|
||||
})
|
||||
if !success {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func scenarioReceiveBeforePublish(t *testing.T) {
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
package version
|
||||
|
||||
const Version = "0.9.9"
|
||||
const Version = "0.9.10-SNAPSHOT"
|
||||
|
||||
Reference in New Issue
Block a user