From 49b75e94a6dd7d91bcdcf5c92ee7e69a543bbf9d Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 25 Oct 2024 18:57:23 +0530 Subject: [PATCH] Consolidate operations on LocalNode. (#3140) --- cmd/server/main.go | 2 +- pkg/agent/testutils/server.go | 4 +- pkg/routing/localrouter.go | 26 ++---- pkg/routing/node.go | 141 +++++++++++++++++++++++++++--- pkg/routing/redisrouter.go | 45 +++------- pkg/service/agentservice.go | 2 +- pkg/service/roomallocator_test.go | 18 ++-- pkg/service/roommanager.go | 14 +-- pkg/service/server.go | 6 +- pkg/service/signal.go | 4 +- pkg/service/wire.go | 2 +- pkg/service/wire_gen.go | 2 +- pkg/telemetry/analyticsservice.go | 2 +- test/integration_helpers.go | 4 +- test/webhook_test.go | 2 +- 15 files changed, 180 insertions(+), 94 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 427702954..6cdfba638 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -272,7 +272,7 @@ func startServer(c *cli.Context) error { return err } - if err := prometheus.Init(currentNode.Id, currentNode.Type); err != nil { + if err := prometheus.Init(string(currentNode.NodeID()), currentNode.NodeType()); err != nil { return err } diff --git a/pkg/agent/testutils/server.go b/pkg/agent/testutils/server.go index 1c4ce48f3..04de89f63 100644 --- a/pkg/agent/testutils/server.go +++ b/pkg/agent/testutils/server.go @@ -15,6 +15,7 @@ import ( "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -35,9 +36,10 @@ type TestServer struct { } func NewTestServer(bus psrpc.MessageBus) *TestServer { + localNode, _ := routing.NewLocalNode(nil) return NewTestServerWithService(must.Get(service.NewAgentService( &config.Config{Region: "test"}, - &livekit.Node{Id: guid.New("N_")}, + localNode, bus, auth.NewSimpleKeyProvider("test", "verysecretsecret"), ))) diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index ff100eeb9..8baba7510 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -23,7 +23,6 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" ) var _ Router = (*LocalRouter)(nil) @@ -56,10 +55,7 @@ func NewLocalRouter( } func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error) { - r.lock.Lock() - defer r.lock.Unlock() - node := utils.CloneProto((*livekit.Node)(r.currentNode)) - return node, nil + return r.currentNode.Clone(), nil } func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error { @@ -83,20 +79,20 @@ func (r *LocalRouter) RemoveDeadNodes() error { } func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error) { - if nodeID == livekit.NodeID(r.currentNode.Id) { - return r.currentNode, nil + if nodeID == r.currentNode.NodeID() { + return r.currentNode.Clone(), nil } return nil, ErrNotFound } func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) { return []*livekit.Node{ - r.currentNode, + r.currentNode.Clone(), }, nil } func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error) { - return r.CreateRoomWithNodeID(ctx, req, livekit.NodeID(r.currentNode.Id)) + return r.CreateRoomWithNodeID(ctx, req, r.currentNode.NodeID()) } func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error) { @@ -104,7 +100,7 @@ func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.Cre } func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) { - return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(r.currentNode.Id)) + return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, r.currentNode.NodeID()) } func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error) { @@ -136,15 +132,13 @@ func (r *LocalRouter) Start() error { } func (r *LocalRouter) Drain() { - r.lock.Lock() - defer r.lock.Unlock() - r.currentNode.State = livekit.NodeState_SHUTTING_DOWN + r.currentNode.SetState(livekit.NodeState_SHUTTING_DOWN) } func (r *LocalRouter) Stop() {} func (r *LocalRouter) GetRegion() string { - return r.currentNode.Region + return r.currentNode.Region() } func (r *LocalRouter) statsWorker() { @@ -154,9 +148,7 @@ func (r *LocalRouter) statsWorker() { } // update every 10 seconds <-time.After(statsUpdateInterval) - r.lock.Lock() - r.currentNode.Stats.UpdatedAt = time.Now().Unix() - r.lock.Unlock() + r.currentNode.UpdateNodeStats() } } diff --git a/pkg/routing/node.go b/pkg/routing/node.go index ff8f8d3ab..18ba7611b 100644 --- a/pkg/routing/node.go +++ b/pkg/routing/node.go @@ -16,33 +16,146 @@ package routing import ( "runtime" + "sync" "time" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) -type LocalNode *livekit.Node +type LocalNode interface { + Clone() *livekit.Node + SetNodeID(nodeID livekit.NodeID) + NodeID() livekit.NodeID + NodeType() livekit.NodeType + NodeIP() string + Region() string + SetState(state livekit.NodeState) + SetStats(stats *livekit.NodeStats) + UpdateNodeStats() bool + SecondsSinceNodeStatsUpdate() float64 +} -func NewLocalNode(conf *config.Config) (LocalNode, error) { +type LocalNodeImpl struct { + lock sync.RWMutex + node *livekit.Node + + // previous stats for computing averages + prevStats *livekit.NodeStats +} + +func NewLocalNode(conf *config.Config) (*LocalNodeImpl, error) { nodeID := guid.New(utils.NodePrefix) - if conf.RTC.NodeIP == "" { + if conf != nil && conf.RTC.NodeIP == "" { return nil, ErrIPNotSet } - node := &livekit.Node{ - Id: nodeID, - Ip: conf.RTC.NodeIP, - NumCpus: uint32(runtime.NumCPU()), - Region: conf.Region, - State: livekit.NodeState_SERVING, - Stats: &livekit.NodeStats{ - StartedAt: time.Now().Unix(), - UpdatedAt: time.Now().Unix(), + l := &LocalNodeImpl{ + node: &livekit.Node{ + Id: nodeID, + NumCpus: uint32(runtime.NumCPU()), + State: livekit.NodeState_SERVING, + Stats: &livekit.NodeStats{ + StartedAt: time.Now().Unix(), + UpdatedAt: time.Now().Unix(), + }, }, } - - return node, nil + if conf != nil { + l.node.Ip = conf.RTC.NodeIP + l.node.Region = conf.Region + } + return l, nil +} + +func NewLocalNodeFromNodeProto(node *livekit.Node) (*LocalNodeImpl, error) { + return &LocalNodeImpl{node: utils.CloneProto(node)}, nil +} + +func (l *LocalNodeImpl) Clone() *livekit.Node { + l.lock.RLock() + defer l.lock.RUnlock() + + return utils.CloneProto(l.node) +} + +// for testing only +func (l *LocalNodeImpl) SetNodeID(nodeID livekit.NodeID) { + l.lock.Lock() + defer l.lock.Unlock() + + l.node.Id = string(nodeID) +} + +func (l *LocalNodeImpl) NodeID() livekit.NodeID { + l.lock.RLock() + defer l.lock.RUnlock() + + return livekit.NodeID(l.node.Id) +} + +func (l *LocalNodeImpl) NodeType() livekit.NodeType { + l.lock.RLock() + defer l.lock.RUnlock() + + return l.node.Type +} + +func (l *LocalNodeImpl) NodeIP() string { + l.lock.RLock() + defer l.lock.RUnlock() + + return l.node.Ip +} + +func (l *LocalNodeImpl) Region() string { + l.lock.RLock() + defer l.lock.RUnlock() + + return l.node.Region +} + +func (l *LocalNodeImpl) SetState(state livekit.NodeState) { + l.lock.Lock() + defer l.lock.Unlock() + + l.node.State = state +} + +// for testing only +func (l *LocalNodeImpl) SetStats(stats *livekit.NodeStats) { + l.lock.Lock() + defer l.lock.Unlock() + + l.node.Stats = utils.CloneProto(stats) +} + +func (l *LocalNodeImpl) UpdateNodeStats() bool { + l.lock.Lock() + defer l.lock.Unlock() + + if l.prevStats == nil { + l.prevStats = l.node.Stats + } + updated, computedAvg, err := prometheus.GetUpdatedNodeStats(l.node.Stats, l.prevStats) + if err != nil { + logger.Errorw("could not update node stats", err) + return false + } + l.node.Stats = updated + if computedAvg { + l.prevStats = updated + } + return true +} + +func (l *LocalNodeImpl) SecondsSinceNodeStatsUpdate() float64 { + l.lock.RLock() + defer l.lock.RUnlock() + + return time.Since(time.Unix(0, l.node.Stats.UpdatedAt)).Seconds() } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 85e56dbfd..6dbe28e55 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "runtime/pprof" - "sync" "time" "github.com/pkg/errors" @@ -31,14 +30,13 @@ import ( "github.com/livekit/protocol/rpc" "github.com/livekit/livekit-server/pkg/routing/selector" - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) const ( // expire participant mappings after a day participantMappingTTL = 24 * time.Hour statsUpdateInterval = 2 * time.Second - statsMaxDelaySeconds = 30 + statsMaxDelaySeconds = float64(30) // hash of node_id => Node proto NodesKey = "nodes" @@ -59,9 +57,6 @@ type RedisRouter struct { kps rpc.KeepalivePubSub ctx context.Context isStarted atomic.Bool - nodeMu sync.RWMutex - // previous stats for computing averages - prevStats *livekit.NodeStats cancel func() } @@ -77,13 +72,11 @@ func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient, kps rpc.Keepalive } func (r *RedisRouter) RegisterNode() error { - r.nodeMu.RLock() - data, err := proto.Marshal((*livekit.Node)(r.currentNode)) - r.nodeMu.RUnlock() + data, err := proto.Marshal(r.currentNode.Clone()) if err != nil { return err } - if err := r.rc.HSet(r.ctx, NodesKey, r.currentNode.Id, data).Err(); err != nil { + if err := r.rc.HSet(r.ctx, NodesKey, string(r.currentNode.NodeID()), data).Err(); err != nil { return errors.Wrap(err, "could not register node") } return nil @@ -91,7 +84,7 @@ func (r *RedisRouter) RegisterNode() error { func (r *RedisRouter) UnregisterNode() error { // could be called after Stop(), so we'd want to use an unrelated context - return r.rc.HDel(context.Background(), NodesKey, r.currentNode.Id).Err() + return r.rc.HDel(context.Background(), NodesKey, string(r.currentNode.NodeID())).Err() } func (r *RedisRouter) RemoveDeadNodes() error { @@ -195,11 +188,9 @@ func (r *RedisRouter) Start() error { } func (r *RedisRouter) Drain() { - r.nodeMu.Lock() - r.currentNode.State = livekit.NodeState_SHUTTING_DOWN - r.nodeMu.Unlock() + r.currentNode.SetState(livekit.NodeState_SHUTTING_DOWN) if err := r.RegisterNode(); err != nil { - logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.Id) + logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.NodeID()) } } @@ -219,13 +210,9 @@ func (r *RedisRouter) statsWorker() { // update periodically select { case <-time.After(statsUpdateInterval): - r.kps.PublishPing(r.ctx, livekit.NodeID(r.currentNode.Id), &rpc.KeepalivePing{Timestamp: time.Now().Unix()}) + r.kps.PublishPing(r.ctx, r.currentNode.NodeID(), &rpc.KeepalivePing{Timestamp: time.Now().Unix()}) - r.nodeMu.RLock() - stats := r.currentNode.Stats - r.nodeMu.RUnlock() - - delaySeconds := time.Now().Unix() - stats.UpdatedAt + delaySeconds := r.currentNode.SecondsSinceNodeStatsUpdate() if delaySeconds > statsMaxDelaySeconds { if !goroutineDumped { goroutineDumped = true @@ -245,7 +232,7 @@ func (r *RedisRouter) statsWorker() { } func (r *RedisRouter) keepaliveWorker(startedChan chan error) { - pings, err := r.kps.SubscribePing(r.ctx, livekit.NodeID(r.currentNode.Id)) + pings, err := r.kps.SubscribePing(r.ctx, r.currentNode.NodeID()) if err != nil { startedChan <- err return @@ -258,21 +245,9 @@ func (r *RedisRouter) keepaliveWorker(startedChan chan error) { continue } - r.nodeMu.Lock() - if r.prevStats == nil { - r.prevStats = r.currentNode.Stats - } - updated, computedAvg, err := prometheus.GetUpdatedNodeStats(r.currentNode.Stats, r.prevStats) - if err != nil { - logger.Errorw("could not update node stats", err) - r.nodeMu.Unlock() + if !r.currentNode.UpdateNodeStats() { continue } - r.currentNode.Stats = updated - if computedAvg { - r.prevStats = updated - } - r.nodeMu.Unlock() // TODO: check stats against config.Limit values if err := r.RegisterNode(); err != nil { diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 92a36a9b3..ba25ff96d 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -165,7 +165,7 @@ func NewAgentService(conf *config.Config, Protocol: types.CurrentProtocol, AgentProtocol: agent.CurrentProtocol, Region: conf.Region, - NodeId: currentNode.Id, + NodeId: string(currentNode.NodeID()), } agentServer, err := rpc.NewAgentInternalServer(s, bus) diff --git a/pkg/service/roomallocator_test.go b/pkg/service/roomallocator_test.go index 0ce252f82..3d3dd29db 100644 --- a/pkg/service/roomallocator_test.go +++ b/pkg/service/roomallocator_test.go @@ -37,7 +37,7 @@ func TestCreateRoom(t *testing.T) { node, err := routing.NewLocalNode(conf) require.NoError(t, err) - ra, conf := newTestRoomAllocator(t, conf, node) + ra, conf := newTestRoomAllocator(t, conf, node.Clone()) room, _, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}, true) require.NoError(t, err) @@ -55,10 +55,12 @@ func SelectRoomNode(t *testing.T) { node, err := routing.NewLocalNode(conf) require.NoError(t, err) - node.Stats.NumTracksIn = 100 - node.Stats.NumTracksOut = 100 + node.SetStats(&livekit.NodeStats{ + NumTracksIn: 100, + NumTracksOut: 100, + }) - ra, _ := newTestRoomAllocator(t, conf, node) + ra, _ := newTestRoomAllocator(t, conf, node.Clone()) err = ra.SelectRoomNode(context.Background(), "low-limit-room", "") require.ErrorIs(t, err, routing.ErrNodeLimitReached) @@ -71,10 +73,12 @@ func SelectRoomNode(t *testing.T) { node, err := routing.NewLocalNode(conf) require.NoError(t, err) - node.Stats.BytesInPerSec = 1000 - node.Stats.BytesOutPerSec = 1000 + node.SetStats(&livekit.NodeStats{ + BytesInPerSec: 1000, + BytesOutPerSec: 1000, + }) - ra, _ := newTestRoomAllocator(t, conf, node) + ra, _ := newTestRoomAllocator(t, conf, node.Clone()) err = ra.SelectRoomNode(context.Background(), "low-limit-room", "") require.ErrorIs(t, err, routing.ErrNodeLimitReached) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 19b2b5737..ca102f29d 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -142,7 +142,7 @@ func NewLocalRoomManager( Protocol: types.CurrentProtocol, AgentProtocol: agent.CurrentProtocol, Region: conf.Region, - NodeId: currentNode.Id, + NodeId: string(currentNode.NodeID()), }, } @@ -150,7 +150,7 @@ func NewLocalRoomManager( if err != nil { return nil, err } - if err := r.roomManagerServer.RegisterAllNodeTopics(livekit.NodeID(currentNode.Id)); err != nil { + if err := r.roomManagerServer.RegisterAllNodeTopics(currentNode.NodeID()); err != nil { return nil, err } @@ -315,7 +315,7 @@ func (r *RoomManager) StartSession( // It is possible that the client did not get that send request. So, send it again. logger.Infow("cannot restart a closed participant", "room", room.Name(), - "nodeID", r.currentNode.Id, + "nodeID", r.currentNode.NodeID(), "participant", pi.Identity, "reason", pi.ReconnectReason, ) @@ -342,7 +342,7 @@ func (r *RoomManager) StartSession( } participant.GetLogger().Infow("resuming RTC session", - "nodeID", r.currentNode.Id, + "nodeID", r.currentNode.NodeID(), "reason", pi.ReconnectReason, "numParticipants", room.GetParticipantCount(), ) @@ -362,7 +362,7 @@ func (r *RoomManager) StartSession( participant.GetLogger().Warnw("could not resume participant", err) return err } - r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason) + r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), r.currentNode.NodeID(), pi.ReconnectReason) go r.rtcSessionWorker(room, participant, requestSource) return nil } @@ -403,7 +403,7 @@ func (r *RoomManager) StartSession( ) pLogger.Infow("starting RTC session", "room", room.Name(), - "nodeID", r.currentNode.Id, + "nodeID", r.currentNode.NodeID(), "clientInfo", logger.Proto(pi.Client), "reconnect", pi.Reconnect, "reconnectReason", pi.ReconnectReason, @@ -532,7 +532,7 @@ func (r *RoomManager) StartSession( // update room store with new numParticipants persistRoomForParticipantCount(room.ToProto()) - clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} + clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region(), Node: string(r.currentNode.NodeID())} r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true) participant.OnClose(func(p types.LocalParticipant) { killParticipantServer() diff --git a/pkg/service/server.go b/pkg/service/server.go index 30222079e..ed7fc95e4 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -172,7 +172,7 @@ func NewLivekitServer(conf *config.Config, } func (s *LivekitServer) Node() *livekit.Node { - return s.currentNode + return s.currentNode.Clone() } func (s *LivekitServer) HTTPPort() int { @@ -232,8 +232,8 @@ func (s *LivekitServer) Start() error { values := []interface{}{ "portHttp", s.config.Port, - "nodeID", s.currentNode.Id, - "nodeIP", s.currentNode.Ip, + "nodeID", s.currentNode.NodeID(), + "nodeIP", s.currentNode.NodeIP(), "version", version.Version, } if s.config.BindAddresses != nil { diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 08d5d0a78..742ce51d1 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -78,7 +78,7 @@ func NewDefaultSignalServer( router routing.Router, roomManager *RoomManager, ) (r *SignalServer, err error) { - return NewSignalServer(livekit.NodeID(currentNode.Id), currentNode.Region, bus, config, &defaultSessionHandler{currentNode, router, roomManager}) + return NewSignalServer(currentNode.NodeID(), currentNode.Region(), bus, config, &defaultSessionHandler{currentNode, router, roomManager}) } type defaultSessionHandler struct { @@ -105,7 +105,7 @@ func (s *defaultSessionHandler) HandleSession( return err } - if rtcNode.Id != s.currentNode.Id { + if livekit.NodeID(rtcNode.Id) != s.currentNode.NodeID() { err = routing.ErrIncorrectRTCNode logger.Errorw("called participant on incorrect node", err, "rtcNode", rtcNode, diff --git a/pkg/service/wire.go b/pkg/service/wire.go index dcabb5da8..b0cbb12fe 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -123,7 +123,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi } func getNodeID(currentNode routing.LocalNode) livekit.NodeID { - return livekit.NodeID(currentNode.Id) + return currentNode.NodeID() } func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 27c5271d2..55b8db9ea 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -183,7 +183,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi // wire.go: func getNodeID(currentNode routing.LocalNode) livekit.NodeID { - return livekit.NodeID(currentNode.Id) + return currentNode.NodeID() } func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { diff --git a/pkg/telemetry/analyticsservice.go b/pkg/telemetry/analyticsservice.go index f6f44e5e7..365e7ed05 100644 --- a/pkg/telemetry/analyticsservice.go +++ b/pkg/telemetry/analyticsservice.go @@ -49,7 +49,7 @@ type analyticsService struct { func NewAnalyticsService(_ *config.Config, currentNode routing.LocalNode) AnalyticsService { return &analyticsService{ analyticsKey: "", // TODO: conf.AnalyticsKey - nodeID: currentNode.Id, + nodeID: string(currentNode.NodeID()), } } diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 088a31e4b..66bb351e5 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -161,7 +161,7 @@ func createSingleNodeServer(configUpdater func(*config.Config)) *service.Livekit if err != nil { panic(fmt.Sprintf("could not create local node: %v", err)) } - currentNode.Id = guid.New(nodeID1) + currentNode.SetNodeID(livekit.NodeID(guid.New(nodeID1))) s, err := service.InitializeServer(conf, currentNode) if err != nil { @@ -188,7 +188,7 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer { if err != nil { panic(err) } - currentNode.Id = nodeID + currentNode.SetNodeID(livekit.NodeID(nodeID)) // redis routing and store s, err := service.InitializeServer(conf, currentNode) diff --git a/test/webhook_test.go b/test/webhook_test.go index e572f7c6b..e0469a4e0 100644 --- a/test/webhook_test.go +++ b/test/webhook_test.go @@ -137,7 +137,7 @@ func setupServerWithWebhook() (server *service.LivekitServer, testServer *webhoo if err != nil { return } - currentNode.Id = guid.New(nodeID1) + currentNode.SetNodeID(livekit.NodeID(guid.New(nodeID1))) server, err = service.InitializeServer(conf, currentNode) if err != nil {