mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 16:15:25 +00:00
Refactor/cleanup of negotiation path
This commit is contained in:
@@ -132,7 +132,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
|
||||
c.subscriber.PeerConnection().OnDataChannel(func(channel *webrtc.DataChannel) {
|
||||
})
|
||||
|
||||
c.publisher.OnNegotiationNeeded(c.negotiate)
|
||||
c.publisher.OnOffer(c.onOffer)
|
||||
|
||||
c.publisher.PeerConnection().OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
|
||||
logger.Debugw("ICE state has changed", "state", connectionState.String(),
|
||||
@@ -414,7 +414,6 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string)
|
||||
if _, err = c.publisher.PeerConnection().AddTrack(track); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
writer = NewTrackWriter(c.ctx, track, path)
|
||||
|
||||
// write tracks only after ICE connectivity
|
||||
@@ -516,21 +515,11 @@ func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RTCClient) negotiate() {
|
||||
func (c *RTCClient) onOffer(offer webrtc.SessionDescription) {
|
||||
if c.localParticipant != nil {
|
||||
logger.Debugw("starting negotiation", "participant", c.localParticipant.Identity)
|
||||
}
|
||||
|
||||
offer, err := c.publisher.PeerConnection().CreateOffer(nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.publisher.PeerConnection().SetLocalDescription(offer); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.SendRequest(&livekit.SignalRequest{
|
||||
_ = c.SendRequest(&livekit.SignalRequest{
|
||||
Message: &livekit.SignalRequest_Offer{
|
||||
Offer: rtc.ToProtoSessionDescription(offer),
|
||||
},
|
||||
|
||||
@@ -42,3 +42,5 @@ require (
|
||||
)
|
||||
|
||||
replace github.com/pion/webrtc/v3 => github.com/davidzhao/webrtc/v3 v3.0.4-0.20210413050400-3d284fbde125
|
||||
|
||||
replace github.com/pion/ice/v2 => github.com/davidzhao/ice/v2 v2.0.17-0.20210414203851-3eacd99ac5cd
|
||||
|
||||
@@ -70,6 +70,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davidzhao/ice/v2 v2.0.17-0.20210414203851-3eacd99ac5cd h1:nQsWFvFzYepfS913kSWUEI95VBG3oIIEf2PCoXpakd8=
|
||||
github.com/davidzhao/ice/v2 v2.0.17-0.20210414203851-3eacd99ac5cd/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0=
|
||||
github.com/davidzhao/webrtc/v3 v3.0.4-0.20210413050400-3d284fbde125 h1:QXgTQ0EMskrDLMRqLMnkOnGPmS2oJPy7TGCJXIysdec=
|
||||
github.com/davidzhao/webrtc/v3 v3.0.4-0.20210413050400-3d284fbde125/go.mod h1:vncfQFH7R8oOL0eZQKyQ10Qx7DPfGKoDKCzSsUz4CsE=
|
||||
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE=
|
||||
@@ -159,7 +161,6 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
|
||||
github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
|
||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8=
|
||||
@@ -312,13 +313,9 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0=
|
||||
github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg=
|
||||
github.com/pion/dtls/v2 v2.0.4/go.mod h1:qAkFscX0ZHoI1E07RfYPoRw3manThveu+mlTDdOxoGI=
|
||||
github.com/pion/dtls/v2 v2.0.8/go.mod h1:QuDII+8FVvk9Dp5t5vYIMTo7hh7uBkra+8QIm7QGm10=
|
||||
github.com/pion/dtls/v2 v2.0.9 h1:7Ow+V++YSZQMYzggI0P9vLJz/hUFcffsfGMfT/Qy+u8=
|
||||
github.com/pion/dtls/v2 v2.0.9/go.mod h1:O0Wr7si/Zj5/EBFlDzDd6UtVxx25CE1r7XM7BQKYQho=
|
||||
github.com/pion/ice/v2 v2.0.15/go.mod h1:ZIiVGevpgAxF/cXiIVmuIUtCb3Xs4gCzCbXB6+nFkSI=
|
||||
github.com/pion/ice/v2 v2.1.0 h1:h9lmmO+BoxPDn4k/qZRDKFqWLOMVABtIVV8TZba6sQQ=
|
||||
github.com/pion/ice/v2 v2.1.0/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0=
|
||||
github.com/pion/interceptor v0.0.12 h1:eC1iVneBIAQJEfaNAfDqAncJWhMDAnaXPRCJsltdokE=
|
||||
github.com/pion/interceptor v0.0.12/go.mod h1:qzeuWuD/ZXvPqOnxNcnhWfkCZ2e1kwwslicyyPnhoK4=
|
||||
github.com/pion/ion-log v1.0.0 h1:2lJLImCmfCWCR38hLWsjQfBWe6NFz/htbqiYHwvOP/Q=
|
||||
@@ -327,7 +324,6 @@ github.com/pion/ion-sfu v1.9.7 h1:kWshVj7J5JZFBHsGmHWqbSiPbXBLjtsWUkfnVEQv5Dw=
|
||||
github.com/pion/ion-sfu v1.9.7/go.mod h1:BQf2nidxkQU5Qc37e13i+rc7Q+qt8CWp0HmYtsy00o8=
|
||||
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
|
||||
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
|
||||
github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0=
|
||||
github.com/pion/mdns v0.0.5 h1:Q2oj/JB3NqfzY9xGZ1fPzZzK7sDSD8rZPOvcIQ10BCw=
|
||||
github.com/pion/mdns v0.0.5/go.mod h1:UgssrvdD3mxpi8tMxAXbsppL3vJ4Jipw1mTCW+al01g=
|
||||
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
|
||||
@@ -345,10 +341,8 @@ github.com/pion/srtp/v2 v2.0.2 h1:664iGzVmaY7KYS5M0gleY0DscRo9ReDfTxQrq4UgGoU=
|
||||
github.com/pion/srtp/v2 v2.0.2/go.mod h1:VEyLv4CuxrwGY8cxM+Ng3bmVy8ckz/1t6A0q/msKOw0=
|
||||
github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg=
|
||||
github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA=
|
||||
github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8=
|
||||
github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE=
|
||||
github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A=
|
||||
github.com/pion/transport v0.12.1/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
|
||||
github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
|
||||
github.com/pion/transport v0.12.3 h1:vdBfvfU/0Wq8kd2yhUMSDB/x+O4Z9MYVl2fJ5BT4JZw=
|
||||
github.com/pion/transport v0.12.3/go.mod h1:OViWW9SP2peE/HbwBvARicmAVnesphkNkCVZIWJ6q9A=
|
||||
@@ -500,7 +494,6 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w=
|
||||
@@ -546,14 +539,12 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201026091529-146b70c837a4/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
|
||||
@@ -19,7 +19,7 @@ type MessageSink interface {
|
||||
|
||||
//counterfeiter:generate . MessageSource
|
||||
type MessageSource interface {
|
||||
// source exposes a one way channel to make it easier to use with select
|
||||
// ReadChan exposes a one way channel to make it easier to use with select
|
||||
ReadChan() <-chan proto.Message
|
||||
}
|
||||
|
||||
|
||||
@@ -128,7 +128,7 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) {
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// signal connection sets up paths to the RTC node, and starts to route messages to that message queue
|
||||
// StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue
|
||||
func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
|
||||
// find the node where the room is hosted at
|
||||
rtcNode, err := r.GetNodeForRoom(roomName)
|
||||
@@ -248,8 +248,8 @@ func (r *RedisRouter) Stop() {
|
||||
return
|
||||
}
|
||||
logger.Debugw("stopping RedisRouter")
|
||||
r.pubsub.Close()
|
||||
r.UnregisterNode()
|
||||
_ = r.pubsub.Close()
|
||||
_ = r.UnregisterNode()
|
||||
r.cancel()
|
||||
}
|
||||
|
||||
|
||||
+1
-2
@@ -58,8 +58,7 @@ func NewWebRTCConfig(conf *config.RTCConfig, externalIP string) (*WebRTCConfig,
|
||||
bufferFactory := buffer.NewBufferFactory(conf.PacketBufferSize, zapr.NewLogger(logger.Desugar()))
|
||||
s.BufferFactory = bufferFactory.GetOrNew
|
||||
|
||||
networkTypes := []webrtc.NetworkType{}
|
||||
|
||||
networkTypes := make([]webrtc.NetworkType, 0, 4)
|
||||
if !conf.ForceTCP {
|
||||
networkTypes = append(networkTypes,
|
||||
webrtc.NetworkTypeUDP4,
|
||||
|
||||
@@ -109,8 +109,7 @@ func (t *MediaTrack) OnClose(f func()) {
|
||||
t.onClose = f
|
||||
}
|
||||
|
||||
// subscribes participant to current remoteTrack
|
||||
// creates and add necessary forwarders and starts them
|
||||
// AddSubscriber subscribes sub to current mediaTrack
|
||||
func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
@@ -200,7 +199,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// adds a new RTP receiver to the track, returns true if this is a new track
|
||||
// AddReceiver adds a new RTP receiver to the track, returns true if this is a new track
|
||||
func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, twcc *twcc.Responder) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
@@ -263,8 +262,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
})
|
||||
}
|
||||
|
||||
// removes peer from subscription
|
||||
// stop all forwarders to the peer
|
||||
// RemoveSubscriber removes participant from subscription
|
||||
// stop all forwarders to the client
|
||||
func (t *MediaTrack) RemoveSubscriber(participantId string) {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
@@ -301,9 +300,7 @@ func (t *MediaTrack) sendDownTrackBindingReports(participantId string, rtcpCh ch
|
||||
if chunks == nil {
|
||||
return
|
||||
}
|
||||
if chunks != nil {
|
||||
sd = append(sd, chunks...)
|
||||
}
|
||||
sd = append(sd, chunks...)
|
||||
|
||||
pkts := []rtcp.Packet{
|
||||
&rtcp.SourceDescription{Chunks: sd},
|
||||
|
||||
+27
-42
@@ -112,8 +112,7 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink,
|
||||
|
||||
p.publisher.pc.OnTrack(p.onMediaTrack)
|
||||
p.subscriber.pc.OnDataChannel(p.onDataChannel)
|
||||
|
||||
p.subscriber.OnNegotiationNeeded(p.negotiate)
|
||||
p.subscriber.OnOffer(p.onOffer)
|
||||
|
||||
return p, nil
|
||||
}
|
||||
@@ -139,7 +138,7 @@ func (p *ParticipantImpl) ConnectedAt() time.Time {
|
||||
return p.connectedAt.Load().(time.Time)
|
||||
}
|
||||
|
||||
// attach metadata to the participant
|
||||
// SetMetadata attaches metadata to the participant
|
||||
func (p *ParticipantImpl) SetMetadata(metadata string) {
|
||||
p.metadata = metadata
|
||||
|
||||
@@ -334,7 +333,6 @@ func (p *ParticipantImpl) Close() error {
|
||||
p.updateState(livekit.ParticipantInfo_DISCONNECTED)
|
||||
p.subscriber.pc.OnDataChannel(nil)
|
||||
p.subscriber.pc.OnICECandidate(nil)
|
||||
p.subscriber.pc.OnNegotiationNeeded(nil)
|
||||
p.subscriber.pc.OnTrack(nil)
|
||||
p.publisher.pc.OnICECandidate(nil)
|
||||
// ensure this is synchronized
|
||||
@@ -352,7 +350,7 @@ func (p *ParticipantImpl) Close() error {
|
||||
}
|
||||
|
||||
// Subscribes op to all publishedTracks
|
||||
func (p *ParticipantImpl) AddSubscriber(op types.Participant) error {
|
||||
func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) {
|
||||
p.lock.RLock()
|
||||
tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks))
|
||||
for _, t := range p.publishedTracks {
|
||||
@@ -361,7 +359,7 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) error {
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
if len(tracks) == 0 {
|
||||
return nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
logger.Debugw("subscribing new participant to tracks",
|
||||
@@ -369,12 +367,14 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) error {
|
||||
"newParticipant", op.Identity(),
|
||||
"numTracks", len(tracks))
|
||||
|
||||
n := 0
|
||||
for _, track := range tracks {
|
||||
if err := track.AddSubscriber(op); err != nil {
|
||||
return err
|
||||
return n, err
|
||||
}
|
||||
n += 1
|
||||
}
|
||||
return nil
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) RemoveSubscriber(participantId string) {
|
||||
@@ -533,40 +533,6 @@ func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target liveki
|
||||
})
|
||||
}
|
||||
|
||||
// initiates server-driven negotiation by creating an offer
|
||||
func (p *ParticipantImpl) negotiate() {
|
||||
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
|
||||
logger.Debugw("skipping server negotiation", "participant", p.Identity())
|
||||
// skip when disconnected
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugw("starting server negotiation", "participant", p.Identity())
|
||||
|
||||
offer, err := p.subscriber.pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
logger.Errorw("could not create offer", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = p.subscriber.pc.SetLocalDescription(offer)
|
||||
if err != nil {
|
||||
logger.Errorw("could not set local description", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugw("sending offer to participant",
|
||||
"participant", p.Identity(),
|
||||
//"sdp", offer.SDP,
|
||||
)
|
||||
|
||||
p.writeMessage(&livekit.SignalResponse{
|
||||
Message: &livekit.SignalResponse_Offer{
|
||||
Offer: ToProtoSessionDescription(offer),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
|
||||
oldState := p.State()
|
||||
if state == oldState {
|
||||
@@ -599,6 +565,25 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// when the server has an offer for participant
|
||||
func (p *ParticipantImpl) onOffer(sd webrtc.SessionDescription) {
|
||||
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
|
||||
logger.Debugw("skipping server offer", "participant", p.Identity())
|
||||
// skip when disconnected
|
||||
return
|
||||
}
|
||||
logger.Debugw("sending server offer to participant",
|
||||
"participant", p.Identity(),
|
||||
//"sdp", offer.SDP,
|
||||
)
|
||||
|
||||
_ = p.writeMessage(&livekit.SignalResponse{
|
||||
Message: &livekit.SignalResponse_Offer{
|
||||
Offer: ToProtoSessionDescription(sd),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// when a new remoteTrack is created, creates a Track and adds it to room
|
||||
func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
|
||||
logger.Debugw("mediaTrack added",
|
||||
|
||||
@@ -128,7 +128,7 @@ func TestTrackPublishing(t *testing.T) {
|
||||
|
||||
// after disconnection, things should continue to function and not panic
|
||||
func TestDisconnectTiming(t *testing.T) {
|
||||
t.Run("negotiate doesn't fail after channel closed", func(t *testing.T) {
|
||||
t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) {
|
||||
p := newParticipantForTest("test")
|
||||
msg := routing.NewMessageChannel()
|
||||
p.responseSink = msg
|
||||
@@ -140,9 +140,8 @@ func TestDisconnectTiming(t *testing.T) {
|
||||
track := &typesfakes.FakePublishedTrack{}
|
||||
p.handleTrackPublished(track)
|
||||
|
||||
// close channel and then try to negotiate
|
||||
// close channel and then try to Negotiate
|
||||
msg.Close()
|
||||
p.negotiate()
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
+27
-14
@@ -141,20 +141,11 @@ func (r *Room) Join(participant types.Participant) error {
|
||||
|
||||
if p.State() == livekit.ParticipantInfo_ACTIVE {
|
||||
// subscribe participant to existing publishedTracks
|
||||
for _, op := range r.GetParticipants() {
|
||||
if p.ID() == op.ID() {
|
||||
// don't send to itself
|
||||
continue
|
||||
}
|
||||
if err := op.AddSubscriber(p); err != nil {
|
||||
// TODO: log error? or disconnect?
|
||||
logger.Errorw("could not subscribe to participant",
|
||||
"dest", p.Identity(),
|
||||
"source", op.Identity())
|
||||
}
|
||||
}
|
||||
r.subscribeToExistingTracks(p)
|
||||
|
||||
// start the workers once connectivity is established
|
||||
p.Start()
|
||||
|
||||
} else if p.State() == livekit.ParticipantInfo_DISCONNECTED {
|
||||
// remove participant from room
|
||||
go r.RemoveParticipant(p.Identity())
|
||||
@@ -244,7 +235,7 @@ func (r *Room) UpdateSubscriptions(participant types.Participant, subscription *
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the room if all participants had left, or it's still empty past timeout
|
||||
// CloseIfEmpty closes the room if all participants had left, or it's still empty past timeout
|
||||
func (r *Room) CloseIfEmpty() {
|
||||
if r.isClosed.Get() {
|
||||
return
|
||||
@@ -304,6 +295,7 @@ func (r *Room) onTrackAdded(participant types.Participant, track types.Published
|
||||
// not fully joined. don't subscribe yet
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Debugw("subscribing to new track",
|
||||
"source", participant.Identity(),
|
||||
"remoteTrack", track.ID(),
|
||||
@@ -336,6 +328,27 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Room) subscribeToExistingTracks(p types.Participant) {
|
||||
tracksAdded := 0
|
||||
for _, op := range r.GetParticipants() {
|
||||
if p.ID() == op.ID() {
|
||||
// don't send to itself
|
||||
continue
|
||||
}
|
||||
if n, err := op.AddSubscriber(p); err != nil {
|
||||
// TODO: log error? or disconnect?
|
||||
logger.Errorw("could not subscribe to participant",
|
||||
"dest", p.Identity(),
|
||||
"source", op.Identity())
|
||||
} else {
|
||||
tracksAdded += n
|
||||
}
|
||||
}
|
||||
if tracksAdded > 0 {
|
||||
logger.Debugw("subscribed participants to existing tracks", "tracks", tracksAdded)
|
||||
}
|
||||
}
|
||||
|
||||
// broadcast an update about participant p
|
||||
func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
|
||||
updates := ToProtoParticipants([]types.Participant{p})
|
||||
@@ -357,7 +370,7 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
|
||||
|
||||
func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) {
|
||||
for _, p := range r.GetParticipants() {
|
||||
p.SendActiveSpeakers(speakers)
|
||||
_ = p.SendActiveSpeakers(speakers)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+41
-25
@@ -20,8 +20,8 @@ const (
|
||||
negotiationStateNone = iota
|
||||
// waiting for client answer
|
||||
negotiationStateClient
|
||||
// need to negotiate again
|
||||
negotiationStateServer
|
||||
// need to Negotiate again
|
||||
negotiationRetry
|
||||
)
|
||||
|
||||
// PCTransport is a wrapper around PeerConnection, with some helper methods
|
||||
@@ -32,7 +32,7 @@ type PCTransport struct {
|
||||
lock sync.Mutex
|
||||
pendingCandidates []webrtc.ICECandidateInit
|
||||
debouncedNegotiate func(func())
|
||||
onNegotiation func()
|
||||
onOffer func(offer webrtc.SessionDescription)
|
||||
|
||||
negotiationState atomic.Value
|
||||
}
|
||||
@@ -68,7 +68,6 @@ func NewPCTransport(target livekit.SignalTarget, conf *WebRTCConfig) (*PCTranspo
|
||||
debouncedNegotiate: debounce.New(negotiationFrequency),
|
||||
}
|
||||
t.negotiationState.Store(negotiationStateNone)
|
||||
|
||||
t.pc.OnNegotiationNeeded(t.negotiate)
|
||||
|
||||
return t, nil
|
||||
@@ -90,7 +89,7 @@ func (t *PCTransport) PeerConnection() *webrtc.PeerConnection {
|
||||
}
|
||||
|
||||
func (t *PCTransport) Close() {
|
||||
t.pc.Close()
|
||||
_ = t.pc.Close()
|
||||
}
|
||||
|
||||
func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
|
||||
@@ -110,33 +109,50 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
|
||||
// negotiated, reset flag
|
||||
state := t.negotiationState.Load().(int)
|
||||
t.negotiationState.Store(negotiationStateNone)
|
||||
if state == negotiationStateServer && t.onNegotiation != nil {
|
||||
logger.Debugw("negotiating again")
|
||||
// need to negotiate again
|
||||
if state == negotiationRetry {
|
||||
// need to Negotiate again
|
||||
t.negotiate()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *PCTransport) OnNegotiationNeeded(f func()) {
|
||||
t.onNegotiation = f
|
||||
// OnOffer is called when the PeerConnection starts negotiation and prepares an offer
|
||||
func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) {
|
||||
t.onOffer = f
|
||||
}
|
||||
|
||||
func (t *PCTransport) negotiate() {
|
||||
t.debouncedNegotiate(func() {
|
||||
state := t.negotiationState.Load().(int)
|
||||
// when there's an ongoing negotiation, let it finish and not disrupt its state
|
||||
if state == negotiationStateClient {
|
||||
logger.Debugw("skipping negotiation, trying again later")
|
||||
t.negotiationState.Store(negotiationStateServer)
|
||||
return
|
||||
}
|
||||
|
||||
if t.onNegotiation != nil {
|
||||
t.onNegotiation()
|
||||
// indicate waiting for client
|
||||
t.negotiationState.Store(negotiationStateClient)
|
||||
}
|
||||
})
|
||||
t.debouncedNegotiate(t.handleNegotiate)
|
||||
}
|
||||
|
||||
func (t *PCTransport) handleNegotiate() {
|
||||
if t.onOffer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
state := t.negotiationState.Load().(int)
|
||||
// when there's an ongoing negotiation, let it finish and not disrupt its state
|
||||
if state == negotiationStateClient {
|
||||
logger.Debugw("skipping negotiation, trying again later")
|
||||
t.negotiationState.Store(negotiationRetry)
|
||||
return
|
||||
}
|
||||
|
||||
offer, err := t.pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
logger.Errorw("could not create offer", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = t.pc.SetLocalDescription(offer)
|
||||
if err != nil {
|
||||
logger.Errorw("could not set local description", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
t.onOffer(offer)
|
||||
|
||||
// indicate waiting for client
|
||||
t.negotiationState.Store(negotiationStateClient)
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ type Participant interface {
|
||||
HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error)
|
||||
HandleAnswer(sdp webrtc.SessionDescription) error
|
||||
AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error
|
||||
AddSubscriber(op Participant) error
|
||||
AddSubscriber(op Participant) (int, error)
|
||||
RemoveSubscriber(peerId string)
|
||||
SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error
|
||||
SendParticipantUpdate(participants []*livekit.ParticipantInfo) error
|
||||
|
||||
@@ -31,16 +31,18 @@ type FakeParticipant struct {
|
||||
arg1 string
|
||||
arg2 types.SubscribedTrack
|
||||
}
|
||||
AddSubscriberStub func(types.Participant) error
|
||||
AddSubscriberStub func(types.Participant) (int, error)
|
||||
addSubscriberMutex sync.RWMutex
|
||||
addSubscriberArgsForCall []struct {
|
||||
arg1 types.Participant
|
||||
}
|
||||
addSubscriberReturns struct {
|
||||
result1 error
|
||||
result1 int
|
||||
result2 error
|
||||
}
|
||||
addSubscriberReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
result1 int
|
||||
result2 error
|
||||
}
|
||||
AddTrackStub func(string, string, livekit.TrackType)
|
||||
addTrackMutex sync.RWMutex
|
||||
@@ -430,7 +432,7 @@ func (fake *FakeParticipant) AddSubscribedTrackArgsForCall(i int) (string, types
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) error {
|
||||
func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) (int, error) {
|
||||
fake.addSubscriberMutex.Lock()
|
||||
ret, specificReturn := fake.addSubscriberReturnsOnCall[len(fake.addSubscriberArgsForCall)]
|
||||
fake.addSubscriberArgsForCall = append(fake.addSubscriberArgsForCall, struct {
|
||||
@@ -444,9 +446,9 @@ func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) error {
|
||||
return stub(arg1)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) AddSubscriberCallCount() int {
|
||||
@@ -455,7 +457,7 @@ func (fake *FakeParticipant) AddSubscriberCallCount() int {
|
||||
return len(fake.addSubscriberArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) AddSubscriberCalls(stub func(types.Participant) error) {
|
||||
func (fake *FakeParticipant) AddSubscriberCalls(stub func(types.Participant) (int, error)) {
|
||||
fake.addSubscriberMutex.Lock()
|
||||
defer fake.addSubscriberMutex.Unlock()
|
||||
fake.AddSubscriberStub = stub
|
||||
@@ -468,27 +470,30 @@ func (fake *FakeParticipant) AddSubscriberArgsForCall(i int) types.Participant {
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) AddSubscriberReturns(result1 error) {
|
||||
func (fake *FakeParticipant) AddSubscriberReturns(result1 int, result2 error) {
|
||||
fake.addSubscriberMutex.Lock()
|
||||
defer fake.addSubscriberMutex.Unlock()
|
||||
fake.AddSubscriberStub = nil
|
||||
fake.addSubscriberReturns = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
result1 int
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) AddSubscriberReturnsOnCall(i int, result1 error) {
|
||||
func (fake *FakeParticipant) AddSubscriberReturnsOnCall(i int, result1 int, result2 error) {
|
||||
fake.addSubscriberMutex.Lock()
|
||||
defer fake.addSubscriberMutex.Unlock()
|
||||
fake.AddSubscriberStub = nil
|
||||
if fake.addSubscriberReturnsOnCall == nil {
|
||||
fake.addSubscriberReturnsOnCall = make(map[int]struct {
|
||||
result1 error
|
||||
result1 int
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.addSubscriberReturnsOnCall[i] = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
result1 int
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) AddTrack(arg1 string, arg2 string, arg3 livekit.TrackType) {
|
||||
|
||||
@@ -5,14 +5,13 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/protocol/utils"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/logger"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
livekit "github.com/livekit/livekit-server/proto"
|
||||
"github.com/livekit/protocol/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -140,7 +139,7 @@ func (r *RoomManager) DeleteRoom(roomName string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// clean up after old rooms that have been around for awhile
|
||||
// CleanupRooms cleans up after old rooms that have been around for awhile
|
||||
func (r *RoomManager) CleanupRooms() error {
|
||||
// cleanup rooms that have been left for over a day
|
||||
rooms, err := r.roomStore.ListRooms()
|
||||
@@ -172,7 +171,7 @@ func (r *RoomManager) CloseIdleRooms() {
|
||||
}
|
||||
}
|
||||
|
||||
// starts WebRTC session when a new participant is connected, takes place on RTC node
|
||||
// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
|
||||
func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
|
||||
room, err := r.getOrCreateRoom(roomName)
|
||||
if err != nil {
|
||||
@@ -196,7 +195,6 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
|
||||
prevSink.Close()
|
||||
}
|
||||
participant.SetResponseSink(responseSink)
|
||||
|
||||
return
|
||||
} else {
|
||||
// we need to clean up the existing participant, so a new one can join
|
||||
@@ -257,10 +255,14 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
|
||||
}
|
||||
})
|
||||
room.OnParticipantChanged(func(p types.Participant) {
|
||||
var err error
|
||||
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
|
||||
r.roomStore.DeleteParticipant(roomName, p.Identity())
|
||||
err = r.roomStore.DeleteParticipant(roomName, p.Identity())
|
||||
} else {
|
||||
r.roomStore.PersistParticipant(roomName, p.ToProto())
|
||||
err = r.roomStore.PersistParticipant(roomName, p.ToProto())
|
||||
}
|
||||
if err != nil {
|
||||
logger.Errorw("could not handle participant change", "error", err)
|
||||
}
|
||||
})
|
||||
r.lock.Lock()
|
||||
|
||||
@@ -63,7 +63,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
pi := routing.ParticipantInit{
|
||||
Reconnect: reconnectParam == "1" || reconnectParam == "true",
|
||||
Reconnect: boolValue(reconnectParam),
|
||||
Identity: claims.Identity,
|
||||
}
|
||||
// only use permissions if any of them are set, default permissive
|
||||
|
||||
@@ -25,5 +25,9 @@ func handleError(w http.ResponseWriter, status int, msg string) {
|
||||
l := logger.Desugar().WithOptions(zap.AddCallerSkip(1))
|
||||
l.Debug("error handling request", zap.String("error", msg), zap.Int("status", status))
|
||||
w.WriteHeader(status)
|
||||
w.Write([]byte(msg))
|
||||
_, _ = w.Write([]byte(msg))
|
||||
}
|
||||
|
||||
func boolValue(s string) bool {
|
||||
return s == "1" || s == "true"
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.26.0
|
||||
// protoc v3.15.7
|
||||
// protoc v3.15.8
|
||||
// source: livekit_internal.proto
|
||||
|
||||
package livekit
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.26.0
|
||||
// protoc v3.15.7
|
||||
// protoc v3.15.8
|
||||
// source: livekit_models.proto
|
||||
|
||||
package livekit
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.26.0
|
||||
// protoc v3.15.7
|
||||
// protoc v3.15.8
|
||||
// source: livekit_room.proto
|
||||
|
||||
package livekit
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.26.0
|
||||
// protoc v3.15.7
|
||||
// protoc v3.15.8
|
||||
// source: livekit_rtc.proto
|
||||
|
||||
package livekit
|
||||
|
||||
Reference in New Issue
Block a user