mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 17:45:40 +00:00
Retry initial connection attempt should it fail (#1335)
Sometimes the initial selected node could fail. In that case, we'll give it a few more attempts to locate a media node for the session instead of failing it after the first try.
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
// MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource,
|
||||
// potentially on a different node via a transport
|
||||
//
|
||||
//counterfeiter:generate . MessageSink
|
||||
type MessageSink interface {
|
||||
WriteMessage(msg proto.Message) error
|
||||
@@ -57,6 +58,7 @@ type RTCMessageCallback func(
|
||||
)
|
||||
|
||||
// Router allows multiple nodes to coordinate the participant session
|
||||
//
|
||||
//counterfeiter:generate . Router
|
||||
type Router interface {
|
||||
MessageRouter
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/sebest/xff"
|
||||
"github.com/ua-parser/uap-go/uaparser"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/utils"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
|
||||
@@ -25,10 +26,6 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
maxInitialResponseWait = 10 * time.Second
|
||||
)
|
||||
|
||||
type RTCService struct {
|
||||
router routing.MessageRouter
|
||||
roomAllocator RoomAllocator
|
||||
@@ -184,33 +181,30 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// create room if it doesn't exist, also assigns an RTC node for the room
|
||||
rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: string(roomName)})
|
||||
if err != nil {
|
||||
handleError(w, http.StatusInternalServerError, err, loggerFields...)
|
||||
return
|
||||
// give it a few attempts to start session
|
||||
var cr connectionResult
|
||||
for i := 0; i < 3; i++ {
|
||||
connectionTimeout := 3 * time.Second * time.Duration(i+1)
|
||||
ctx := utils.ContextWithAttempt(r.Context(), i)
|
||||
cr, err = s.startConnection(ctx, roomName, pi, connectionTimeout)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if i < 2 {
|
||||
fieldsWithAttempt := append(loggerFields, "attempt", i)
|
||||
logger.Warnw("failed to start connection, retrying", err, fieldsWithAttempt...)
|
||||
}
|
||||
}
|
||||
|
||||
// this needs to be started first *before* using router functions on this node
|
||||
connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi)
|
||||
if err != nil {
|
||||
handleError(w, http.StatusInternalServerError, err, loggerFields...)
|
||||
return
|
||||
}
|
||||
|
||||
// wait for the first message before upgrading to websocket. If no one is
|
||||
// responding to our connection attempt, we should terminate the connection
|
||||
// instead of waiting forever on the WebSocket
|
||||
initialResponse, err := readInitialResponse(resSource, maxInitialResponseWait)
|
||||
if err != nil {
|
||||
prometheus.IncrementParticipantJoinFail(1)
|
||||
handleError(w, http.StatusInternalServerError, err, loggerFields...)
|
||||
return
|
||||
}
|
||||
|
||||
prometheus.IncrementParticipantJoin(1)
|
||||
|
||||
if !pi.Reconnect && initialResponse.GetJoin() != nil {
|
||||
pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid())
|
||||
if !pi.Reconnect && cr.InitialResponse.GetJoin() != nil {
|
||||
pi.ID = livekit.ParticipantID(cr.InitialResponse.GetJoin().GetParticipant().GetSid())
|
||||
}
|
||||
|
||||
var signalStats *telemetry.BytesTrackStats
|
||||
@@ -222,7 +216,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
pLogger := rtc.LoggerWithParticipant(
|
||||
rtc.LoggerWithRoom(logger.GetLogger(), roomName, livekit.RoomID(rm.Sid)),
|
||||
rtc.LoggerWithRoom(logger.GetLogger(), roomName, livekit.RoomID(cr.Room.Sid)),
|
||||
pi.Identity,
|
||||
pi.ID,
|
||||
false,
|
||||
@@ -231,9 +225,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
done := make(chan struct{})
|
||||
// function exits when websocket terminates, it'll close the event reading off of response sink as well
|
||||
defer func() {
|
||||
pLogger.Infow("finishing WS connection", "connID", connId)
|
||||
resSource.Close()
|
||||
reqSink.Close()
|
||||
pLogger.Infow("finishing WS connection", "connID", cr.ConnectionID)
|
||||
cr.ResponseSource.Close()
|
||||
cr.RequestSink.Close()
|
||||
close(done)
|
||||
|
||||
if signalStats != nil {
|
||||
@@ -250,7 +244,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// websocket established
|
||||
sigConn := NewWSSignalConnection(conn)
|
||||
if count, err := sigConn.WriteResponse(initialResponse); err != nil {
|
||||
if count, err := sigConn.WriteResponse(cr.InitialResponse); err != nil {
|
||||
pLogger.Warnw("could not write initial response", err)
|
||||
return
|
||||
} else {
|
||||
@@ -258,7 +252,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
signalStats.AddBytes(uint64(count), true)
|
||||
}
|
||||
}
|
||||
pLogger.Infow("new client WS connected", "connID", connId)
|
||||
pLogger.Infow("new client WS connected", "connID", cr.ConnectionID)
|
||||
|
||||
// handle responses
|
||||
go func() {
|
||||
@@ -272,7 +266,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case msg := <-resSource.ReadChan():
|
||||
case msg := <-cr.ResponseSource.ReadChan():
|
||||
if msg == nil {
|
||||
return
|
||||
}
|
||||
@@ -280,12 +274,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if !ok {
|
||||
pLogger.Errorw("unexpected message type", nil,
|
||||
"type", fmt.Sprintf("%T", msg),
|
||||
"connID", connId)
|
||||
"connID", cr.ConnectionID)
|
||||
continue
|
||||
}
|
||||
|
||||
if pi.ID == "" && initialResponse.GetJoin() != nil {
|
||||
pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid())
|
||||
if pi.ID == "" && cr.InitialResponse.GetJoin() != nil {
|
||||
pi.ID = livekit.ParticipantID(cr.InitialResponse.GetJoin().GetParticipant().GetSid())
|
||||
signalStats = telemetry.NewBytesTrackStats(
|
||||
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID),
|
||||
pi.ID,
|
||||
@@ -309,7 +303,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if err != nil {
|
||||
if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") ||
|
||||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
|
||||
pLogger.Debugw("exit ws read loop for closed connection", "connID", connId)
|
||||
pLogger.Debugw("exit ws read loop for closed connection", "connID", cr.ConnectionID)
|
||||
return
|
||||
} else {
|
||||
pLogger.Errorw("error reading from websocket", err)
|
||||
@@ -334,9 +328,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := reqSink.WriteMessage(req); err != nil {
|
||||
if err := cr.RequestSink.WriteMessage(req); err != nil {
|
||||
pLogger.Warnw("error writing to request sink", err,
|
||||
"connID", connId)
|
||||
"connID", cr.ConnectionID)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -402,6 +396,38 @@ func (s *RTCService) ParseClientInfo(r *http.Request) *livekit.ClientInfo {
|
||||
return ci
|
||||
}
|
||||
|
||||
type connectionResult struct {
|
||||
Room *livekit.Room
|
||||
ConnectionID livekit.ConnectionID
|
||||
RequestSink routing.MessageSink
|
||||
ResponseSource routing.MessageSource
|
||||
InitialResponse *livekit.SignalResponse
|
||||
}
|
||||
|
||||
func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, error) {
|
||||
var cr connectionResult
|
||||
var err error
|
||||
cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)})
|
||||
if err != nil {
|
||||
return cr, err
|
||||
}
|
||||
|
||||
// this needs to be started first *before* using router functions on this node
|
||||
cr.ConnectionID, cr.RequestSink, cr.ResponseSource, err = s.router.StartParticipantSignal(ctx, roomName, pi)
|
||||
if err != nil {
|
||||
return cr, err
|
||||
}
|
||||
|
||||
// wait for the first message before upgrading to websocket. If no one is
|
||||
// responding to our connection attempt, we should terminate the connection
|
||||
// instead of waiting forever on the WebSocket
|
||||
cr.InitialResponse, err = readInitialResponse(cr.ResponseSource, timeout)
|
||||
if err != nil {
|
||||
return cr, err
|
||||
}
|
||||
return cr, nil
|
||||
}
|
||||
|
||||
func readInitialResponse(source routing.MessageSource, timeout time.Duration) (*livekit.SignalResponse, error) {
|
||||
responseTimer := time.NewTimer(timeout)
|
||||
defer responseTimer.Stop()
|
||||
|
||||
@@ -199,6 +199,12 @@ func IncrementParticipantJoin(join uint32) {
|
||||
}
|
||||
}
|
||||
|
||||
func IncrementParticipantJoinFail(join uint32) {
|
||||
if join > 0 {
|
||||
promParticipantJoin.WithLabelValues("signal_failed").Add(float64(join))
|
||||
}
|
||||
}
|
||||
|
||||
func IncrementParticipantRtcInit(join uint32) {
|
||||
if join > 0 {
|
||||
participantRTCInit.Add(uint64(join))
|
||||
|
||||
32
pkg/utils/context.go
Normal file
32
pkg/utils/context.go
Normal file
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2023 LiveKit, Inc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import "context"
|
||||
|
||||
var attemptKey = struct{}{}
|
||||
|
||||
func ContextWithAttempt(ctx context.Context, attempt int) context.Context {
|
||||
return context.WithValue(ctx, attemptKey, attempt)
|
||||
}
|
||||
|
||||
func GetAttempt(ctx context.Context) int {
|
||||
if attempt, ok := ctx.Value(attemptKey).(int); ok {
|
||||
return attempt
|
||||
}
|
||||
return 0
|
||||
}
|
||||
Reference in New Issue
Block a user