From 2fa46e2df43e4f26bed9ffc2616b3f6b7d102e33 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 25 Jan 2023 22:59:57 -0800 Subject: [PATCH] Retry initial connection attempt should it fail (#1335) Sometimes the initial selected node could fail. In that case, we'll give it a few more attempts to locate a media node for the session instead of failing it after the first try. --- pkg/routing/interfaces.go | 2 + pkg/service/rtcservice.go | 98 ++++++++++++++++++----------- pkg/telemetry/prometheus/packets.go | 6 ++ pkg/utils/context.go | 32 ++++++++++ 4 files changed, 102 insertions(+), 36 deletions(-) create mode 100644 pkg/utils/context.go diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 53e835a0a..b7cd5b406 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -16,6 +16,7 @@ import ( // MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, // potentially on a different node via a transport +// //counterfeiter:generate . MessageSink type MessageSink interface { WriteMessage(msg proto.Message) error @@ -57,6 +58,7 @@ type RTCMessageCallback func( ) // Router allows multiple nodes to coordinate the participant session +// //counterfeiter:generate . 
Router type Router interface { MessageRouter diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ee295b4a5..c7a777d4e 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -14,6 +14,7 @@ import ( "github.com/sebest/xff" "github.com/ua-parser/uap-go/uaparser" + "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -25,10 +26,6 @@ import ( "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -const ( - maxInitialResponseWait = 10 * time.Second -) - type RTCService struct { router routing.MessageRouter roomAllocator RoomAllocator @@ -184,33 +181,30 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - // create room if it doesn't exist, also assigns an RTC node for the room - rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: string(roomName)}) - if err != nil { - handleError(w, http.StatusInternalServerError, err, loggerFields...) - return + // give it a few attempts to start session + var cr connectionResult + for i := 0; i < 3; i++ { + connectionTimeout := 3 * time.Second * time.Duration(i+1) + ctx := utils.ContextWithAttempt(r.Context(), i) + cr, err = s.startConnection(ctx, roomName, pi, connectionTimeout) + if err == nil { + break + } + if i < 2 { + fieldsWithAttempt := append(loggerFields, "attempt", i) + logger.Warnw("failed to start connection, retrying", err, fieldsWithAttempt...) + } } - - // this needs to be started first *before* using router functions on this node - connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi) - if err != nil { - handleError(w, http.StatusInternalServerError, err, loggerFields...) - return - } - - // wait for the first message before upgrading to websocket. 
If no one is - // responding to our connection attempt, we should terminate the connection - // instead of waiting forever on the WebSocket - initialResponse, err := readInitialResponse(resSource, maxInitialResponseWait) if err != nil { + prometheus.IncrementParticipantJoinFail(1) handleError(w, http.StatusInternalServerError, err, loggerFields...) return } prometheus.IncrementParticipantJoin(1) - if !pi.Reconnect && initialResponse.GetJoin() != nil { - pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid()) + if !pi.Reconnect && cr.InitialResponse.GetJoin() != nil { + pi.ID = livekit.ParticipantID(cr.InitialResponse.GetJoin().GetParticipant().GetSid()) } var signalStats *telemetry.BytesTrackStats @@ -222,7 +216,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } pLogger := rtc.LoggerWithParticipant( - rtc.LoggerWithRoom(logger.GetLogger(), roomName, livekit.RoomID(rm.Sid)), + rtc.LoggerWithRoom(logger.GetLogger(), roomName, livekit.RoomID(cr.Room.Sid)), pi.Identity, pi.ID, false, @@ -231,9 +225,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { done := make(chan struct{}) // function exits when websocket terminates, it'll close the event reading off of response sink as well defer func() { - pLogger.Infow("finishing WS connection", "connID", connId) - resSource.Close() - reqSink.Close() + pLogger.Infow("finishing WS connection", "connID", cr.ConnectionID) + cr.ResponseSource.Close() + cr.RequestSink.Close() close(done) if signalStats != nil { @@ -250,7 +244,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // websocket established sigConn := NewWSSignalConnection(conn) - if count, err := sigConn.WriteResponse(initialResponse); err != nil { + if count, err := sigConn.WriteResponse(cr.InitialResponse); err != nil { pLogger.Warnw("could not write initial response", err) return } else { @@ -258,7 +252,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r 
*http.Request) { signalStats.AddBytes(uint64(count), true) } } - pLogger.Infow("new client WS connected", "connID", connId) + pLogger.Infow("new client WS connected", "connID", cr.ConnectionID) // handle responses go func() { @@ -272,7 +266,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { select { case <-done: return - case msg := <-resSource.ReadChan(): + case msg := <-cr.ResponseSource.ReadChan(): if msg == nil { return } @@ -280,12 +274,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !ok { pLogger.Errorw("unexpected message type", nil, "type", fmt.Sprintf("%T", msg), - "connID", connId) + "connID", cr.ConnectionID) continue } - if pi.ID == "" && initialResponse.GetJoin() != nil { - pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid()) + if pi.ID == "" && cr.InitialResponse.GetJoin() != nil { + pi.ID = livekit.ParticipantID(cr.InitialResponse.GetJoin().GetParticipant().GetSid()) signalStats = telemetry.NewBytesTrackStats( telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID), pi.ID, @@ -309,7 +303,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") || websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { - pLogger.Debugw("exit ws read loop for closed connection", "connID", connId) + pLogger.Debugw("exit ws read loop for closed connection", "connID", cr.ConnectionID) return } else { pLogger.Errorw("error reading from websocket", err) @@ -334,9 +328,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } continue } - if err := reqSink.WriteMessage(req); err != nil { + if err := cr.RequestSink.WriteMessage(req); err != nil { pLogger.Warnw("error writing to request sink", err, - "connID", connId) + "connID", 
cr.ConnectionID) } } } @@ -402,6 +396,38 @@ func (s *RTCService) ParseClientInfo(r *http.Request) *livekit.ClientInfo { return ci } +type connectionResult struct { + Room *livekit.Room + ConnectionID livekit.ConnectionID + RequestSink routing.MessageSink + ResponseSource routing.MessageSource + InitialResponse *livekit.SignalResponse +} + +func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, error) { + var cr connectionResult + var err error + cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) + if err != nil { + return cr, err + } + + // this needs to be started first *before* using router functions on this node + cr.ConnectionID, cr.RequestSink, cr.ResponseSource, err = s.router.StartParticipantSignal(ctx, roomName, pi) + if err != nil { + return cr, err + } + + // wait for the first message before upgrading to websocket. If no one is + // responding to our connection attempt, we should terminate the connection + // instead of waiting forever on the WebSocket + cr.InitialResponse, err = readInitialResponse(cr.ResponseSource, timeout) + if err != nil { + return cr, err + } + return cr, nil +} + func readInitialResponse(source routing.MessageSource, timeout time.Duration) (*livekit.SignalResponse, error) { responseTimer := time.NewTimer(timeout) defer responseTimer.Stop() diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index b51d14231..8972d6ff6 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -199,6 +199,12 @@ func IncrementParticipantJoin(join uint32) { } } +func IncrementParticipantJoinFail(join uint32) { + if join > 0 { + promParticipantJoin.WithLabelValues("signal_failed").Add(float64(join)) + } +} + func IncrementParticipantRtcInit(join uint32) { if join > 0 { participantRTCInit.Add(uint64(join)) diff --git a/pkg/utils/context.go 
// attemptContextKey is the private type of the context key under which the
// connection attempt number is stored. Context keys are compared by type and
// value, so a dedicated unexported type keeps the key from colliding with
// keys set elsewhere; a bare struct{}{} value would compare equal to any
// other struct{}{} key.
type attemptContextKey struct{}

// attemptKey is the context key used by ContextWithAttempt and GetAttempt.
var attemptKey = attemptContextKey{}

// ContextWithAttempt returns a context derived from ctx that carries the
// given attempt number. An attempt stored on an ancestor context is shadowed
// by the new value.
func ContextWithAttempt(ctx context.Context, attempt int) context.Context {
	return context.WithValue(ctx, attemptKey, attempt)
}

// GetAttempt returns the attempt number stored in ctx by ContextWithAttempt.
// It returns 0 when ctx carries no attempt.
func GetAttempt(ctx context.Context) int {
	if attempt, ok := ctx.Value(attemptKey).(int); ok {
		return attempt
	}
	return 0
}