From c905336fecfbcf0efad04e53f8686c716b758347 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 16 Jul 2024 09:50:55 -0700 Subject: [PATCH] mark final ice candidate (#2871) * add IsFinal flag to last ice candidate * deps --- go.mod | 2 +- go.sum | 6 ++---- pkg/rtc/participant.go | 4 +++- pkg/rtc/participant_signal.go | 9 ++++++--- pkg/rtc/utils.go | 4 +++- test/client/client.go | 14 ++++++++------ 6 files changed, 23 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index ecf2012f9..6e2cdf3e9 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 - github.com/livekit/protocol v1.19.2-0.20240715223818-034b43c47a92 + github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 41bec8db5..810d0ca49 100644 --- a/go.sum +++ b/go.sum @@ -167,10 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240715210338-32175c16266f h1:XdZNoNxskyLf6PRK3nk/301D0x8KRxMo4xisnGOkFuc= -github.com/livekit/protocol v1.19.2-0.20240715210338-32175c16266f/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= -github.com/livekit/protocol v1.19.2-0.20240715223818-034b43c47a92 h1:cYJZChlmNM5hMqEBat+baJm3qYNtqery+9auC3DmMXA= -github.com/livekit/protocol v1.19.2-0.20240715223818-034b43c47a92/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae h1:Pj1/pdRCCZZYuKRTxbjYtw1eClmkoYxG3nhQNcVA5no= +github.com/livekit/protocol v1.19.2-0.20240716163657-b5404e5c9aae/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 0db061b73..bb2fc66bd 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -191,6 +191,8 @@ type ParticipantImpl struct { *UpTrackManager *SubscriptionManager + icQueue atomic.Pointer[webrtc.ICECandidate] + // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -1610,7 +1612,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt } func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error { - if c == nil || p.IsDisconnected() || p.IsClosed() { + if p.IsDisconnected() || p.IsClosed() { return nil } diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 272135fa0..64f5ab393 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -264,10 +264,13 @@ func (p *ParticipantImpl) sendDisconnectUpdatesForReconnect() error { }) } -func (p *ParticipantImpl) sendICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error { - trickle := ToProtoTrickle(c.ToJSON()) - trickle.Target = target +func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error { + prevIC := p.icQueue.Swap(ic) + if prevIC == nil { + return nil + } + trickle := ToProtoTrickle(prevIC.ToJSON(), target, ic == nil) p.params.Logger.Debugw("sending ICE candidate", "transport", target, "trickle", logger.Proto(trickle)) return p.writeMessage(&livekit.SignalResponse{ diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index 661646204..c5a54d6ac 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -101,10 +101,12 @@ func FromProtoSessionDescription(sd *livekit.SessionDescription) webrtc.SessionD } } -func ToProtoTrickle(candidateInit webrtc.ICECandidateInit) *livekit.TrickleRequest { +func ToProtoTrickle(candidateInit webrtc.ICECandidateInit, target livekit.SignalTarget, final bool) *livekit.TrickleRequest { data, _ := json.Marshal(candidateInit) return &livekit.TrickleRequest{ CandidateInit: string(data), + Target: target, + Final: final, } } diff --git a/test/client/client.go b/test/client/client.go index 1564b4b97..49dbcad2d 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -69,6 +69,8 @@ type RTCClient struct { signalRequestInterceptor SignalRequestInterceptor signalResponseInterceptor SignalResponseInterceptor + icQueue atomic.Pointer[webrtc.ICECandidate] + subscriberAsPrimary atomic.Bool publisherFullyEstablished atomic.Bool subscriberFullyEstablished atomic.Bool @@ -226,9 +228,6 @@ func NewRTCClient(conn *websocket.Conn, opts *Options) (*RTCClient, error) { } publisherHandler.OnICECandidateCalls(func(ic *webrtc.ICECandidate, t livekit.SignalTarget) error { - if ic == nil { - return nil - } return c.SendIceCandidate(ic, livekit.SignalTarget_PUBLISHER) }) publisherHandler.OnOfferCalls(c.onOffer) @@ -556,11 +555,14 @@ func (c *RTCClient) sendRequest(msg *livekit.SignalRequest) error { } func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error { - trickle := rtc.ToProtoTrickle(ic.ToJSON()) - trickle.Target = target + prevIC := c.icQueue.Swap(ic) + if prevIC == nil { + return nil + } + return c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Trickle{ - Trickle: trickle, + Trickle: rtc.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil), }, }) }