diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index da111482e..21a87f07d 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -132,7 +132,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { c.subscriber.PeerConnection().OnDataChannel(func(channel *webrtc.DataChannel) { }) - c.publisher.OnNegotiationNeeded(c.negotiate) + c.publisher.OnOffer(c.onOffer) c.publisher.PeerConnection().OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { logger.Debugw("ICE state has changed", "state", connectionState.String(), @@ -414,7 +414,6 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string) if _, err = c.publisher.PeerConnection().AddTrack(track); err != nil { return } - writer = NewTrackWriter(c.ctx, track, path) // write tracks only after ICE connectivity @@ -516,21 +515,11 @@ func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription) error { return nil } -func (c *RTCClient) negotiate() { +func (c *RTCClient) onOffer(offer webrtc.SessionDescription) { if c.localParticipant != nil { logger.Debugw("starting negotiation", "participant", c.localParticipant.Identity) } - - offer, err := c.publisher.PeerConnection().CreateOffer(nil) - if err != nil { - return - } - - if err := c.publisher.PeerConnection().SetLocalDescription(offer); err != nil { - return - } - - c.SendRequest(&livekit.SignalRequest{ + _ = c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Offer{ Offer: rtc.ToProtoSessionDescription(offer), }, diff --git a/go.mod b/go.mod index 2e8a05464..3529e6702 100644 --- a/go.mod +++ b/go.mod @@ -42,3 +42,5 @@ require ( ) replace github.com/pion/webrtc/v3 => github.com/davidzhao/webrtc/v3 v3.0.4-0.20210413050400-3d284fbde125 + +replace github.com/pion/ice/v2 => github.com/davidzhao/ice/v2 v2.0.17-0.20210414203851-3eacd99ac5cd diff --git a/go.sum b/go.sum index c22e52e9c..089f95b5c 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davidzhao/ice/v2 v2.0.17-0.20210414203851-3eacd99ac5cd h1:nQsWFvFzYepfS913kSWUEI95VBG3oIIEf2PCoXpakd8= +github.com/davidzhao/ice/v2 v2.0.17-0.20210414203851-3eacd99ac5cd/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0= github.com/davidzhao/webrtc/v3 v3.0.4-0.20210413050400-3d284fbde125 h1:QXgTQ0EMskrDLMRqLMnkOnGPmS2oJPy7TGCJXIysdec= github.com/davidzhao/webrtc/v3 v3.0.4-0.20210413050400-3d284fbde125/go.mod h1:vncfQFH7R8oOL0eZQKyQ10Qx7DPfGKoDKCzSsUz4CsE= github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE= @@ -159,7 +161,6 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8= @@ -312,13 +313,9 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pion/datachannel v1.4.21 h1:3ZvhNyfmxsAqltQrApLPQMhSFNA+aT87RqyCq4OXmf0= github.com/pion/datachannel v1.4.21/go.mod h1:oiNyP4gHx2DIwRzX/MFyH0Rz/Gz05OgBlayAI2hAWjg= -github.com/pion/dtls/v2 v2.0.4/go.mod h1:qAkFscX0ZHoI1E07RfYPoRw3manThveu+mlTDdOxoGI= github.com/pion/dtls/v2 v2.0.8/go.mod h1:QuDII+8FVvk9Dp5t5vYIMTo7hh7uBkra+8QIm7QGm10= github.com/pion/dtls/v2 v2.0.9 h1:7Ow+V++YSZQMYzggI0P9vLJz/hUFcffsfGMfT/Qy+u8= github.com/pion/dtls/v2 v2.0.9/go.mod h1:O0Wr7si/Zj5/EBFlDzDd6UtVxx25CE1r7XM7BQKYQho= -github.com/pion/ice/v2 v2.0.15/go.mod h1:ZIiVGevpgAxF/cXiIVmuIUtCb3Xs4gCzCbXB6+nFkSI= -github.com/pion/ice/v2 v2.1.0 h1:h9lmmO+BoxPDn4k/qZRDKFqWLOMVABtIVV8TZba6sQQ= -github.com/pion/ice/v2 v2.1.0/go.mod h1:kV4EODVD5ux2z8XncbLHIOtcXKtYXVgLVCeVqnpoeP0= github.com/pion/interceptor v0.0.12 h1:eC1iVneBIAQJEfaNAfDqAncJWhMDAnaXPRCJsltdokE= github.com/pion/interceptor v0.0.12/go.mod h1:qzeuWuD/ZXvPqOnxNcnhWfkCZ2e1kwwslicyyPnhoK4= github.com/pion/ion-log v1.0.0 h1:2lJLImCmfCWCR38hLWsjQfBWe6NFz/htbqiYHwvOP/Q= @@ -327,7 +324,6 @@ github.com/pion/ion-sfu v1.9.7 h1:kWshVj7J5JZFBHsGmHWqbSiPbXBLjtsWUkfnVEQv5Dw= github.com/pion/ion-sfu v1.9.7/go.mod h1:BQf2nidxkQU5Qc37e13i+rc7Q+qt8CWp0HmYtsy00o8= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= -github.com/pion/mdns v0.0.4/go.mod h1:R1sL0p50l42S5lJs91oNdUL58nm0QHrhxnSegr++qC0= github.com/pion/mdns v0.0.5 h1:Q2oj/JB3NqfzY9xGZ1fPzZzK7sDSD8rZPOvcIQ10BCw= github.com/pion/mdns v0.0.5/go.mod h1:UgssrvdD3mxpi8tMxAXbsppL3vJ4Jipw1mTCW+al01g= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= @@ -345,10 +341,8 @@ github.com/pion/srtp/v2 v2.0.2 h1:664iGzVmaY7KYS5M0gleY0DscRo9ReDfTxQrq4UgGoU= github.com/pion/srtp/v2 v2.0.2/go.mod h1:VEyLv4CuxrwGY8cxM+Ng3bmVy8ckz/1t6A0q/msKOw0= github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg= github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA= -github.com/pion/transport v0.8.10/go.mod h1:tBmha/UCjpum5hqTWhfAEs3CO4/tHSg0MYRhSzR+CZ8= github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE= github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A= -github.com/pion/transport v0.12.1/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q= github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q= github.com/pion/transport v0.12.3 h1:vdBfvfU/0Wq8kd2yhUMSDB/x+O4Z9MYVl2fJ5BT4JZw= github.com/pion/transport v0.12.3/go.mod h1:OViWW9SP2peE/HbwBvARicmAVnesphkNkCVZIWJ6q9A= @@ -500,7 +494,6 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w= @@ -546,14 +539,12 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201026091529-146b70c837a4/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 0fe5a33a3..ec1f2cef5 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -19,7 +19,7 @@ type MessageSink interface { //counterfeiter:generate . MessageSource type MessageSource interface { - // source exposes a one way channel to make it easier to use with select + // ReadChan exposes a one way channel to make it easier to use with select ReadChan() <-chan proto.Message } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 2e38c2192..18d7a551c 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -128,7 +128,7 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) { return nodes, nil } -// signal connection sets up paths to the RTC node, and starts to route messages to that message queue +// StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { // find the node where the room is hosted at rtcNode, err := r.GetNodeForRoom(roomName) @@ -248,8 +248,8 @@ func (r *RedisRouter) Stop() { return } logger.Debugw("stopping RedisRouter") - r.pubsub.Close() - r.UnregisterNode() + _ = r.pubsub.Close() + _ = r.UnregisterNode() r.cancel() } diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index 30e8aaa68..cad91c757 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -58,8 +58,7 @@ func NewWebRTCConfig(conf *config.RTCConfig, externalIP string) (*WebRTCConfig, bufferFactory := buffer.NewBufferFactory(conf.PacketBufferSize, zapr.NewLogger(logger.Desugar())) s.BufferFactory = bufferFactory.GetOrNew - networkTypes := []webrtc.NetworkType{} - + networkTypes := make([]webrtc.NetworkType, 0, 4) if !conf.ForceTCP { networkTypes = append(networkTypes, webrtc.NetworkTypeUDP4, diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index ace1831c6..d7bb42a84 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -109,8 +109,7 @@ func (t *MediaTrack) OnClose(f func()) { t.onClose = f } -// subscribes participant to current remoteTrack -// creates and add necessary forwarders and starts them +// AddSubscriber subscribes sub to current mediaTrack func (t *MediaTrack) AddSubscriber(sub types.Participant) error { t.lock.Lock() defer t.lock.Unlock() @@ -200,7 +199,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return nil } -// adds a new RTP receiver to the track, returns true if this is a new track +// AddReceiver adds a new RTP receiver to the track, returns true if this is a new track func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, twcc *twcc.Responder) { t.lock.Lock() defer t.lock.Unlock() @@ -263,8 +262,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra }) } -// removes peer from subscription -// stop all forwarders to the peer +// RemoveSubscriber removes participant from subscription +// stop all forwarders to the client func (t *MediaTrack) RemoveSubscriber(participantId string) { t.lock.RLock() defer t.lock.RUnlock() @@ -301,9 +300,7 @@ func (t *MediaTrack) sendDownTrackBindingReports(participantId string, rtcpCh ch if chunks == nil { return } - if chunks != nil { - sd = append(sd, chunks...) - } + sd = append(sd, chunks...) pkts := []rtcp.Packet{ &rtcp.SourceDescription{Chunks: sd}, diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 07fbbdde2..dcc59e8c0 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -112,8 +112,7 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, p.publisher.pc.OnTrack(p.onMediaTrack) p.subscriber.pc.OnDataChannel(p.onDataChannel) - - p.subscriber.OnNegotiationNeeded(p.negotiate) + p.subscriber.OnOffer(p.onOffer) return p, nil } @@ -139,7 +138,7 @@ func (p *ParticipantImpl) ConnectedAt() time.Time { return p.connectedAt.Load().(time.Time) } -// attach metadata to the participant +// SetMetadata attaches metadata to the participant func (p *ParticipantImpl) SetMetadata(metadata string) { p.metadata = metadata @@ -334,7 +333,6 @@ func (p *ParticipantImpl) Close() error { p.updateState(livekit.ParticipantInfo_DISCONNECTED) p.subscriber.pc.OnDataChannel(nil) p.subscriber.pc.OnICECandidate(nil) - p.subscriber.pc.OnNegotiationNeeded(nil) p.subscriber.pc.OnTrack(nil) p.publisher.pc.OnICECandidate(nil) // ensure this is synchronized @@ -352,7 +350,7 @@ func (p *ParticipantImpl) Close() error { } // Subscribes op to all publishedTracks -func (p *ParticipantImpl) AddSubscriber(op types.Participant) error { +func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) { p.lock.RLock() tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks)) for _, t := range p.publishedTracks { @@ -361,7 +359,7 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) error { defer p.lock.RUnlock() if len(tracks) == 0 { - return nil + return 0, nil } logger.Debugw("subscribing new participant to tracks", @@ -369,12 +367,14 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) error { "newParticipant", op.Identity(), "numTracks", len(tracks)) + n := 0 for _, track := range tracks { if err := track.AddSubscriber(op); err != nil { - return err + return n, err } + n += 1 } - return nil + return n, nil } func (p *ParticipantImpl) RemoveSubscriber(participantId string) { @@ -533,40 +533,6 @@ func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target liveki }) } -// initiates server-driven negotiation by creating an offer -func (p *ParticipantImpl) negotiate() { - if p.State() == livekit.ParticipantInfo_DISCONNECTED { - logger.Debugw("skipping server negotiation", "participant", p.Identity()) - // skip when disconnected - return - } - - logger.Debugw("starting server negotiation", "participant", p.Identity()) - - offer, err := p.subscriber.pc.CreateOffer(nil) - if err != nil { - logger.Errorw("could not create offer", "err", err) - return - } - - err = p.subscriber.pc.SetLocalDescription(offer) - if err != nil { - logger.Errorw("could not set local description", "err", err) - return - } - - logger.Debugw("sending offer to participant", - "participant", p.Identity(), - //"sdp", offer.SDP, - ) - - p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Offer{ - Offer: ToProtoSessionDescription(offer), - }, - }) -} - func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { oldState := p.State() if state == oldState { @@ -599,6 +565,25 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { return nil } +// when the server has an offer for participant +func (p *ParticipantImpl) onOffer(sd webrtc.SessionDescription) { + if p.State() == livekit.ParticipantInfo_DISCONNECTED { + logger.Debugw("skipping server offer", "participant", p.Identity()) + // skip when disconnected + return + } + logger.Debugw("sending server offer to participant", + "participant", p.Identity(), + //"sdp", offer.SDP, + ) + + _ = p.writeMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_Offer{ + Offer: ToProtoSessionDescription(sd), + }, + }) +} + // when a new remoteTrack is created, creates a Track and adds it to room func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { logger.Debugw("mediaTrack added", diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 8d4a62b0c..249535b49 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -128,7 +128,7 @@ func TestTrackPublishing(t *testing.T) { // after disconnection, things should continue to function and not panic func TestDisconnectTiming(t *testing.T) { - t.Run("negotiate doesn't fail after channel closed", func(t *testing.T) { + t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) { p := newParticipantForTest("test") msg := routing.NewMessageChannel() p.responseSink = msg @@ -140,9 +140,8 @@ func TestDisconnectTiming(t *testing.T) { track := &typesfakes.FakePublishedTrack{} p.handleTrackPublished(track) - // close channel and then try to negotiate + // close channel and then try to Negotiate msg.Close() - p.negotiate() }) } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 03a1829ec..47d3e46f8 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -141,20 +141,11 @@ func (r *Room) Join(participant types.Participant) error { if p.State() == livekit.ParticipantInfo_ACTIVE { // subscribe participant to existing publishedTracks - for _, op := range r.GetParticipants() { - if p.ID() == op.ID() { - // don't send to itself - continue - } - if err := op.AddSubscriber(p); err != nil { - // TODO: log error? or disconnect? - logger.Errorw("could not subscribe to participant", - "dest", p.Identity(), - "source", op.Identity()) - } - } + r.subscribeToExistingTracks(p) + // start the workers once connectivity is established p.Start() + } else if p.State() == livekit.ParticipantInfo_DISCONNECTED { // remove participant from room go r.RemoveParticipant(p.Identity()) @@ -244,7 +235,7 @@ func (r *Room) UpdateSubscriptions(participant types.Participant, subscription * return nil } -// Close the room if all participants had left, or it's still empty past timeout +// CloseIfEmpty closes the room if all participants had left, or it's still empty past timeout func (r *Room) CloseIfEmpty() { if r.isClosed.Get() { return @@ -304,6 +295,7 @@ func (r *Room) onTrackAdded(participant types.Participant, track types.Published // not fully joined. don't subscribe yet continue } + logger.Debugw("subscribing to new track", "source", participant.Identity(), "remoteTrack", track.ID(), @@ -336,6 +328,27 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) { } } +func (r *Room) subscribeToExistingTracks(p types.Participant) { + tracksAdded := 0 + for _, op := range r.GetParticipants() { + if p.ID() == op.ID() { + // don't send to itself + continue + } + if n, err := op.AddSubscriber(p); err != nil { + // TODO: log error? or disconnect? + logger.Errorw("could not subscribe to participant", + "dest", p.Identity(), + "source", op.Identity()) + } else { + tracksAdded += n + } + } + if tracksAdded > 0 { + logger.Debugw("subscribed participants to existing tracks", "tracks", tracksAdded) + } +} + // broadcast an update about participant p func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) { updates := ToProtoParticipants([]types.Participant{p}) @@ -357,7 +370,7 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) { func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) { for _, p := range r.GetParticipants() { - p.SendActiveSpeakers(speakers) + _ = p.SendActiveSpeakers(speakers) } } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 1ca3ff77b..3d6898456 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -20,8 +20,8 @@ const ( negotiationStateNone = iota // waiting for client answer negotiationStateClient - // need to negotiate again - negotiationStateServer + // need to Negotiate again + negotiationRetry ) // PCTransport is a wrapper around PeerConnection, with some helper methods @@ -32,7 +32,7 @@ type PCTransport struct { lock sync.Mutex pendingCandidates []webrtc.ICECandidateInit debouncedNegotiate func(func()) - onNegotiation func() + onOffer func(offer webrtc.SessionDescription) negotiationState atomic.Value } @@ -68,7 +68,6 @@ func NewPCTransport(target livekit.SignalTarget, conf *WebRTCConfig) (*PCTranspo debouncedNegotiate: debounce.New(negotiationFrequency), } t.negotiationState.Store(negotiationStateNone) - t.pc.OnNegotiationNeeded(t.negotiate) return t, nil @@ -90,7 +89,7 @@ func (t *PCTransport) PeerConnection() *webrtc.PeerConnection { } func (t *PCTransport) Close() { - t.pc.Close() + _ = t.pc.Close() } func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error { @@ -110,33 +109,50 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error { // negotiated, reset flag state := t.negotiationState.Load().(int) t.negotiationState.Store(negotiationStateNone) - if state == negotiationStateServer && t.onNegotiation != nil { - logger.Debugw("negotiating again") - // need to negotiate again + if state == negotiationRetry { + // need to Negotiate again t.negotiate() } return nil } -func (t *PCTransport) OnNegotiationNeeded(f func()) { - t.onNegotiation = f +// OnOffer is called when the PeerConnection starts negotiation and prepares an offer +func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) { + t.onOffer = f } func (t *PCTransport) negotiate() { - t.debouncedNegotiate(func() { - state := t.negotiationState.Load().(int) - // when there's an ongoing negotiation, let it finish and not disrupt its state - if state == negotiationStateClient { - logger.Debugw("skipping negotiation, trying again later") - t.negotiationState.Store(negotiationStateServer) - return - } - - if t.onNegotiation != nil { - t.onNegotiation() - // indicate waiting for client - t.negotiationState.Store(negotiationStateClient) - } - }) + t.debouncedNegotiate(t.handleNegotiate) +} + +func (t *PCTransport) handleNegotiate() { + if t.onOffer == nil { + return + } + + state := t.negotiationState.Load().(int) + // when there's an ongoing negotiation, let it finish and not disrupt its state + if state == negotiationStateClient { + logger.Debugw("skipping negotiation, trying again later") + t.negotiationState.Store(negotiationRetry) + return + } + + offer, err := t.pc.CreateOffer(nil) + if err != nil { + logger.Errorw("could not create offer", "err", err) + return + } + + err = t.pc.SetLocalDescription(offer) + if err != nil { + logger.Errorw("could not set local description", "err", err) + return + } + + t.onOffer(offer) + + // indicate waiting for client + t.negotiationState.Store(negotiationStateClient) } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 5b900329e..ab901bb5f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -41,7 +41,7 @@ type Participant interface { HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) HandleAnswer(sdp webrtc.SessionDescription) error AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error - AddSubscriber(op Participant) error + AddSubscriber(op Participant) (int, error) RemoveSubscriber(peerId string) SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error SendParticipantUpdate(participants []*livekit.ParticipantInfo) error diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 11b03a6e7..6f59fee46 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -31,16 +31,18 @@ type FakeParticipant struct { arg1 string arg2 types.SubscribedTrack } - AddSubscriberStub func(types.Participant) error + AddSubscriberStub func(types.Participant) (int, error) addSubscriberMutex sync.RWMutex addSubscriberArgsForCall []struct { arg1 types.Participant } addSubscriberReturns struct { - result1 error + result1 int + result2 error } addSubscriberReturnsOnCall map[int]struct { - result1 error + result1 int + result2 error } AddTrackStub func(string, string, livekit.TrackType) addTrackMutex sync.RWMutex @@ -430,7 +432,7 @@ func (fake *FakeParticipant) AddSubscribedTrackArgsForCall(i int) (string, types return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) error { +func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) (int, error) { fake.addSubscriberMutex.Lock() ret, specificReturn := fake.addSubscriberReturnsOnCall[len(fake.addSubscriberArgsForCall)] fake.addSubscriberArgsForCall = append(fake.addSubscriberArgsForCall, struct { @@ -444,9 +446,9 @@ func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) error { return stub(arg1) } if specificReturn { - return ret.result1 + return ret.result1, ret.result2 } - return fakeReturns.result1 + return fakeReturns.result1, fakeReturns.result2 } func (fake *FakeParticipant) AddSubscriberCallCount() int { @@ -455,7 +457,7 @@ func (fake *FakeParticipant) AddSubscriberCallCount() int { return len(fake.addSubscriberArgsForCall) } -func (fake *FakeParticipant) AddSubscriberCalls(stub func(types.Participant) error) { +func (fake *FakeParticipant) AddSubscriberCalls(stub func(types.Participant) (int, error)) { fake.addSubscriberMutex.Lock() defer fake.addSubscriberMutex.Unlock() fake.AddSubscriberStub = stub @@ -468,27 +470,30 @@ func (fake *FakeParticipant) AddSubscriberArgsForCall(i int) types.Participant { return argsForCall.arg1 } -func (fake *FakeParticipant) AddSubscriberReturns(result1 error) { +func (fake *FakeParticipant) AddSubscriberReturns(result1 int, result2 error) { fake.addSubscriberMutex.Lock() defer fake.addSubscriberMutex.Unlock() fake.AddSubscriberStub = nil fake.addSubscriberReturns = struct { - result1 error - }{result1} + result1 int + result2 error + }{result1, result2} } -func (fake *FakeParticipant) AddSubscriberReturnsOnCall(i int, result1 error) { +func (fake *FakeParticipant) AddSubscriberReturnsOnCall(i int, result1 int, result2 error) { fake.addSubscriberMutex.Lock() defer fake.addSubscriberMutex.Unlock() fake.AddSubscriberStub = nil if fake.addSubscriberReturnsOnCall == nil { fake.addSubscriberReturnsOnCall = make(map[int]struct { - result1 error + result1 int + result2 error }) } fake.addSubscriberReturnsOnCall[i] = struct { - result1 error - }{result1} + result1 int + result2 error + }{result1, result2} } func (fake *FakeParticipant) AddTrack(arg1 string, arg2 string, arg3 livekit.TrackType) { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index e90fcf134..7b4d98f43 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -5,14 +5,13 @@ import ( "sync" "time" - "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" livekit "github.com/livekit/livekit-server/proto" + "github.com/livekit/protocol/utils" ) const ( @@ -140,7 +139,7 @@ func (r *RoomManager) DeleteRoom(roomName string) error { return err } -// clean up after old rooms that have been around for awhile +// CleanupRooms cleans up after old rooms that have been around for awhile func (r *RoomManager) CleanupRooms() error { // cleanup rooms that have been left for over a day rooms, err := r.roomStore.ListRooms() @@ -172,7 +171,7 @@ func (r *RoomManager) CloseIdleRooms() { } } -// starts WebRTC session when a new participant is connected, takes place on RTC node +// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { room, err := r.getOrCreateRoom(roomName) if err != nil { @@ -196,7 +195,6 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, prevSink.Close() } participant.SetResponseSink(responseSink) - return } else { // we need to clean up the existing participant, so a new one can join @@ -257,10 +255,14 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { } }) room.OnParticipantChanged(func(p types.Participant) { + var err error if p.State() == livekit.ParticipantInfo_DISCONNECTED { - r.roomStore.DeleteParticipant(roomName, p.Identity()) + err = r.roomStore.DeleteParticipant(roomName, p.Identity()) } else { - r.roomStore.PersistParticipant(roomName, p.ToProto()) + err = r.roomStore.PersistParticipant(roomName, p.ToProto()) + } + if err != nil { + logger.Errorw("could not handle participant change", "error", err) } }) r.lock.Lock() diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 7496a7366..d3c4cbb2e 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -63,7 +63,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } pi := routing.ParticipantInit{ - Reconnect: reconnectParam == "1" || reconnectParam == "true", + Reconnect: boolValue(reconnectParam), Identity: claims.Identity, } // only use permissions if any of them are set, default permissive diff --git a/pkg/service/utils.go b/pkg/service/utils.go index a661c52f9..76acf99dc 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -25,5 +25,9 @@ func handleError(w http.ResponseWriter, status int, msg string) { l := logger.Desugar().WithOptions(zap.AddCallerSkip(1)) l.Debug("error handling request", zap.String("error", msg), zap.Int("status", status)) w.WriteHeader(status) - w.Write([]byte(msg)) + _, _ = w.Write([]byte(msg)) +} + +func boolValue(s string) bool { + return s == "1" || s == "true" } diff --git a/proto/livekit_internal.pb.go b/proto/livekit_internal.pb.go index 158000020..5c820191b 100644 --- a/proto/livekit_internal.pb.go +++ b/proto/livekit_internal.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v3.15.7 +// protoc v3.15.8 // source: livekit_internal.proto package livekit diff --git a/proto/livekit_models.pb.go b/proto/livekit_models.pb.go index 68445797d..b9172b0bf 100644 --- a/proto/livekit_models.pb.go +++ b/proto/livekit_models.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v3.15.7 +// protoc v3.15.8 // source: livekit_models.proto package livekit diff --git a/proto/livekit_room.pb.go b/proto/livekit_room.pb.go index 84687518d..0fda6e5de 100644 --- a/proto/livekit_room.pb.go +++ b/proto/livekit_room.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v3.15.7 +// protoc v3.15.8 // source: livekit_room.proto package livekit diff --git a/proto/livekit_rtc.pb.go b/proto/livekit_rtc.pb.go index bf6ba128f..dbd0c4bfb 100644 --- a/proto/livekit_rtc.pb.go +++ b/proto/livekit_rtc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v3.15.7 +// protoc v3.15.8 // source: livekit_rtc.proto package livekit