From 7dea101286c29c98fc97cbdc3b5f70a73a1c438a Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 6 Aug 2025 22:30:50 +0530 Subject: [PATCH] Clean up missed v2 pieces (#3837) * Clean up missed v2 pieces * missed stuff --- pkg/rtc/transport.go | 33 +---- pkg/rtc/transport/handler.go | 11 +- .../transport/transportfakes/fake_handler.go | 123 ------------------ pkg/service/wire_gen.go | 14 +- 4 files changed, 10 insertions(+), 171 deletions(-) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index bf40e948d..cb9a19811 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -62,9 +62,8 @@ import ( ) const ( - LossyDataChannel = "_lossy" - ReliableDataChannel = "_reliable" - SignallingDataChannel = "_signalling" + LossyDataChannel = "_lossy" + ReliableDataChannel = "_reliable" fastNegotiationFrequency = 10 * time.Millisecond negotiationFrequency = 150 * time.Millisecond @@ -205,7 +204,6 @@ type PCTransport struct { lossyDC *datachannel.DataChannelWriter[*webrtc.DataChannel] lossyDCOpened bool unlabeledDataChannels []*datachannel.DataChannelWriter[*webrtc.DataChannel] - signallingDataChannel *datachannel.DataChannelWriter[*webrtc.DataChannel] iceStartedAt time.Time iceConnectedAt time.Time @@ -807,7 +805,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) { t.params.Logger.Debugw(dc.Label() + " data channel open") var kind livekit.DataPacket_Kind var isUnlabeled bool - var isSignalling bool switch dc.Label() { case ReliableDataChannel: kind = livekit.DataPacket_RELIABLE @@ -815,10 +812,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) { case LossyDataChannel: kind = livekit.DataPacket_LOSSY - case SignallingDataChannel: - t.params.Logger.Infow("signalling datachannel added", "label", dc.Label()) - isSignalling = true - default: t.params.Logger.Infow("unlabeled datachannel added", "label", dc.Label()) isUnlabeled = true @@ -839,13 +832,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) { ) t.lock.Unlock() - case isSignalling: - t.lock.Lock() - signallingDataChannel := datachannel.NewDataChannelWriter(dc, rawDC, 0) - t.signallingDataChannel = signallingDataChannel - t.lock.Unlock() - t.params.Handler.OnDataChannelOpenSignalling(signallingDataChannel) - case kind == livekit.DataPacket_RELIABLE: t.lock.Lock() if t.reliableDC != nil { @@ -881,9 +867,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) { case isUnlabeled: t.params.Handler.OnDataMessageUnlabeled(buffer[:n]) - case isSignalling: - t.params.Handler.OnDataMessageSignalling(buffer[:n]) - default: t.params.Handler.OnDataMessage(kind, buffer[:n]) } @@ -892,18 +875,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) { t.maybeNotifyFullyEstablished() }) - - dc.OnClose(func() { - t.params.Logger.Debugw(dc.Label() + " data channel close") - switch dc.Label() { - case SignallingDataChannel: - t.lock.RLock() - signallingDataChannel := t.signallingDataChannel - t.lock.RUnlock() - - t.params.Handler.OnDataChannelCloseSignalling(signallingDataChannel) - } - }) } func (t *PCTransport) maybeNotifyFullyEstablished() { diff --git a/pkg/rtc/transport/handler.go b/pkg/rtc/transport/handler.go index 82a47dfb3..e43e3bd7b 100644 --- a/pkg/rtc/transport/handler.go +++ b/pkg/rtc/transport/handler.go @@ -20,7 +20,6 @@ import ( "github.com/pion/webrtc/v4" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/sfu/datachannel" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" "github.com/livekit/protocol/livekit" ) @@ -42,9 +41,6 @@ type Handler interface { OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) OnDataMessage(kind livekit.DataPacket_Kind, data []byte) OnDataMessageUnlabeled(data []byte) - OnDataChannelOpenSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel]) - OnDataChannelCloseSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel]) - OnDataMessageSignalling(data []byte) OnDataSendError(err error) OnOffer(sd webrtc.SessionDescription, offerId uint32) error OnAnswer(sd webrtc.SessionDescription, answerId uint32) error @@ -64,12 +60,7 @@ func (h UnimplementedHandler) OnFailed(isShortLived bool) func (h UnimplementedHandler) OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {} func (h UnimplementedHandler) OnDataMessage(kind livekit.DataPacket_Kind, data []byte) {} func (h UnimplementedHandler) OnDataMessageUnlabeled(data []byte) {} -func (h UnimplementedHandler) OnDataChannelOpenSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel]) { -} -func (h UnimplementedHandler) OnDataChannelCloseSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel]) { -} -func (h UnimplementedHandler) OnDataMessageSignalling(data []byte) {} -func (h UnimplementedHandler) OnDataSendError(err error) {} +func (h UnimplementedHandler) OnDataSendError(err error) {} func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32) error { return ErrNoOfferHandler } diff --git a/pkg/rtc/transport/transportfakes/fake_handler.go b/pkg/rtc/transport/transportfakes/fake_handler.go index 64d650ded..fd46b60ba 100644 --- a/pkg/rtc/transport/transportfakes/fake_handler.go +++ b/pkg/rtc/transport/transportfakes/fake_handler.go @@ -6,7 +6,6 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/transport" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/sfu/datachannel" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" "github.com/livekit/protocol/livekit" webrtc "github.com/pion/webrtc/v4" @@ -25,27 +24,12 @@ type FakeHandler struct { onAnswerReturnsOnCall map[int]struct { result1 error } - OnDataChannelCloseSignallingStub func(*datachannel.DataChannelWriter[*webrtc.DataChannel]) - onDataChannelCloseSignallingMutex sync.RWMutex - onDataChannelCloseSignallingArgsForCall []struct { - arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel] - } - OnDataChannelOpenSignallingStub func(*datachannel.DataChannelWriter[*webrtc.DataChannel]) - onDataChannelOpenSignallingMutex sync.RWMutex - onDataChannelOpenSignallingArgsForCall []struct { - arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel] - } OnDataMessageStub func(livekit.DataPacket_Kind, []byte) onDataMessageMutex sync.RWMutex onDataMessageArgsForCall []struct { arg1 livekit.DataPacket_Kind arg2 []byte } - OnDataMessageSignallingStub func([]byte) - onDataMessageSignallingMutex sync.RWMutex - onDataMessageSignallingArgsForCall []struct { - arg1 []byte - } OnDataMessageUnlabeledStub func([]byte) onDataMessageUnlabeledMutex sync.RWMutex onDataMessageUnlabeledArgsForCall []struct { @@ -186,70 +170,6 @@ func (fake *FakeHandler) OnAnswerReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeHandler) OnDataChannelCloseSignalling(arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]) { - fake.onDataChannelCloseSignallingMutex.Lock() - fake.onDataChannelCloseSignallingArgsForCall = append(fake.onDataChannelCloseSignallingArgsForCall, struct { - arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel] - }{arg1}) - stub := fake.OnDataChannelCloseSignallingStub - fake.recordInvocation("OnDataChannelCloseSignalling", []interface{}{arg1}) - fake.onDataChannelCloseSignallingMutex.Unlock() - if stub != nil { - fake.OnDataChannelCloseSignallingStub(arg1) - } -} - -func (fake *FakeHandler) OnDataChannelCloseSignallingCallCount() int { - fake.onDataChannelCloseSignallingMutex.RLock() - defer fake.onDataChannelCloseSignallingMutex.RUnlock() - return len(fake.onDataChannelCloseSignallingArgsForCall) -} - -func (fake *FakeHandler) OnDataChannelCloseSignallingCalls(stub func(*datachannel.DataChannelWriter[*webrtc.DataChannel])) { - fake.onDataChannelCloseSignallingMutex.Lock() - defer fake.onDataChannelCloseSignallingMutex.Unlock() - fake.OnDataChannelCloseSignallingStub = stub -} - -func (fake *FakeHandler) OnDataChannelCloseSignallingArgsForCall(i int) *datachannel.DataChannelWriter[*webrtc.DataChannel] { - fake.onDataChannelCloseSignallingMutex.RLock() - defer fake.onDataChannelCloseSignallingMutex.RUnlock() - argsForCall := fake.onDataChannelCloseSignallingArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeHandler) OnDataChannelOpenSignalling(arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]) { - fake.onDataChannelOpenSignallingMutex.Lock() - fake.onDataChannelOpenSignallingArgsForCall = append(fake.onDataChannelOpenSignallingArgsForCall, struct { - arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel] - }{arg1}) - stub := fake.OnDataChannelOpenSignallingStub - fake.recordInvocation("OnDataChannelOpenSignalling", []interface{}{arg1}) - fake.onDataChannelOpenSignallingMutex.Unlock() - if stub != nil { - fake.OnDataChannelOpenSignallingStub(arg1) - } -} - -func (fake *FakeHandler) OnDataChannelOpenSignallingCallCount() int { - fake.onDataChannelOpenSignallingMutex.RLock() - defer fake.onDataChannelOpenSignallingMutex.RUnlock() - return len(fake.onDataChannelOpenSignallingArgsForCall) -} - -func (fake *FakeHandler) OnDataChannelOpenSignallingCalls(stub func(*datachannel.DataChannelWriter[*webrtc.DataChannel])) { - fake.onDataChannelOpenSignallingMutex.Lock() - defer fake.onDataChannelOpenSignallingMutex.Unlock() - fake.OnDataChannelOpenSignallingStub = stub -} - -func (fake *FakeHandler) OnDataChannelOpenSignallingArgsForCall(i int) *datachannel.DataChannelWriter[*webrtc.DataChannel] { - fake.onDataChannelOpenSignallingMutex.RLock() - defer fake.onDataChannelOpenSignallingMutex.RUnlock() - argsForCall := fake.onDataChannelOpenSignallingArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeHandler) OnDataMessage(arg1 livekit.DataPacket_Kind, arg2 []byte) { var arg2Copy []byte if arg2 != nil { @@ -288,43 +208,6 @@ func (fake *FakeHandler) OnDataMessageArgsForCall(i int) (livekit.DataPacket_Kin return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeHandler) OnDataMessageSignalling(arg1 []byte) { - var arg1Copy []byte - if arg1 != nil { - arg1Copy = make([]byte, len(arg1)) - copy(arg1Copy, arg1) - } - fake.onDataMessageSignallingMutex.Lock() - fake.onDataMessageSignallingArgsForCall = append(fake.onDataMessageSignallingArgsForCall, struct { - arg1 []byte - }{arg1Copy}) - stub := fake.OnDataMessageSignallingStub - fake.recordInvocation("OnDataMessageSignalling", []interface{}{arg1Copy}) - fake.onDataMessageSignallingMutex.Unlock() - if stub != nil { - fake.OnDataMessageSignallingStub(arg1) - } -} - -func (fake *FakeHandler) OnDataMessageSignallingCallCount() int { - fake.onDataMessageSignallingMutex.RLock() - defer fake.onDataMessageSignallingMutex.RUnlock() - return len(fake.onDataMessageSignallingArgsForCall) -} - -func (fake *FakeHandler) OnDataMessageSignallingCalls(stub func([]byte)) { - fake.onDataMessageSignallingMutex.Lock() - defer fake.onDataMessageSignallingMutex.Unlock() - fake.OnDataMessageSignallingStub = stub -} - -func (fake *FakeHandler) OnDataMessageSignallingArgsForCall(i int) []byte { - fake.onDataMessageSignallingMutex.RLock() - defer fake.onDataMessageSignallingMutex.RUnlock() - argsForCall := fake.onDataMessageSignallingArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeHandler) OnDataMessageUnlabeled(arg1 []byte) { var arg1Copy []byte if arg1 != nil { @@ -754,14 +637,8 @@ func (fake *FakeHandler) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.onAnswerMutex.RLock() defer fake.onAnswerMutex.RUnlock() - fake.onDataChannelCloseSignallingMutex.RLock() - defer fake.onDataChannelCloseSignallingMutex.RUnlock() - fake.onDataChannelOpenSignallingMutex.RLock() - defer fake.onDataChannelOpenSignallingMutex.RUnlock() fake.onDataMessageMutex.RLock() defer fake.onDataMessageMutex.RUnlock() - fake.onDataMessageSignallingMutex.RLock() - defer fake.onDataMessageSignallingMutex.RUnlock() fake.onDataMessageUnlabeledMutex.RLock() defer fake.onDataMessageUnlabeledMutex.RUnlock() fake.onDataSendErrorMutex.RLock() diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 0c62abbbf..3b8913f0c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - v, err := rpc.NewTypedRoomClient(clientParams) + roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - v2, err := rpc.NewTypedParticipantClient(clientParams) + participantClient, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } - v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) + whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) if err != nil { return nil, err }