From 321ebab99b3a50baf72b40d51ae5a4fcf590e116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Wed, 9 Jun 2021 21:14:14 +0200 Subject: [PATCH] Add the ability to send data to specific participants (#34) * UserPacket destination filter * Updated protocol to v0.5.2 * Added DataChannel tests --- go.mod | 2 +- go.sum | 2 ++ pkg/rtc/room.go | 14 ++++++++++ pkg/rtc/room_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 9e8f26642..388f747c4 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.5.1 + github.com/livekit/protocol v0.5.2 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index a64351685..c560b6e39 100644 --- a/go.sum +++ b/go.sum @@ -232,6 +232,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/protocol v0.5.1 h1:RSX0sQM2+NK2kCHTg/n6wPf0d3qYskc611GYV3h+5+8= github.com/livekit/protocol v0.5.1/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= +github.com/livekit/protocol v0.5.2 h1:jYj+50gKBrtJ1q6iEVdHDruHVllA1xtOhiM1/YKqgMk= +github.com/livekit/protocol v0.5.2/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 44d8e25bd..214da4aaa 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -391,6 +391,8 @@ func (r *Room) onParticipantMetadataUpdate(p types.Participant) { } func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) { + dest := dp.GetUser().GetDestinationSids() + for _, op := range r.GetParticipants() { if op.State() != livekit.ParticipantInfo_ACTIVE { continue @@ -398,6 +400,18 @@ func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) { if op.ID() == source.ID() { continue } + if len(dest) > 0 { + found := false + for _, dSid := range dest { + if op.ID() == dSid { + found = true + break + } + } + if !found { + continue + } + } _ = op.SendDataPacket(dp) } } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 5d14ba5ba..fac0214f1 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -335,6 +335,67 @@ func TestActiveSpeakers(t *testing.T) { }) } +func TestDataChannel(t *testing.T) { + t.Parallel() + + t.Run("participants should receive data", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 3}) + participants := rm.GetParticipants() + p := participants[0].(*typesfakes.FakeParticipant) + + packet := livekit.DataPacket{ + Kind: livekit.DataPacket_RELIABLE, + Value: &livekit.DataPacket_User{ + User: &livekit.UserPacket{ + ParticipantSid: p.ID(), + Payload: []byte("message.."), + }, + }, + } + p.OnDataPacketArgsForCall(0)(p, &packet) + + // ensure everyone has received the packet + for _, op := range participants { + fp := op.(*typesfakes.FakeParticipant) + if fp == p { + require.Zero(t, fp.SendDataPacketCallCount()) + continue + } + require.Equal(t, 1, fp.SendDataPacketCallCount()) + require.Equal(t, packet.Value, fp.SendDataPacketArgsForCall(0).Value) + } + }) + + t.Run("only one participant should receive the data", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 4}) + participants := rm.GetParticipants() + p := participants[0].(*typesfakes.FakeParticipant) + p1 := participants[1].(*typesfakes.FakeParticipant) + + packet := livekit.DataPacket{ + Kind: livekit.DataPacket_RELIABLE, + Value: &livekit.DataPacket_User{ + User: &livekit.UserPacket{ + ParticipantSid: p.ID(), + Payload: []byte("message to p1.."), + DestinationSids: []string{p1.ID()}, + }, + }, + } + p.OnDataPacketArgsForCall(0)(p, &packet) + + // only p1 should receive the data + for _, op := range participants { + fp := op.(*typesfakes.FakeParticipant) + if fp != p1 { + require.Zero(t, fp.SendDataPacketCallCount()) + } + } + require.Equal(t, 1, p1.SendDataPacketCallCount()) + require.Equal(t, packet.Value, p1.SendDataPacketArgsForCall(0).Value) + }) +} + type testRoomOpts struct { num int protocol types.ProtocolVersion