Clean up missed v2 pieces (#3837)

* Clean up missed v2 pieces

* missed stuff
This commit is contained in:
Raja Subramanian
2025-08-06 22:30:50 +05:30
committed by GitHub
parent 34a491309f
commit 7dea101286
4 changed files with 10 additions and 171 deletions
+2 -31
View File
@@ -62,9 +62,8 @@ import (
)
const (
LossyDataChannel = "_lossy"
ReliableDataChannel = "_reliable"
SignallingDataChannel = "_signalling"
LossyDataChannel = "_lossy"
ReliableDataChannel = "_reliable"
fastNegotiationFrequency = 10 * time.Millisecond
negotiationFrequency = 150 * time.Millisecond
@@ -205,7 +204,6 @@ type PCTransport struct {
lossyDC *datachannel.DataChannelWriter[*webrtc.DataChannel]
lossyDCOpened bool
unlabeledDataChannels []*datachannel.DataChannelWriter[*webrtc.DataChannel]
signallingDataChannel *datachannel.DataChannelWriter[*webrtc.DataChannel]
iceStartedAt time.Time
iceConnectedAt time.Time
@@ -807,7 +805,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
t.params.Logger.Debugw(dc.Label() + " data channel open")
var kind livekit.DataPacket_Kind
var isUnlabeled bool
var isSignalling bool
switch dc.Label() {
case ReliableDataChannel:
kind = livekit.DataPacket_RELIABLE
@@ -815,10 +812,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
case LossyDataChannel:
kind = livekit.DataPacket_LOSSY
case SignallingDataChannel:
t.params.Logger.Infow("signalling datachannel added", "label", dc.Label())
isSignalling = true
default:
t.params.Logger.Infow("unlabeled datachannel added", "label", dc.Label())
isUnlabeled = true
@@ -839,13 +832,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
)
t.lock.Unlock()
case isSignalling:
t.lock.Lock()
signallingDataChannel := datachannel.NewDataChannelWriter(dc, rawDC, 0)
t.signallingDataChannel = signallingDataChannel
t.lock.Unlock()
t.params.Handler.OnDataChannelOpenSignalling(signallingDataChannel)
case kind == livekit.DataPacket_RELIABLE:
t.lock.Lock()
if t.reliableDC != nil {
@@ -881,9 +867,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
case isUnlabeled:
t.params.Handler.OnDataMessageUnlabeled(buffer[:n])
case isSignalling:
t.params.Handler.OnDataMessageSignalling(buffer[:n])
default:
t.params.Handler.OnDataMessage(kind, buffer[:n])
}
@@ -892,18 +875,6 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
t.maybeNotifyFullyEstablished()
})
dc.OnClose(func() {
t.params.Logger.Debugw(dc.Label() + " data channel close")
switch dc.Label() {
case SignallingDataChannel:
t.lock.RLock()
signallingDataChannel := t.signallingDataChannel
t.lock.RUnlock()
t.params.Handler.OnDataChannelCloseSignalling(signallingDataChannel)
}
})
}
func (t *PCTransport) maybeNotifyFullyEstablished() {
+1 -10
View File
@@ -20,7 +20,6 @@ import (
"github.com/pion/webrtc/v4"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
"github.com/livekit/protocol/livekit"
)
@@ -42,9 +41,6 @@ type Handler interface {
OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver)
OnDataMessage(kind livekit.DataPacket_Kind, data []byte)
OnDataMessageUnlabeled(data []byte)
OnDataChannelOpenSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel])
OnDataChannelCloseSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel])
OnDataMessageSignalling(data []byte)
OnDataSendError(err error)
OnOffer(sd webrtc.SessionDescription, offerId uint32) error
OnAnswer(sd webrtc.SessionDescription, answerId uint32) error
@@ -64,12 +60,7 @@ func (h UnimplementedHandler) OnFailed(isShortLived bool)
func (h UnimplementedHandler) OnTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {}
func (h UnimplementedHandler) OnDataMessage(kind livekit.DataPacket_Kind, data []byte) {}
func (h UnimplementedHandler) OnDataMessageUnlabeled(data []byte) {}
func (h UnimplementedHandler) OnDataChannelOpenSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel]) {
}
func (h UnimplementedHandler) OnDataChannelCloseSignalling(dc *datachannel.DataChannelWriter[*webrtc.DataChannel]) {
}
func (h UnimplementedHandler) OnDataMessageSignalling(data []byte) {}
func (h UnimplementedHandler) OnDataSendError(err error) {}
func (h UnimplementedHandler) OnDataSendError(err error) {}
func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32) error {
return ErrNoOfferHandler
}
@@ -6,7 +6,6 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/transport"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
"github.com/livekit/protocol/livekit"
webrtc "github.com/pion/webrtc/v4"
@@ -25,27 +24,12 @@ type FakeHandler struct {
onAnswerReturnsOnCall map[int]struct {
result1 error
}
OnDataChannelCloseSignallingStub func(*datachannel.DataChannelWriter[*webrtc.DataChannel])
onDataChannelCloseSignallingMutex sync.RWMutex
onDataChannelCloseSignallingArgsForCall []struct {
arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]
}
OnDataChannelOpenSignallingStub func(*datachannel.DataChannelWriter[*webrtc.DataChannel])
onDataChannelOpenSignallingMutex sync.RWMutex
onDataChannelOpenSignallingArgsForCall []struct {
arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]
}
OnDataMessageStub func(livekit.DataPacket_Kind, []byte)
onDataMessageMutex sync.RWMutex
onDataMessageArgsForCall []struct {
arg1 livekit.DataPacket_Kind
arg2 []byte
}
OnDataMessageSignallingStub func([]byte)
onDataMessageSignallingMutex sync.RWMutex
onDataMessageSignallingArgsForCall []struct {
arg1 []byte
}
OnDataMessageUnlabeledStub func([]byte)
onDataMessageUnlabeledMutex sync.RWMutex
onDataMessageUnlabeledArgsForCall []struct {
@@ -186,70 +170,6 @@ func (fake *FakeHandler) OnAnswerReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeHandler) OnDataChannelCloseSignalling(arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]) {
fake.onDataChannelCloseSignallingMutex.Lock()
fake.onDataChannelCloseSignallingArgsForCall = append(fake.onDataChannelCloseSignallingArgsForCall, struct {
arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]
}{arg1})
stub := fake.OnDataChannelCloseSignallingStub
fake.recordInvocation("OnDataChannelCloseSignalling", []interface{}{arg1})
fake.onDataChannelCloseSignallingMutex.Unlock()
if stub != nil {
fake.OnDataChannelCloseSignallingStub(arg1)
}
}
func (fake *FakeHandler) OnDataChannelCloseSignallingCallCount() int {
fake.onDataChannelCloseSignallingMutex.RLock()
defer fake.onDataChannelCloseSignallingMutex.RUnlock()
return len(fake.onDataChannelCloseSignallingArgsForCall)
}
func (fake *FakeHandler) OnDataChannelCloseSignallingCalls(stub func(*datachannel.DataChannelWriter[*webrtc.DataChannel])) {
fake.onDataChannelCloseSignallingMutex.Lock()
defer fake.onDataChannelCloseSignallingMutex.Unlock()
fake.OnDataChannelCloseSignallingStub = stub
}
func (fake *FakeHandler) OnDataChannelCloseSignallingArgsForCall(i int) *datachannel.DataChannelWriter[*webrtc.DataChannel] {
fake.onDataChannelCloseSignallingMutex.RLock()
defer fake.onDataChannelCloseSignallingMutex.RUnlock()
argsForCall := fake.onDataChannelCloseSignallingArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeHandler) OnDataChannelOpenSignalling(arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]) {
fake.onDataChannelOpenSignallingMutex.Lock()
fake.onDataChannelOpenSignallingArgsForCall = append(fake.onDataChannelOpenSignallingArgsForCall, struct {
arg1 *datachannel.DataChannelWriter[*webrtc.DataChannel]
}{arg1})
stub := fake.OnDataChannelOpenSignallingStub
fake.recordInvocation("OnDataChannelOpenSignalling", []interface{}{arg1})
fake.onDataChannelOpenSignallingMutex.Unlock()
if stub != nil {
fake.OnDataChannelOpenSignallingStub(arg1)
}
}
func (fake *FakeHandler) OnDataChannelOpenSignallingCallCount() int {
fake.onDataChannelOpenSignallingMutex.RLock()
defer fake.onDataChannelOpenSignallingMutex.RUnlock()
return len(fake.onDataChannelOpenSignallingArgsForCall)
}
func (fake *FakeHandler) OnDataChannelOpenSignallingCalls(stub func(*datachannel.DataChannelWriter[*webrtc.DataChannel])) {
fake.onDataChannelOpenSignallingMutex.Lock()
defer fake.onDataChannelOpenSignallingMutex.Unlock()
fake.OnDataChannelOpenSignallingStub = stub
}
func (fake *FakeHandler) OnDataChannelOpenSignallingArgsForCall(i int) *datachannel.DataChannelWriter[*webrtc.DataChannel] {
fake.onDataChannelOpenSignallingMutex.RLock()
defer fake.onDataChannelOpenSignallingMutex.RUnlock()
argsForCall := fake.onDataChannelOpenSignallingArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeHandler) OnDataMessage(arg1 livekit.DataPacket_Kind, arg2 []byte) {
var arg2Copy []byte
if arg2 != nil {
@@ -288,43 +208,6 @@ func (fake *FakeHandler) OnDataMessageArgsForCall(i int) (livekit.DataPacket_Kin
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeHandler) OnDataMessageSignalling(arg1 []byte) {
var arg1Copy []byte
if arg1 != nil {
arg1Copy = make([]byte, len(arg1))
copy(arg1Copy, arg1)
}
fake.onDataMessageSignallingMutex.Lock()
fake.onDataMessageSignallingArgsForCall = append(fake.onDataMessageSignallingArgsForCall, struct {
arg1 []byte
}{arg1Copy})
stub := fake.OnDataMessageSignallingStub
fake.recordInvocation("OnDataMessageSignalling", []interface{}{arg1Copy})
fake.onDataMessageSignallingMutex.Unlock()
if stub != nil {
fake.OnDataMessageSignallingStub(arg1)
}
}
func (fake *FakeHandler) OnDataMessageSignallingCallCount() int {
fake.onDataMessageSignallingMutex.RLock()
defer fake.onDataMessageSignallingMutex.RUnlock()
return len(fake.onDataMessageSignallingArgsForCall)
}
func (fake *FakeHandler) OnDataMessageSignallingCalls(stub func([]byte)) {
fake.onDataMessageSignallingMutex.Lock()
defer fake.onDataMessageSignallingMutex.Unlock()
fake.OnDataMessageSignallingStub = stub
}
func (fake *FakeHandler) OnDataMessageSignallingArgsForCall(i int) []byte {
fake.onDataMessageSignallingMutex.RLock()
defer fake.onDataMessageSignallingMutex.RUnlock()
argsForCall := fake.onDataMessageSignallingArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeHandler) OnDataMessageUnlabeled(arg1 []byte) {
var arg1Copy []byte
if arg1 != nil {
@@ -754,14 +637,8 @@ func (fake *FakeHandler) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.onAnswerMutex.RLock()
defer fake.onAnswerMutex.RUnlock()
fake.onDataChannelCloseSignallingMutex.RLock()
defer fake.onDataChannelCloseSignallingMutex.RUnlock()
fake.onDataChannelOpenSignallingMutex.RLock()
defer fake.onDataChannelOpenSignallingMutex.RUnlock()
fake.onDataMessageMutex.RLock()
defer fake.onDataMessageMutex.RUnlock()
fake.onDataMessageSignallingMutex.RLock()
defer fake.onDataMessageSignallingMutex.RUnlock()
fake.onDataMessageUnlabeledMutex.RLock()
defer fake.onDataMessageUnlabeledMutex.RUnlock()
fake.onDataSendErrorMutex.RLock()
+7 -7
View File
@@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
v, err := rpc.NewTypedRoomClient(clientParams)
roomClient, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
v2, err := rpc.NewTypedParticipantClient(clientParams)
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
if err != nil {
return nil, err
}