From e7601251bcf0d262275d46bbedebe5901148b3a0 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 19 Dec 2025 12:09:16 +0530 Subject: [PATCH] Make data message naming a bit more consistent. (#4177) * Make data message naming a bit more consistent. OnDataPacket and OnDataMessage half-way in the chain made it confusing (for me at least). Use same name throughout. API still uses SendDataPacket, but that is not harder to read. * test --- pkg/rtc/participant.go | 4 +- pkg/rtc/room.go | 15 ++-- pkg/rtc/room_test.go | 6 +- pkg/rtc/types/interfaces.go | 8 +- .../fake_local_participant_listener.go | 86 +++++++++---------- pkg/service/wire_gen.go | 14 +-- 6 files changed, 67 insertions(+), 66 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7af3ae451..456dbf469 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2460,7 +2460,7 @@ func (p *ParticipantImpl) handleReceivedDataMessage(kind livekit.DataPacket_Kind } if shouldForwardData { - p.listener().OnDataPacket(p, kind, dp) + p.listener().OnDataMessage(p, kind, dp) } if shouldForwardMetrics { p.listener().OnMetrics(p, dp) @@ -2474,7 +2474,7 @@ func (p *ParticipantImpl) onReceivedDataMessageUnlabeled(data []byte) { p.dataChannelStats.AddBytes(uint64(len(data)), false) - p.listener().OnDataMessage(p, data) + p.listener().OnDataMessageUnlabeled(p, data) } func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 0bc158db1..366e48af7 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -825,7 +825,7 @@ func (r *Room) OnParticipantChanged(f func(participant types.Participant)) { } func (r *Room) SendDataPacket(dp *livekit.DataPacket, kind livekit.DataPacket_Kind) { - r.onDataPacket(nil, kind, dp) + r.onDataMessage(nil, kind, dp) } func (r *Room) SetMetadata(metadata string) <-chan struct{} { @@ -1253,7 +1253,8 @@ func (r *Room) onStateChange(p types.LocalParticipant) { go r.RemoveParticipant(p.Identity(), p.ID(), p.CloseReason()) } } -func (r *Room) onDataPacket(source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) { + +func (r *Room) onDataMessage(source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) { if kind == livekit.DataPacket_RELIABLE && source != nil && dp.GetSequence() > 0 { data, err := proto.Marshal(dp) if err != nil { @@ -1270,7 +1271,7 @@ func (r *Room) onDataPacket(source types.LocalParticipant, kind livekit.DataPack BroadcastDataPacketForRoom(r, source, kind, dp, r.logger) } -func (r *Room) onDataMessage(source types.LocalParticipant, data []byte) { +func (r *Room) onDataMessageUnlabeled(source types.LocalParticipant, data []byte) { BroadcastDataMessageForRoom(r, source, data, r.logger) } @@ -1924,12 +1925,12 @@ func (l *localParticipantListener) OnSubscriberReady(p types.LocalParticipant) { func (l *localParticipantListener) OnMigrateStateChange(_p types.LocalParticipant, _migrateState types.MigrateState) { } -func (l *localParticipantListener) OnDataPacket(p types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) { - l.room.onDataPacket(p, kind, dp) +func (l *localParticipantListener) OnDataMessage(p types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) { + l.room.onDataMessage(p, kind, dp) } -func (l *localParticipantListener) OnDataMessage(p types.LocalParticipant, data []byte) { - l.room.onDataMessage(p, data) +func (l *localParticipantListener) OnDataMessageUnlabeled(p types.LocalParticipant, data []byte) { + l.room.onDataMessageUnlabeled(p, data) } func (l *localParticipantListener) OnDataTrackMessage(_p types.LocalParticipant, _data []byte, _packet *datatrack.Packet) { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 646e8a193..f6ef6e5df 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -645,7 +645,7 @@ func TestDataChannel(t *testing.T) { } encoded, _ := proto.Marshal(packetExp) - lpl.OnDataPacket(p, packet.Kind, packet) + lpl.OnDataMessage(p, packet.Kind, packet) // ensure everyone has received the packet for _, op := range participants { @@ -695,7 +695,7 @@ func TestDataChannel(t *testing.T) { } encoded, _ := proto.Marshal(packetExp) - lpl.OnDataPacket(p, packet.Kind, packet) + lpl.OnDataMessage(p, packet.Kind, packet) // only p1 should receive the data for _, op := range participants { @@ -729,7 +729,7 @@ func TestDataChannel(t *testing.T) { } if p.CanPublishData() { lpl := rm.LocalParticipantListener() - lpl.OnDataPacket(p, packet.Kind, &packet) + lpl.OnDataMessage(p, packet.Kind, &packet) } // no one should've been sent packet diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index e7d37a381..b69510556 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -581,8 +581,8 @@ type LocalParticipantListener interface { OnStateChange(LocalParticipant) OnSubscriberReady(LocalParticipant) OnMigrateStateChange(LocalParticipant, MigrateState) - OnDataPacket(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) - OnDataMessage(LocalParticipant, []byte) + OnDataMessage(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) + OnDataMessageUnlabeled(LocalParticipant, []byte) OnDataTrackMessage(LocalParticipant, []byte, *datatrack.Packet) OnSubscribeStatusChanged(LocalParticipant, livekit.ParticipantID, bool) OnUpdateSubscriptions( @@ -607,9 +607,9 @@ type NullLocalParticipantListener struct { func (*NullLocalParticipantListener) OnStateChange(LocalParticipant) {} func (*NullLocalParticipantListener) OnSubscriberReady(LocalParticipant) {} func (*NullLocalParticipantListener) OnMigrateStateChange(LocalParticipant, MigrateState) {} -func (*NullLocalParticipantListener) OnDataPacket(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) { +func (*NullLocalParticipantListener) OnDataMessage(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) { } -func (*NullLocalParticipantListener) OnDataMessage(LocalParticipant, []byte) {} +func (*NullLocalParticipantListener) OnDataMessageUnlabeled(LocalParticipant, []byte) {} func (*NullLocalParticipantListener) OnDataTrackMessage(LocalParticipant, []byte, *datatrack.Packet) { } func (*NullLocalParticipantListener) OnSubscribeStatusChanged(LocalParticipant, livekit.ParticipantID, bool) { diff --git a/pkg/rtc/types/typesfakes/fake_local_participant_listener.go b/pkg/rtc/types/typesfakes/fake_local_participant_listener.go index 7e41f45a7..7c8c57649 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant_listener.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant_listener.go @@ -10,19 +10,19 @@ import ( ) type FakeLocalParticipantListener struct { - OnDataMessageStub func(types.LocalParticipant, []byte) + OnDataMessageStub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) onDataMessageMutex sync.RWMutex onDataMessageArgsForCall []struct { - arg1 types.LocalParticipant - arg2 []byte - } - OnDataPacketStub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) - onDataPacketMutex sync.RWMutex - onDataPacketArgsForCall []struct { arg1 types.LocalParticipant arg2 livekit.DataPacket_Kind arg3 *livekit.DataPacket } + OnDataMessageUnlabeledStub func(types.LocalParticipant, []byte) + onDataMessageUnlabeledMutex sync.RWMutex + onDataMessageUnlabeledArgsForCall []struct { + arg1 types.LocalParticipant + arg2 []byte + } OnDataTrackMessageStub func(types.LocalParticipant, []byte, *datatrack.Packet) onDataTrackMessageMutex sync.RWMutex onDataTrackMessageArgsForCall []struct { @@ -154,22 +154,18 @@ type FakeLocalParticipantListener struct { invocationsMutex sync.RWMutex } -func (fake *FakeLocalParticipantListener) OnDataMessage(arg1 types.LocalParticipant, arg2 []byte) { - var arg2Copy []byte - if arg2 != nil { - arg2Copy = make([]byte, len(arg2)) - copy(arg2Copy, arg2) - } +func (fake *FakeLocalParticipantListener) OnDataMessage(arg1 types.LocalParticipant, arg2 livekit.DataPacket_Kind, arg3 *livekit.DataPacket) { fake.onDataMessageMutex.Lock() fake.onDataMessageArgsForCall = append(fake.onDataMessageArgsForCall, struct { arg1 types.LocalParticipant - arg2 []byte - }{arg1, arg2Copy}) + arg2 livekit.DataPacket_Kind + arg3 *livekit.DataPacket + }{arg1, arg2, arg3}) stub := fake.OnDataMessageStub - fake.recordInvocation("OnDataMessage", []interface{}{arg1, arg2Copy}) + fake.recordInvocation("OnDataMessage", []interface{}{arg1, arg2, arg3}) fake.onDataMessageMutex.Unlock() if stub != nil { - fake.OnDataMessageStub(arg1, arg2) + fake.OnDataMessageStub(arg1, arg2, arg3) } } @@ -179,51 +175,55 @@ func (fake *FakeLocalParticipantListener) OnDataMessageCallCount() int { return len(fake.onDataMessageArgsForCall) } -func (fake *FakeLocalParticipantListener) OnDataMessageCalls(stub func(types.LocalParticipant, []byte)) { +func (fake *FakeLocalParticipantListener) OnDataMessageCalls(stub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)) { fake.onDataMessageMutex.Lock() defer fake.onDataMessageMutex.Unlock() fake.OnDataMessageStub = stub } -func (fake *FakeLocalParticipantListener) OnDataMessageArgsForCall(i int) (types.LocalParticipant, []byte) { +func (fake *FakeLocalParticipantListener) OnDataMessageArgsForCall(i int) (types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) { fake.onDataMessageMutex.RLock() defer fake.onDataMessageMutex.RUnlock() argsForCall := fake.onDataMessageArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeLocalParticipantListener) OnDataPacket(arg1 types.LocalParticipant, arg2 livekit.DataPacket_Kind, arg3 *livekit.DataPacket) { - fake.onDataPacketMutex.Lock() - fake.onDataPacketArgsForCall = append(fake.onDataPacketArgsForCall, struct { +func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeled(arg1 types.LocalParticipant, arg2 []byte) { + var arg2Copy []byte + if arg2 != nil { + arg2Copy = make([]byte, len(arg2)) + copy(arg2Copy, arg2) + } + fake.onDataMessageUnlabeledMutex.Lock() + fake.onDataMessageUnlabeledArgsForCall = append(fake.onDataMessageUnlabeledArgsForCall, struct { arg1 types.LocalParticipant - arg2 livekit.DataPacket_Kind - arg3 *livekit.DataPacket - }{arg1, arg2, arg3}) - stub := fake.OnDataPacketStub - fake.recordInvocation("OnDataPacket", []interface{}{arg1, arg2, arg3}) - fake.onDataPacketMutex.Unlock() + arg2 []byte + }{arg1, arg2Copy}) + stub := fake.OnDataMessageUnlabeledStub + fake.recordInvocation("OnDataMessageUnlabeled", []interface{}{arg1, arg2Copy}) + fake.onDataMessageUnlabeledMutex.Unlock() if stub != nil { - fake.OnDataPacketStub(arg1, arg2, arg3) + fake.OnDataMessageUnlabeledStub(arg1, arg2) } } -func (fake *FakeLocalParticipantListener) OnDataPacketCallCount() int { - fake.onDataPacketMutex.RLock() - defer fake.onDataPacketMutex.RUnlock() - return len(fake.onDataPacketArgsForCall) +func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledCallCount() int { + fake.onDataMessageUnlabeledMutex.RLock() + defer fake.onDataMessageUnlabeledMutex.RUnlock() + return len(fake.onDataMessageUnlabeledArgsForCall) } -func (fake *FakeLocalParticipantListener) OnDataPacketCalls(stub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)) { - fake.onDataPacketMutex.Lock() - defer fake.onDataPacketMutex.Unlock() - fake.OnDataPacketStub = stub +func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledCalls(stub func(types.LocalParticipant, []byte)) { + fake.onDataMessageUnlabeledMutex.Lock() + defer fake.onDataMessageUnlabeledMutex.Unlock() + fake.OnDataMessageUnlabeledStub = stub } -func (fake *FakeLocalParticipantListener) OnDataPacketArgsForCall(i int) (types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) { - fake.onDataPacketMutex.RLock() - defer fake.onDataPacketMutex.RUnlock() - argsForCall := fake.onDataPacketArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledArgsForCall(i int) (types.LocalParticipant, []byte) { + fake.onDataMessageUnlabeledMutex.RLock() + defer fake.onDataMessageUnlabeledMutex.RUnlock() + argsForCall := fake.onDataMessageUnlabeledArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeLocalParticipantListener) OnDataTrackMessage(arg1 types.LocalParticipant, arg2 []byte, arg3 *datatrack.Packet) { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 22b1e0c1c..b33cc1744 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - v, err := rpc.NewTypedRoomClient(clientParams) + roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - v2, err := rpc.NewTypedParticipantClient(clientParams) + participantClient, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } - v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) + whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) if err != nil { return nil, err }