Add the ability to send data to specific participants (#34)

* UserPacket destination filter

* Updated protocol to v0.5.2

* Added DataChannel tests
This commit is contained in:
Théo Monnom
2021-06-09 21:14:14 +02:00
committed by GitHub
parent bbf7f83784
commit 321ebab99b
4 changed files with 78 additions and 1 deletions

2
go.mod
View File

@@ -10,7 +10,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.5.1
github.com/livekit/protocol v0.5.2
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

2
go.sum
View File

@@ -232,6 +232,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.5.1 h1:RSX0sQM2+NK2kCHTg/n6wPf0d3qYskc611GYV3h+5+8=
github.com/livekit/protocol v0.5.1/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/livekit/protocol v0.5.2 h1:jYj+50gKBrtJ1q6iEVdHDruHVllA1xtOhiM1/YKqgMk=
github.com/livekit/protocol v0.5.2/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@@ -391,6 +391,8 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) {
}
func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) {
dest := dp.GetUser().GetDestinationSids()
for _, op := range r.GetParticipants() {
if op.State() != livekit.ParticipantInfo_ACTIVE {
continue
@@ -398,6 +400,18 @@ func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) {
if op.ID() == source.ID() {
continue
}
if len(dest) > 0 {
found := false
for _, dSid := range dest {
if op.ID() == dSid {
found = true
break
}
}
if !found {
continue
}
}
_ = op.SendDataPacket(dp)
}
}

View File

@@ -335,6 +335,67 @@ func TestActiveSpeakers(t *testing.T) {
})
}
func TestDataChannel(t *testing.T) {
t.Parallel()
t.Run("participants should receive data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 3})
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
packet := livekit.DataPacket{
Kind: livekit.DataPacket_RELIABLE,
Value: &livekit.DataPacket_User{
User: &livekit.UserPacket{
ParticipantSid: p.ID(),
Payload: []byte("message.."),
},
},
}
p.OnDataPacketArgsForCall(0)(p, &packet)
// ensure everyone has received the packet
for _, op := range participants {
fp := op.(*typesfakes.FakeParticipant)
if fp == p {
require.Zero(t, fp.SendDataPacketCallCount())
continue
}
require.Equal(t, 1, fp.SendDataPacketCallCount())
require.Equal(t, packet.Value, fp.SendDataPacketArgsForCall(0).Value)
}
})
t.Run("only one participant should receive the data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 4})
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p1 := participants[1].(*typesfakes.FakeParticipant)
packet := livekit.DataPacket{
Kind: livekit.DataPacket_RELIABLE,
Value: &livekit.DataPacket_User{
User: &livekit.UserPacket{
ParticipantSid: p.ID(),
Payload: []byte("message to p1.."),
DestinationSids: []string{p1.ID()},
},
},
}
p.OnDataPacketArgsForCall(0)(p, &packet)
// only p1 should receive the data
for _, op := range participants {
fp := op.(*typesfakes.FakeParticipant)
if fp != p1 {
require.Zero(t, fp.SendDataPacketCallCount())
}
}
require.Equal(t, 1, p1.SendDataPacketCallCount())
require.Equal(t, packet.Value, p1.SendDataPacketArgsForCall(0).Value)
})
}
type testRoomOpts struct {
num int
protocol types.ProtocolVersion