Make data message naming a bit more consistent. (#4177)

* Make data message naming a bit more consistent.

OnDataPacket and OnDataMessage half-way in the chain made it confusing
(for me at least). Use same name throughout.
API still uses SendDataPacket, but that is not harder to read.

* test
This commit is contained in:
Raja Subramanian
2025-12-19 12:09:16 +05:30
committed by GitHub
parent a04e566dbf
commit e7601251bc
6 changed files with 67 additions and 66 deletions
+2 -2
View File
@@ -2460,7 +2460,7 @@ func (p *ParticipantImpl) handleReceivedDataMessage(kind livekit.DataPacket_Kind
}
if shouldForwardData {
p.listener().OnDataPacket(p, kind, dp)
p.listener().OnDataMessage(p, kind, dp)
}
if shouldForwardMetrics {
p.listener().OnMetrics(p, dp)
@@ -2474,7 +2474,7 @@ func (p *ParticipantImpl) onReceivedDataMessageUnlabeled(data []byte) {
p.dataChannelStats.AddBytes(uint64(len(data)), false)
p.listener().OnDataMessage(p, data)
p.listener().OnDataMessageUnlabeled(p, data)
}
func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error {
+8 -7
View File
@@ -825,7 +825,7 @@ func (r *Room) OnParticipantChanged(f func(participant types.Participant)) {
}
func (r *Room) SendDataPacket(dp *livekit.DataPacket, kind livekit.DataPacket_Kind) {
r.onDataPacket(nil, kind, dp)
r.onDataMessage(nil, kind, dp)
}
func (r *Room) SetMetadata(metadata string) <-chan struct{} {
@@ -1253,7 +1253,8 @@ func (r *Room) onStateChange(p types.LocalParticipant) {
go r.RemoveParticipant(p.Identity(), p.ID(), p.CloseReason())
}
}
func (r *Room) onDataPacket(source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) {
func (r *Room) onDataMessage(source types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) {
if kind == livekit.DataPacket_RELIABLE && source != nil && dp.GetSequence() > 0 {
data, err := proto.Marshal(dp)
if err != nil {
@@ -1270,7 +1271,7 @@ func (r *Room) onDataPacket(source types.LocalParticipant, kind livekit.DataPack
BroadcastDataPacketForRoom(r, source, kind, dp, r.logger)
}
func (r *Room) onDataMessage(source types.LocalParticipant, data []byte) {
func (r *Room) onDataMessageUnlabeled(source types.LocalParticipant, data []byte) {
BroadcastDataMessageForRoom(r, source, data, r.logger)
}
@@ -1924,12 +1925,12 @@ func (l *localParticipantListener) OnSubscriberReady(p types.LocalParticipant) {
func (l *localParticipantListener) OnMigrateStateChange(_p types.LocalParticipant, _migrateState types.MigrateState) {
}
func (l *localParticipantListener) OnDataPacket(p types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) {
l.room.onDataPacket(p, kind, dp)
func (l *localParticipantListener) OnDataMessage(p types.LocalParticipant, kind livekit.DataPacket_Kind, dp *livekit.DataPacket) {
l.room.onDataMessage(p, kind, dp)
}
func (l *localParticipantListener) OnDataMessage(p types.LocalParticipant, data []byte) {
l.room.onDataMessage(p, data)
func (l *localParticipantListener) OnDataMessageUnlabeled(p types.LocalParticipant, data []byte) {
l.room.onDataMessageUnlabeled(p, data)
}
func (l *localParticipantListener) OnDataTrackMessage(_p types.LocalParticipant, _data []byte, _packet *datatrack.Packet) {
+3 -3
View File
@@ -645,7 +645,7 @@ func TestDataChannel(t *testing.T) {
}
encoded, _ := proto.Marshal(packetExp)
lpl.OnDataPacket(p, packet.Kind, packet)
lpl.OnDataMessage(p, packet.Kind, packet)
// ensure everyone has received the packet
for _, op := range participants {
@@ -695,7 +695,7 @@ func TestDataChannel(t *testing.T) {
}
encoded, _ := proto.Marshal(packetExp)
lpl.OnDataPacket(p, packet.Kind, packet)
lpl.OnDataMessage(p, packet.Kind, packet)
// only p1 should receive the data
for _, op := range participants {
@@ -729,7 +729,7 @@ func TestDataChannel(t *testing.T) {
}
if p.CanPublishData() {
lpl := rm.LocalParticipantListener()
lpl.OnDataPacket(p, packet.Kind, &packet)
lpl.OnDataMessage(p, packet.Kind, &packet)
}
// no one should've been sent packet
+4 -4
View File
@@ -581,8 +581,8 @@ type LocalParticipantListener interface {
OnStateChange(LocalParticipant)
OnSubscriberReady(LocalParticipant)
OnMigrateStateChange(LocalParticipant, MigrateState)
OnDataPacket(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
OnDataMessage(LocalParticipant, []byte)
OnDataMessage(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
OnDataMessageUnlabeled(LocalParticipant, []byte)
OnDataTrackMessage(LocalParticipant, []byte, *datatrack.Packet)
OnSubscribeStatusChanged(LocalParticipant, livekit.ParticipantID, bool)
OnUpdateSubscriptions(
@@ -607,9 +607,9 @@ type NullLocalParticipantListener struct {
func (*NullLocalParticipantListener) OnStateChange(LocalParticipant) {}
func (*NullLocalParticipantListener) OnSubscriberReady(LocalParticipant) {}
func (*NullLocalParticipantListener) OnMigrateStateChange(LocalParticipant, MigrateState) {}
func (*NullLocalParticipantListener) OnDataPacket(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) {
func (*NullLocalParticipantListener) OnDataMessage(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) {
}
func (*NullLocalParticipantListener) OnDataMessage(LocalParticipant, []byte) {}
func (*NullLocalParticipantListener) OnDataMessageUnlabeled(LocalParticipant, []byte) {}
func (*NullLocalParticipantListener) OnDataTrackMessage(LocalParticipant, []byte, *datatrack.Packet) {
}
func (*NullLocalParticipantListener) OnSubscribeStatusChanged(LocalParticipant, livekit.ParticipantID, bool) {
@@ -10,19 +10,19 @@ import (
)
type FakeLocalParticipantListener struct {
OnDataMessageStub func(types.LocalParticipant, []byte)
OnDataMessageStub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
onDataMessageMutex sync.RWMutex
onDataMessageArgsForCall []struct {
arg1 types.LocalParticipant
arg2 []byte
}
OnDataPacketStub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
onDataPacketMutex sync.RWMutex
onDataPacketArgsForCall []struct {
arg1 types.LocalParticipant
arg2 livekit.DataPacket_Kind
arg3 *livekit.DataPacket
}
OnDataMessageUnlabeledStub func(types.LocalParticipant, []byte)
onDataMessageUnlabeledMutex sync.RWMutex
onDataMessageUnlabeledArgsForCall []struct {
arg1 types.LocalParticipant
arg2 []byte
}
OnDataTrackMessageStub func(types.LocalParticipant, []byte, *datatrack.Packet)
onDataTrackMessageMutex sync.RWMutex
onDataTrackMessageArgsForCall []struct {
@@ -154,22 +154,18 @@ type FakeLocalParticipantListener struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeLocalParticipantListener) OnDataMessage(arg1 types.LocalParticipant, arg2 []byte) {
var arg2Copy []byte
if arg2 != nil {
arg2Copy = make([]byte, len(arg2))
copy(arg2Copy, arg2)
}
func (fake *FakeLocalParticipantListener) OnDataMessage(arg1 types.LocalParticipant, arg2 livekit.DataPacket_Kind, arg3 *livekit.DataPacket) {
fake.onDataMessageMutex.Lock()
fake.onDataMessageArgsForCall = append(fake.onDataMessageArgsForCall, struct {
arg1 types.LocalParticipant
arg2 []byte
}{arg1, arg2Copy})
arg2 livekit.DataPacket_Kind
arg3 *livekit.DataPacket
}{arg1, arg2, arg3})
stub := fake.OnDataMessageStub
fake.recordInvocation("OnDataMessage", []interface{}{arg1, arg2Copy})
fake.recordInvocation("OnDataMessage", []interface{}{arg1, arg2, arg3})
fake.onDataMessageMutex.Unlock()
if stub != nil {
fake.OnDataMessageStub(arg1, arg2)
fake.OnDataMessageStub(arg1, arg2, arg3)
}
}
@@ -179,51 +175,55 @@ func (fake *FakeLocalParticipantListener) OnDataMessageCallCount() int {
return len(fake.onDataMessageArgsForCall)
}
func (fake *FakeLocalParticipantListener) OnDataMessageCalls(stub func(types.LocalParticipant, []byte)) {
func (fake *FakeLocalParticipantListener) OnDataMessageCalls(stub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)) {
fake.onDataMessageMutex.Lock()
defer fake.onDataMessageMutex.Unlock()
fake.OnDataMessageStub = stub
}
func (fake *FakeLocalParticipantListener) OnDataMessageArgsForCall(i int) (types.LocalParticipant, []byte) {
func (fake *FakeLocalParticipantListener) OnDataMessageArgsForCall(i int) (types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) {
fake.onDataMessageMutex.RLock()
defer fake.onDataMessageMutex.RUnlock()
argsForCall := fake.onDataMessageArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipantListener) OnDataPacket(arg1 types.LocalParticipant, arg2 livekit.DataPacket_Kind, arg3 *livekit.DataPacket) {
fake.onDataPacketMutex.Lock()
fake.onDataPacketArgsForCall = append(fake.onDataPacketArgsForCall, struct {
func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeled(arg1 types.LocalParticipant, arg2 []byte) {
var arg2Copy []byte
if arg2 != nil {
arg2Copy = make([]byte, len(arg2))
copy(arg2Copy, arg2)
}
fake.onDataMessageUnlabeledMutex.Lock()
fake.onDataMessageUnlabeledArgsForCall = append(fake.onDataMessageUnlabeledArgsForCall, struct {
arg1 types.LocalParticipant
arg2 livekit.DataPacket_Kind
arg3 *livekit.DataPacket
}{arg1, arg2, arg3})
stub := fake.OnDataPacketStub
fake.recordInvocation("OnDataPacket", []interface{}{arg1, arg2, arg3})
fake.onDataPacketMutex.Unlock()
arg2 []byte
}{arg1, arg2Copy})
stub := fake.OnDataMessageUnlabeledStub
fake.recordInvocation("OnDataMessageUnlabeled", []interface{}{arg1, arg2Copy})
fake.onDataMessageUnlabeledMutex.Unlock()
if stub != nil {
fake.OnDataPacketStub(arg1, arg2, arg3)
fake.OnDataMessageUnlabeledStub(arg1, arg2)
}
}
func (fake *FakeLocalParticipantListener) OnDataPacketCallCount() int {
fake.onDataPacketMutex.RLock()
defer fake.onDataPacketMutex.RUnlock()
return len(fake.onDataPacketArgsForCall)
func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledCallCount() int {
fake.onDataMessageUnlabeledMutex.RLock()
defer fake.onDataMessageUnlabeledMutex.RUnlock()
return len(fake.onDataMessageUnlabeledArgsForCall)
}
func (fake *FakeLocalParticipantListener) OnDataPacketCalls(stub func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)) {
fake.onDataPacketMutex.Lock()
defer fake.onDataPacketMutex.Unlock()
fake.OnDataPacketStub = stub
func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledCalls(stub func(types.LocalParticipant, []byte)) {
fake.onDataMessageUnlabeledMutex.Lock()
defer fake.onDataMessageUnlabeledMutex.Unlock()
fake.OnDataMessageUnlabeledStub = stub
}
func (fake *FakeLocalParticipantListener) OnDataPacketArgsForCall(i int) (types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) {
fake.onDataPacketMutex.RLock()
defer fake.onDataPacketMutex.RUnlock()
argsForCall := fake.onDataPacketArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledArgsForCall(i int) (types.LocalParticipant, []byte) {
fake.onDataMessageUnlabeledMutex.RLock()
defer fake.onDataMessageUnlabeledMutex.RUnlock()
argsForCall := fake.onDataMessageUnlabeledArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipantListener) OnDataTrackMessage(arg1 types.LocalParticipant, arg2 []byte, arg3 *datatrack.Packet) {
+7 -7
View File
@@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
v, err := rpc.NewTypedRoomClient(clientParams)
roomClient, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
v2, err := rpc.NewTypedParticipantClient(clientParams)
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
if err != nil {
return nil, err
}