Merge remote-tracking branch 'origin/master' into agents-cleanup

This commit is contained in:
Paul Wells
2024-09-18 00:46:58 -07:00
9 changed files with 16 additions and 35 deletions
+4 -4
View File
@@ -774,7 +774,7 @@ func (p *ParticipantImpl) HandleSignalSourceClose() {
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) {
p.pubLogger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER)
p.pubLogger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER, "offer", offer)
shouldPend := false
if p.MigrateState() == types.MigrateStateInit {
shouldPend = true
@@ -788,7 +788,7 @@ func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) {
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) {
p.subLogger.Debugw("received answer", "transport", livekit.SignalTarget_SUBSCRIBER)
p.subLogger.Debugw("received answer", "transport", livekit.SignalTarget_SUBSCRIBER, "answer", answer)
/* from server received join request to client answer
* 1. server send join response & offer
@@ -806,8 +806,8 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) er
return nil
}
p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER)
answer = p.configurePublisherAnswer(answer)
p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer)
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
@@ -1505,7 +1505,7 @@ func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
// when the server has an offer for participant
func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) error {
p.subLogger.Debugw("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER)
p.subLogger.Debugw("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER, "offer", offer)
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer),
+1 -8
View File
@@ -20,7 +20,6 @@ import (
"time"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -270,13 +269,7 @@ func (p *ParticipantImpl) sendDisconnectUpdatesForReconnect() error {
}
func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error {
var icQueue *atomic.Pointer[webrtc.ICECandidate]
if target == livekit.SignalTarget_PUBLISHER {
icQueue = &p.icQueue[0]
} else {
icQueue = &p.icQueue[1]
}
prevIC := icQueue.Swap(ic)
prevIC := p.icQueue[target].Swap(ic)
if prevIC == nil {
return nil
}
+1
View File
@@ -603,6 +603,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.C
agentDispatchServer := must.Get(rpc.NewTypedAgentDispatchInternalServer(r, r.bus))
killDispServer := r.agentDispatchServers.Replace(roomTopic, agentDispatchServer)
if err := agentDispatchServer.RegisterAllRoomTopics(roomTopic); err != nil {
killRoomServer()
killDispServer()
r.lock.Unlock()
return nil, err
-4
View File
@@ -23,7 +23,6 @@ import (
"github.com/twitchtv/twirp"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
@@ -39,7 +38,6 @@ type RoomService struct {
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore ServiceStore
agentClient agent.Client
egressLauncher rtc.EgressLauncher
topicFormatter rpc.TopicFormatter
roomClient rpc.TypedRoomClient
@@ -52,7 +50,6 @@ func NewRoomService(
router routing.MessageRouter,
roomAllocator RoomAllocator,
serviceStore ServiceStore,
agentClient agent.Client,
egressLauncher rtc.EgressLauncher,
topicFormatter rpc.TopicFormatter,
roomClient rpc.TypedRoomClient,
@@ -64,7 +61,6 @@ func NewRoomService(
router: router,
roomAllocator: roomAllocator,
roomStore: serviceStore,
agentClient: agentClient,
egressLauncher: egressLauncher,
topicFormatter: topicFormatter,
roomClient: roomClient,
-1
View File
@@ -115,7 +115,6 @@ func newTestRoomService(limitConf config.LimitConfig) *TestRoomService {
allocator,
store,
nil,
nil,
rpc.NewTopicFormatter(),
&rpcfakes.FakeTypedRoomClient{},
&rpcfakes.FakeTypedParticipantClient{},
-4
View File
@@ -30,7 +30,6 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/maps"
"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/routing/selector"
@@ -52,7 +51,6 @@ type RTCService struct {
isDev bool
limits config.LimitConfig
parser *uaparser.Parser
agentClient agent.Client
telemetry telemetry.TelemetryService
mu sync.Mutex
@@ -65,7 +63,6 @@ func NewRTCService(
store ServiceStore,
router routing.MessageRouter,
currentNode routing.LocalNode,
agentClient agent.Client,
telemetry telemetry.TelemetryService,
) *RTCService {
s := &RTCService{
@@ -78,7 +75,6 @@ func NewRTCService(
isDev: conf.Development,
limits: conf.Limit,
parser: uaparser.NewFromSaved(),
agentClient: agentClient,
telemetry: telemetry,
connections: map[*websocket.Conn]struct{}{},
}
+6 -6
View File
@@ -66,10 +66,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
client, err := agent.NewAgentClient(messageBus)
if err != nil {
return nil, err
}
egressClient, err := rpc.NewEgressClient(clientParams)
if err != nil {
return nil, err
@@ -101,7 +97,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, client, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
@@ -123,12 +119,16 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, client, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
agentService, err := NewAgentService(conf, currentNode, messageBus, keyProvider)
if err != nil {
return nil, err
}
clientConfigurationManager := createClientConfiguration()
client, err := agent.NewAgentClient(messageBus)
if err != nil {
return nil, err
}
agentStore := getAgentStore(objectStore)
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
turnAuthHandler := NewTURNAuthHandler(keyProvider)
+3 -1
View File
@@ -1253,7 +1253,9 @@ func (d *DownTrack) GetState() DownTrackState {
}
func (d *DownTrack) SeedState(state DownTrackState) {
d.params.Logger.Debugw("seeding down track state", "state", state)
if state.RTPStats != nil || state.ForwarderState != nil {
d.params.Logger.Debugw("seeding down track state", "state", state)
}
if state.RTPStats != nil {
d.rtpStats.Seed(state.RTPStats)
d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId
+1 -7
View File
@@ -555,13 +555,7 @@ func (c *RTCClient) sendRequest(msg *livekit.SignalRequest) error {
}
func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error {
var icQueue *atomic.Pointer[webrtc.ICECandidate]
if target == livekit.SignalTarget_PUBLISHER {
icQueue = &c.icQueue[0]
} else {
icQueue = &c.icQueue[1]
}
prevIC := icQueue.Swap(ic)
prevIC := c.icQueue[target].Swap(ic)
if prevIC == nil {
return nil
}