diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 3789afb29..5d53f7b0c 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -3121,12 +3121,12 @@ func (p *ParticipantImpl) SendDataMessage(kind livekit.DataPacket_Kind, data []b return p.TransportManager.SendDataMessage(kind, data) } -func (p *ParticipantImpl) SendDataMessageUnlabeled(data []byte, useRaw bool) error { +func (p *ParticipantImpl) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error { if p.State() != livekit.ParticipantInfo_ACTIVE { return ErrDataChannelUnavailable } - return p.TransportManager.SendDataMessageUnlabeled(data, useRaw) + return p.TransportManager.SendDataMessageUnlabeled(data, useRaw, sender) } func (p *ParticipantImpl) onDataSendError(err error) { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index ebf093ec9..ebcf1d6c6 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1842,7 +1842,7 @@ func BroadcastDataMessageForRoom(r types.Room, source types.LocalParticipant, da return } - op.SendDataMessageUnlabeled(data, false) + op.SendDataMessageUnlabeled(data, false, source.Identity()) }) } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 6f1e502e6..db877c5ac 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1181,7 +1181,7 @@ func (t *PCTransport) SendDataMessage(kind livekit.DataPacket_Kind, data []byte) return t.sendDataMessage(dc, data) } -func (t *PCTransport) SendDataMessageUnlabeled(data []byte, useRaw bool) error { +func (t *PCTransport) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error { convertToUserPacket := false var dc *datachannel.DataChannelWriter[*webrtc.DataChannel] t.lock.RLock() @@ -1203,6 +1203,7 @@ func (t *PCTransport) SendDataMessageUnlabeled(data []byte, useRaw bool) error { if convertToUserPacket { dpData, err := proto.Marshal(&livekit.DataPacket{ + ParticipantIdentity: string(sender), Value: &livekit.DataPacket_User{ User: &livekit.UserPacket{Payload: data}, }, diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 6475666ff..719dda382 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -302,10 +302,10 @@ func (t *TransportManager) SendDataMessage(kind livekit.DataPacket_Kind, data [] return t.handleSendDataResult(t.getTransport(true).SendDataMessage(kind, data), kind.String(), len(data)) } -func (t *TransportManager) SendDataMessageUnlabeled(data []byte, useRaw bool) error { +func (t *TransportManager) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error { // downstream data is sent via primary peer connection return t.handleSendDataResult( - t.getTransport(true).SendDataMessageUnlabeled(data, useRaw), + t.getTransport(true).SendDataMessageUnlabeled(data, useRaw, sender), "unlabeled", len(data), ) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 31a10f9dd..4741bcf30 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -407,7 +407,7 @@ type LocalParticipant interface { SendParticipantUpdate(participants []*livekit.ParticipantInfo) error SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, force bool) error SendDataMessage(kind livekit.DataPacket_Kind, data []byte) error - SendDataMessageUnlabeled(data []byte, useRaw bool) error + SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error SendRoomUpdate(room *livekit.Room) error SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index bb6eaa50b..49232f2e0 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -843,11 +843,12 @@ type FakeLocalParticipant struct { sendDataMessageReturnsOnCall map[int]struct { result1 error } - SendDataMessageUnlabeledStub func([]byte, bool) error + SendDataMessageUnlabeledStub func([]byte, bool, livekit.ParticipantIdentity) error sendDataMessageUnlabeledMutex sync.RWMutex sendDataMessageUnlabeledArgsForCall []struct { arg1 []byte arg2 bool + arg3 livekit.ParticipantIdentity } sendDataMessageUnlabeledReturns struct { result1 error @@ -5666,7 +5667,7 @@ func (fake *FakeLocalParticipant) SendDataMessageReturnsOnCall(i int, result1 er }{result1} } -func (fake *FakeLocalParticipant) SendDataMessageUnlabeled(arg1 []byte, arg2 bool) error { +func (fake *FakeLocalParticipant) SendDataMessageUnlabeled(arg1 []byte, arg2 bool, arg3 livekit.ParticipantIdentity) error { var arg1Copy []byte if arg1 != nil { arg1Copy = make([]byte, len(arg1)) @@ -5677,13 +5678,14 @@ func (fake *FakeLocalParticipant) SendDataMessageUnlabeled(arg1 []byte, arg2 boo fake.sendDataMessageUnlabeledArgsForCall = append(fake.sendDataMessageUnlabeledArgsForCall, struct { arg1 []byte arg2 bool - }{arg1Copy, arg2}) + arg3 livekit.ParticipantIdentity + }{arg1Copy, arg2, arg3}) stub := fake.SendDataMessageUnlabeledStub fakeReturns := fake.sendDataMessageUnlabeledReturns - fake.recordInvocation("SendDataMessageUnlabeled", []interface{}{arg1Copy, arg2}) + fake.recordInvocation("SendDataMessageUnlabeled", []interface{}{arg1Copy, arg2, arg3}) fake.sendDataMessageUnlabeledMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -5697,17 +5699,17 @@ func (fake *FakeLocalParticipant) SendDataMessageUnlabeledCallCount() int { return len(fake.sendDataMessageUnlabeledArgsForCall) } -func (fake *FakeLocalParticipant) SendDataMessageUnlabeledCalls(stub func([]byte, bool) error) { +func (fake *FakeLocalParticipant) SendDataMessageUnlabeledCalls(stub func([]byte, bool, livekit.ParticipantIdentity) error) { fake.sendDataMessageUnlabeledMutex.Lock() defer fake.sendDataMessageUnlabeledMutex.Unlock() fake.SendDataMessageUnlabeledStub = stub } -func (fake *FakeLocalParticipant) SendDataMessageUnlabeledArgsForCall(i int) ([]byte, bool) { +func (fake *FakeLocalParticipant) SendDataMessageUnlabeledArgsForCall(i int) ([]byte, bool, livekit.ParticipantIdentity) { fake.sendDataMessageUnlabeledMutex.RLock() defer fake.sendDataMessageUnlabeledMutex.RUnlock() argsForCall := fake.sendDataMessageUnlabeledArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeLocalParticipant) SendDataMessageUnlabeledReturns(result1 error) { diff --git a/test/client/client.go b/test/client/client.go index 3e51ae4d2..08c03e20b 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -754,7 +754,7 @@ func (c *RTCClient) PublishDataUnlabeled(data []byte) error { return err } - return c.publisher.SendDataMessageUnlabeled(data, true) + return c.publisher.SendDataMessageUnlabeled(data, true, "test") } func (c *RTCClient) GetPublishedTrackIDs() []string {