Per-session TURN credentials (#2080)

Switching to using session specific TURN credentials instead of shared
credentials per Room. Also eliminates need to load Room from Redis
during TURN authentication
This commit is contained in:
David Zhao
2023-09-17 10:08:35 -07:00
committed by GitHub
parent 019ad88b08
commit 0b0431b765
5 changed files with 104 additions and 37 deletions

View File

@@ -38,6 +38,7 @@ var (
ErrPermissionDenied = errors.New("permissions denied")
ErrMissingAuthorization = errors.New("invalid authorization header. Must start with " + bearerPrefix)
ErrInvalidAuthorizationToken = errors.New("invalid authorization token")
ErrInvalidAPIKey = errors.New("invalid API key")
)
// authentication middleware

View File

@@ -66,6 +66,7 @@ type RoomManager struct {
clientConfManager clientconfiguration.ClientConfigurationManager
egressLauncher rtc.EgressLauncher
versionGenerator utils.TimedVersionGenerator
turnAuthHandler *TURNAuthHandler
rooms map[livekit.RoomName]*rtc.Room
@@ -81,6 +82,7 @@ func NewLocalRoomManager(
clientConfManager clientconfiguration.ClientConfigurationManager,
egressLauncher rtc.EgressLauncher,
versionGenerator utils.TimedVersionGenerator,
turnAuthHandler *TURNAuthHandler,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf)
if err != nil {
@@ -97,6 +99,7 @@ func NewLocalRoomManager(
clientConfManager: clientConfManager,
egressLauncher: egressLauncher,
versionGenerator: versionGenerator,
turnAuthHandler: turnAuthHandler,
rooms: make(map[livekit.RoomName]*rtc.Room),
@@ -244,6 +247,10 @@ func (r *RoomManager) StartSession(
return nil
}
// should not error out, error is logged in iceServersForParticipant even if it fails
// since this is used for TURN server credentials, we don't want to fail the request even if there's no TURN for the session
apiKey, _, _ := r.getFirstKeyPair()
participant := room.GetParticipant(pi.Identity)
if participant != nil {
// When reconnecting, it means WS has interrupted but underlying peer connection is still ok in this state,
@@ -286,8 +293,9 @@ func (r *RoomManager) StartSession(
participant,
requestSource,
responseSink,
r.iceServersForRoom(
protoRoom,
r.iceServersForParticipant(
apiKey,
participant,
iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS,
),
pi.ReconnectReason,
@@ -411,7 +419,8 @@ func (r *RoomManager) StartSession(
opts := rtc.ParticipantOptions{
AutoSubscribe: pi.AutoSubscribe,
}
if err = room.Join(participant, requestSource, &opts, r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)); err != nil {
iceServers := r.iceServersForParticipant(apiKey, participant, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)
if err = room.Join(participant, requestSource, &opts, iceServers); err != nil {
pLogger.Errorw("could not join room", err)
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
return err
@@ -684,7 +693,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo
}
}
func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livekit.ICEServer {
func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer {
var iceServers []*livekit.ICEServer
rtcConf := r.config.RTC
@@ -705,11 +714,19 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek
urls = append(urls, fmt.Sprintf("turns:%s:443?transport=tcp", r.config.TURN.Domain))
}
if len(urls) > 0 {
iceServers = append(iceServers, &livekit.ICEServer{
Urls: urls,
Username: ri.Name,
Credential: ri.TurnPassword,
})
username := r.turnAuthHandler.CreateUsername(apiKey, participant.ID())
password, err := r.turnAuthHandler.CreatePassword(apiKey, participant.ID())
if err != nil {
participant.GetLogger().Warnw("could not create turn password", err)
hasSTUN = false
} else {
logger.Infow("created TURN password", "username", username, "password", password)
iceServers = append(iceServers, &livekit.ICEServer{
Urls: urls,
Username: username,
Credential: password,
})
}
}
}
@@ -746,23 +763,26 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek
}
func (r *RoomManager) refreshToken(participant types.LocalParticipant) error {
for key, secret := range r.config.Keys {
grants := participant.ClaimGrants()
token := auth.NewAccessToken(key, secret)
token.SetName(grants.Name).
SetIdentity(string(participant.Identity())).
SetValidFor(tokenDefaultTTL).
SetMetadata(grants.Metadata).
AddGrant(grants.Video)
jwt, err := token.ToJWT()
if err == nil {
err = participant.SendRefreshToken(jwt)
}
if err != nil {
return err
}
break
key, secret, err := r.getFirstKeyPair()
if err != nil {
return err
}
grants := participant.ClaimGrants()
token := auth.NewAccessToken(key, secret)
token.SetName(grants.Name).
SetIdentity(string(participant.Identity())).
SetValidFor(tokenDefaultTTL).
SetMetadata(grants.Metadata).
AddGrant(grants.Video)
jwt, err := token.ToJWT()
if err == nil {
err = participant.SendRefreshToken(jwt)
}
if err != nil {
return err
}
return nil
}
@@ -786,6 +806,13 @@ func (r *RoomManager) getIceConfig(participant types.LocalParticipant) *livekit.
return iceConfigCacheEntry.iceConfig
}
func (r *RoomManager) getFirstKeyPair() (string, string, error) {
for key, secret := range r.config.Keys {
return key, secret, nil
}
return "", "", errors.New("no API keys configured")
}
// ------------------------------------
func iceServerForStunServers(servers []string) *livekit.ICEServer {

View File

@@ -15,14 +15,18 @@
package service
import (
"context"
"crypto/sha256"
"crypto/tls"
"fmt"
"net"
"strconv"
"strings"
"github.com/jxskiss/base62"
"github.com/pion/turn/v2"
"github.com/pkg/errors"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/logger/pionlogger"
@@ -142,14 +146,47 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone
return turn.NewServer(serverConfig)
}
func newTurnAuthHandler(roomStore ObjectStore) turn.AuthHandler {
return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
// room id should be the username, create a hashed room id
rm, _, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username), false)
if err != nil {
return nil, false
}
func getTURNAuthHandlerFunc(handler *TURNAuthHandler) turn.AuthHandler {
return handler.HandleAuth
}
return turn.GenerateAuthKey(username, LivekitRealm, rm.TurnPassword), true
type TURNAuthHandler struct {
keyProvider auth.KeyProvider
}
func NewTURNAuthHandler(keyProvider auth.KeyProvider) *TURNAuthHandler {
return &TURNAuthHandler{
keyProvider: keyProvider,
}
}
func (h *TURNAuthHandler) CreateUsername(apiKey string, pID livekit.ParticipantID) string {
return base62.EncodeToString([]byte(fmt.Sprintf("%s|%s", apiKey, pID)))
}
func (h *TURNAuthHandler) CreatePassword(apiKey string, pID livekit.ParticipantID) (string, error) {
secret := h.keyProvider.GetSecret(apiKey)
if secret == "" {
return "", ErrInvalidAPIKey
}
keyInput := fmt.Sprintf("%s|%s", secret, pID)
sum := sha256.Sum256([]byte(keyInput))
return base62.EncodeToString(sum[:]), nil
}
func (h *TURNAuthHandler) HandleAuth(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
decoded, err := base62.DecodeString(username)
if err != nil {
return nil, false
}
parts := strings.Split(string(decoded), "|")
if len(parts) != 2 {
return nil, false
}
password, err := h.CreatePassword(parts[0], livekit.ParticipantID(parts[1]))
if err != nil {
logger.Warnw("could not create TURN password", err, "username", username)
return nil, false
}
return turn.GenerateAuthKey(username, LivekitRealm, password), true
}

View File

@@ -73,7 +73,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewDefaultSignalServer,
routing.NewSignalClient,
NewLocalRoomManager,
newTurnAuthHandler,
NewTURNAuthHandler,
getTURNAuthHandlerFunc,
newInProcessTurnServer,
utils.NewDefaultTimedVersionGenerator,
NewLivekitServer,

View File

@@ -87,7 +87,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
clientConfigurationManager := createClientConfiguration()
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator)
turnAuthHandler := NewTURNAuthHandler(keyProvider)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler)
if err != nil {
return nil, err
}
@@ -95,7 +96,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
authHandler := newTurnAuthHandler(objectStore)
authHandler := getTURNAuthHandlerFunc(turnAuthHandler)
server, err := newInProcessTurnServer(conf, authHandler)
if err != nil {
return nil, err