From 1436a1f186774304579fd4fe98136e1150d64419 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Wed, 18 Sep 2024 00:45:55 -0700 Subject: [PATCH] driveby cleanup (#3017) * driveby cleanup * test --- pkg/rtc/participant_signal.go | 9 +-------- pkg/service/roommanager.go | 1 + pkg/service/roomservice.go | 4 ---- pkg/service/roomservice_test.go | 1 - pkg/service/rtcservice.go | 4 ---- pkg/service/wire_gen.go | 12 ++++++------ test/client/client.go | 8 +------- 7 files changed, 9 insertions(+), 30 deletions(-) diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 6c4e61e2a..ed0ce2cef 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -20,7 +20,6 @@ import ( "time" "github.com/pion/webrtc/v3" - "go.uber.org/atomic" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -270,13 +269,7 @@ func (p *ParticipantImpl) sendDisconnectUpdatesForReconnect() error { } func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error { - var icQueue *atomic.Pointer[webrtc.ICECandidate] - if target == livekit.SignalTarget_PUBLISHER { - icQueue = &p.icQueue[0] - } else { - icQueue = &p.icQueue[1] - } - prevIC := icQueue.Swap(ic) + prevIC := p.icQueue[target].Swap(ic) if prevIC == nil { return nil } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index c58a6b1f4..c62886028 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -603,6 +603,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.C agentDispatchServer := must.Get(rpc.NewTypedAgentDispatchInternalServer(r, r.bus)) killDispServer := r.agentDispatchServers.Replace(roomTopic, agentDispatchServer) if err := agentDispatchServer.RegisterAllRoomTopics(roomTopic); err != nil { + killRoomServer() killDispServer() r.lock.Unlock() return nil, err diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 1d7718c8d..76875b75d 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -23,7 +23,6 @@ import ( "github.com/twitchtv/twirp" "google.golang.org/protobuf/proto" - "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" @@ -39,7 +38,6 @@ type RoomService struct { router routing.MessageRouter roomAllocator RoomAllocator roomStore ServiceStore - agentClient agent.Client egressLauncher rtc.EgressLauncher topicFormatter rpc.TopicFormatter roomClient rpc.TypedRoomClient @@ -52,7 +50,6 @@ func NewRoomService( router routing.MessageRouter, roomAllocator RoomAllocator, serviceStore ServiceStore, - agentClient agent.Client, egressLauncher rtc.EgressLauncher, topicFormatter rpc.TopicFormatter, roomClient rpc.TypedRoomClient, @@ -64,7 +61,6 @@ func NewRoomService( router: router, roomAllocator: roomAllocator, roomStore: serviceStore, - agentClient: agentClient, egressLauncher: egressLauncher, topicFormatter: topicFormatter, roomClient: roomClient, diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index 8c9d19fc3..8969ee84c 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -115,7 +115,6 @@ func newTestRoomService(limitConf config.LimitConfig) *TestRoomService { allocator, store, nil, - nil, rpc.NewTopicFormatter(), &rpcfakes.FakeTypedRoomClient{}, &rpcfakes.FakeTypedParticipantClient{}, diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ef14da079..01dc056f9 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -30,7 +30,6 @@ import ( "go.uber.org/atomic" "golang.org/x/exp/maps" - "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/routing/selector" @@ -52,7 +51,6 @@ type RTCService struct { isDev bool limits config.LimitConfig parser *uaparser.Parser - agentClient agent.Client telemetry telemetry.TelemetryService mu sync.Mutex @@ -65,7 +63,6 @@ func NewRTCService( store ServiceStore, router routing.MessageRouter, currentNode routing.LocalNode, - agentClient agent.Client, telemetry telemetry.TelemetryService, ) *RTCService { s := &RTCService{ @@ -78,7 +75,6 @@ func NewRTCService( isDev: conf.Development, limits: conf.Limit, parser: uaparser.NewFromSaved(), - agentClient: agentClient, telemetry: telemetry, connections: map[*websocket.Conn]struct{}{}, } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 07b48e683..27c5271d2 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -66,10 +66,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - client, err := agent.NewAgentClient(messageBus) - if err != nil { - return nil, err - } egressClient, err := rpc.NewEgressClient(clientParams) if err != nil { return nil, err @@ -101,7 +97,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, client, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } @@ -123,12 +119,16 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) - rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, client, telemetryService) + rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) agentService, err := NewAgentService(conf, currentNode, messageBus, keyProvider) if err != nil { return nil, err } clientConfigurationManager := createClientConfiguration() + client, err := agent.NewAgentClient(messageBus) + if err != nil { + return nil, err + } agentStore := getAgentStore(objectStore) timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) diff --git a/test/client/client.go b/test/client/client.go index 117932737..9d0105b5d 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -555,13 +555,7 @@ func (c *RTCClient) sendRequest(msg *livekit.SignalRequest) error { } func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error { - var icQueue *atomic.Pointer[webrtc.ICECandidate] - if target == livekit.SignalTarget_PUBLISHER { - icQueue = &c.icQueue[0] - } else { - icQueue = &c.icQueue[1] - } - prevIC := icQueue.Swap(ic) + prevIC := c.icQueue[target].Swap(ic) if prevIC == nil { return nil }