From 0b0431b765bcd8ba776aacf0b136db8a9b7a68d2 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 17 Sep 2023 10:08:35 -0700 Subject: [PATCH] Per-session TURN credentials (#2080) Switching to using session specific TURN credentials instead of shared credentials per Room. Also eliminates need to load Room from Redis during TURN authentication --- pkg/service/auth.go | 1 + pkg/service/roommanager.go | 77 +++++++++++++++++++++++++------------- pkg/service/turn.go | 55 ++++++++++++++++++++++----- pkg/service/wire.go | 3 +- pkg/service/wire_gen.go | 5 ++- 5 files changed, 104 insertions(+), 37 deletions(-) diff --git a/pkg/service/auth.go b/pkg/service/auth.go index af940e83a..1b5829e30 100644 --- a/pkg/service/auth.go +++ b/pkg/service/auth.go @@ -38,6 +38,7 @@ var ( ErrPermissionDenied = errors.New("permissions denied") ErrMissingAuthorization = errors.New("invalid authorization header. Must start with " + bearerPrefix) ErrInvalidAuthorizationToken = errors.New("invalid authorization token") + ErrInvalidAPIKey = errors.New("invalid API key") ) // authentication middleware diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index ec5f21516..2d7bdda81 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -66,6 +66,7 @@ type RoomManager struct { clientConfManager clientconfiguration.ClientConfigurationManager egressLauncher rtc.EgressLauncher versionGenerator utils.TimedVersionGenerator + turnAuthHandler *TURNAuthHandler rooms map[livekit.RoomName]*rtc.Room @@ -81,6 +82,7 @@ func NewLocalRoomManager( clientConfManager clientconfiguration.ClientConfigurationManager, egressLauncher rtc.EgressLauncher, versionGenerator utils.TimedVersionGenerator, + turnAuthHandler *TURNAuthHandler, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf) if err != nil { @@ -97,6 +99,7 @@ func NewLocalRoomManager( clientConfManager: clientConfManager, egressLauncher: egressLauncher, versionGenerator: versionGenerator, + turnAuthHandler: turnAuthHandler, rooms: make(map[livekit.RoomName]*rtc.Room), @@ -244,6 +247,10 @@ func (r *RoomManager) StartSession( return nil } + // should not error out, error is logged in iceServersForParticipant even if it fails + // since this is used for TURN server credentials, we don't want to fail the request even if there's no TURN for the session + apiKey, _, _ := r.getFirstKeyPair() + participant := room.GetParticipant(pi.Identity) if participant != nil { // When reconnecting, it means WS has interrupted but underlying peer connection is still ok in this state, @@ -286,8 +293,9 @@ func (r *RoomManager) StartSession( participant, requestSource, responseSink, - r.iceServersForRoom( - protoRoom, + r.iceServersForParticipant( + apiKey, + participant, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS, ), pi.ReconnectReason, @@ -411,7 +419,8 @@ func (r *RoomManager) StartSession( opts := rtc.ParticipantOptions{ AutoSubscribe: pi.AutoSubscribe, } - if err = room.Join(participant, requestSource, &opts, r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)); err != nil { + iceServers := r.iceServersForParticipant(apiKey, participant, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS) + if err = room.Join(participant, requestSource, &opts, iceServers); err != nil { pLogger.Errorw("could not join room", err) _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err @@ -684,7 +693,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo } } -func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livekit.ICEServer { +func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer { var iceServers []*livekit.ICEServer rtcConf := r.config.RTC @@ -705,11 +714,19 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek urls = append(urls, fmt.Sprintf("turns:%s:443?transport=tcp", r.config.TURN.Domain)) } if len(urls) > 0 { - iceServers = append(iceServers, &livekit.ICEServer{ - Urls: urls, - Username: ri.Name, - Credential: ri.TurnPassword, - }) + username := r.turnAuthHandler.CreateUsername(apiKey, participant.ID()) + password, err := r.turnAuthHandler.CreatePassword(apiKey, participant.ID()) + if err != nil { + participant.GetLogger().Warnw("could not create turn password", err) + hasSTUN = false + } else { + logger.Infow("created TURN password", "username", username, "password", password) + iceServers = append(iceServers, &livekit.ICEServer{ + Urls: urls, + Username: username, + Credential: password, + }) + } } } @@ -746,23 +763,26 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek } func (r *RoomManager) refreshToken(participant types.LocalParticipant) error { - for key, secret := range r.config.Keys { - grants := participant.ClaimGrants() - token := auth.NewAccessToken(key, secret) - token.SetName(grants.Name). - SetIdentity(string(participant.Identity())). - SetValidFor(tokenDefaultTTL). - SetMetadata(grants.Metadata). - AddGrant(grants.Video) - jwt, err := token.ToJWT() - if err == nil { - err = participant.SendRefreshToken(jwt) - } - if err != nil { - return err - } - break + key, secret, err := r.getFirstKeyPair() + if err != nil { + return err } + + grants := participant.ClaimGrants() + token := auth.NewAccessToken(key, secret) + token.SetName(grants.Name). + SetIdentity(string(participant.Identity())). + SetValidFor(tokenDefaultTTL). + SetMetadata(grants.Metadata). + AddGrant(grants.Video) + jwt, err := token.ToJWT() + if err == nil { + err = participant.SendRefreshToken(jwt) + } + if err != nil { + return err + } + return nil } @@ -786,6 +806,13 @@ func (r *RoomManager) getIceConfig(participant types.LocalParticipant) *livekit. return iceConfigCacheEntry.iceConfig } +func (r *RoomManager) getFirstKeyPair() (string, string, error) { + for key, secret := range r.config.Keys { + return key, secret, nil + } + return "", "", errors.New("no API keys configured") +} + // ------------------------------------ func iceServerForStunServers(servers []string) *livekit.ICEServer { diff --git a/pkg/service/turn.go b/pkg/service/turn.go index 08d971fb4..a4b91e212 100644 --- a/pkg/service/turn.go +++ b/pkg/service/turn.go @@ -15,14 +15,18 @@ package service import ( - "context" + "crypto/sha256" "crypto/tls" + "fmt" "net" "strconv" + "strings" + "github.com/jxskiss/base62" "github.com/pion/turn/v2" "github.com/pkg/errors" + "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/logger/pionlogger" @@ -142,14 +146,47 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone return turn.NewServer(serverConfig) } -func newTurnAuthHandler(roomStore ObjectStore) turn.AuthHandler { - return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) { - // room id should be the username, create a hashed room id - rm, _, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username), false) - if err != nil { - return nil, false - } +func getTURNAuthHandlerFunc(handler *TURNAuthHandler) turn.AuthHandler { + return handler.HandleAuth +} - return turn.GenerateAuthKey(username, LivekitRealm, rm.TurnPassword), true +type TURNAuthHandler struct { + keyProvider auth.KeyProvider +} + +func NewTURNAuthHandler(keyProvider auth.KeyProvider) *TURNAuthHandler { + return &TURNAuthHandler{ + keyProvider: keyProvider, } } + +func (h *TURNAuthHandler) CreateUsername(apiKey string, pID livekit.ParticipantID) string { + return base62.EncodeToString([]byte(fmt.Sprintf("%s|%s", apiKey, pID))) +} + +func (h *TURNAuthHandler) CreatePassword(apiKey string, pID livekit.ParticipantID) (string, error) { + secret := h.keyProvider.GetSecret(apiKey) + if secret == "" { + return "", ErrInvalidAPIKey + } + keyInput := fmt.Sprintf("%s|%s", secret, pID) + sum := sha256.Sum256([]byte(keyInput)) + return base62.EncodeToString(sum[:]), nil +} + +func (h *TURNAuthHandler) HandleAuth(username, realm string, srcAddr net.Addr) (key []byte, ok bool) { + decoded, err := base62.DecodeString(username) + if err != nil { + return nil, false + } + parts := strings.Split(string(decoded), "|") + if len(parts) != 2 { + return nil, false + } + password, err := h.CreatePassword(parts[0], livekit.ParticipantID(parts[1])) + if err != nil { + logger.Warnw("could not create TURN password", err, "username", username) + return nil, false + } + return turn.GenerateAuthKey(username, LivekitRealm, password), true +} diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 955f2ecef..9d8fbf394 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -73,7 +73,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewDefaultSignalServer, routing.NewSignalClient, NewLocalRoomManager, - newTurnAuthHandler, + NewTURNAuthHandler, + getTURNAuthHandlerFunc, newInProcessTurnServer, utils.NewDefaultTimedVersionGenerator, NewLivekitServer, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index b051fb954..942d693fc 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -87,7 +87,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator) + turnAuthHandler := NewTURNAuthHandler(keyProvider) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler) if err != nil { return nil, err } @@ -95,7 +96,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - authHandler := newTurnAuthHandler(objectStore) + authHandler := getTURNAuthHandlerFunc(turnAuthHandler) server, err := newInProcessTurnServer(conf, authHandler) if err != nil { return nil, err