don't send datatrack info to client (#492)

* don't send datatrack info to client
This commit is contained in:
cnderrauber
2022-03-08 17:05:26 +08:00
committed by GitHub
parent d57f76fbef
commit 128199e634
11 changed files with 47 additions and 28 deletions
+3 -3
View File
@@ -299,7 +299,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
}
}
func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
func (p *ParticipantImpl) ToProto(mediaTrackOnly bool) *livekit.ParticipantInfo {
info := &livekit.ParticipantInfo{
Sid: string(p.params.SID),
Identity: string(p.params.Identity),
@@ -315,7 +315,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
info.Metadata = p.params.Grants.Metadata
}
if p.dataTrack != nil {
if !mediaTrackOnly && p.dataTrack != nil {
info.Tracks = append(info.Tracks, p.dataTrack.ToProto())
}
@@ -607,7 +607,7 @@ func (p *ParticipantImpl) SendJoinResponse(
Message: &livekit.SignalResponse_Join{
Join: &livekit.JoinResponse{
Room: roomInfo,
Participant: p.ToProto(),
Participant: p.ToProto(true),
OtherParticipants: otherParticipants,
ServerVersion: version.Version,
ServerRegion: region,
+3 -3
View File
@@ -170,9 +170,9 @@ func TestOutOfOrderUpdates(t *testing.T) {
p := newParticipantForTest("test")
p.SetMetadata("initial metadata")
sink := p.GetResponseSink().(*routingfakes.FakeMessageSink)
pi1 := p.ToProto()
pi1 := p.ToProto(true)
p.SetMetadata("second update")
pi2 := p.ToProto()
pi2 := p.ToProto(true)
require.Greater(t, pi2.Version, pi1.Version)
@@ -208,7 +208,7 @@ func TestDisconnectTiming(t *testing.T) {
func TestCorrectJoinedAt(t *testing.T) {
p := newParticipantForTest("test")
info := p.ToProto()
info := p.ToProto(true)
require.NotZero(t, info.JoinedAt)
require.True(t, time.Now().Unix()-info.JoinedAt <= 1)
}
+1 -1
View File
@@ -244,7 +244,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants))
for _, p := range r.participants {
if p.ID() != participant.ID() && !p.Hidden() {
otherParticipants = append(otherParticipants, p.ToProto())
otherParticipants = append(otherParticipants, p.ToProto(true))
}
}
+1 -1
View File
@@ -53,7 +53,7 @@ type Participant interface {
ID() livekit.ParticipantID
Identity() livekit.ParticipantIdentity
ToProto() *livekit.ParticipantInfo
ToProto(mediaTrackOnly bool) *livekit.ParticipantInfo
SetMetadata(metadata string)
@@ -548,9 +548,10 @@ type FakeLocalParticipant struct {
arg2 livekit.TrackID
arg3 bool
}
ToProtoStub func() *livekit.ParticipantInfo
ToProtoStub func(bool) *livekit.ParticipantInfo
toProtoMutex sync.RWMutex
toProtoArgsForCall []struct {
arg1 bool
}
toProtoReturns struct {
result1 *livekit.ParticipantInfo
@@ -3559,17 +3560,18 @@ func (fake *FakeLocalParticipant) SubscriptionPermissionUpdateArgsForCall(i int)
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) ToProto() *livekit.ParticipantInfo {
func (fake *FakeLocalParticipant) ToProto(arg1 bool) *livekit.ParticipantInfo {
fake.toProtoMutex.Lock()
ret, specificReturn := fake.toProtoReturnsOnCall[len(fake.toProtoArgsForCall)]
fake.toProtoArgsForCall = append(fake.toProtoArgsForCall, struct {
}{})
arg1 bool
}{arg1})
stub := fake.ToProtoStub
fakeReturns := fake.toProtoReturns
fake.recordInvocation("ToProto", []interface{}{})
fake.recordInvocation("ToProto", []interface{}{arg1})
fake.toProtoMutex.Unlock()
if stub != nil {
return stub()
return stub(arg1)
}
if specificReturn {
return ret.result1
@@ -3583,12 +3585,19 @@ func (fake *FakeLocalParticipant) ToProtoCallCount() int {
return len(fake.toProtoArgsForCall)
}
func (fake *FakeLocalParticipant) ToProtoCalls(stub func() *livekit.ParticipantInfo) {
func (fake *FakeLocalParticipant) ToProtoCalls(stub func(bool) *livekit.ParticipantInfo) {
fake.toProtoMutex.Lock()
defer fake.toProtoMutex.Unlock()
fake.ToProtoStub = stub
}
func (fake *FakeLocalParticipant) ToProtoArgsForCall(i int) bool {
fake.toProtoMutex.RLock()
defer fake.toProtoMutex.RUnlock()
argsForCall := fake.toProtoArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) ToProtoReturns(result1 *livekit.ParticipantInfo) {
fake.toProtoMutex.Lock()
defer fake.toProtoMutex.Unlock()
+15 -6
View File
@@ -141,9 +141,10 @@ type FakeParticipant struct {
subscriptionPermissionReturnsOnCall map[int]struct {
result1 *livekit.SubscriptionPermission
}
ToProtoStub func() *livekit.ParticipantInfo
ToProtoStub func(bool) *livekit.ParticipantInfo
toProtoMutex sync.RWMutex
toProtoArgsForCall []struct {
arg1 bool
}
toProtoReturns struct {
result1 *livekit.ParticipantInfo
@@ -905,17 +906,18 @@ func (fake *FakeParticipant) SubscriptionPermissionReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeParticipant) ToProto() *livekit.ParticipantInfo {
func (fake *FakeParticipant) ToProto(arg1 bool) *livekit.ParticipantInfo {
fake.toProtoMutex.Lock()
ret, specificReturn := fake.toProtoReturnsOnCall[len(fake.toProtoArgsForCall)]
fake.toProtoArgsForCall = append(fake.toProtoArgsForCall, struct {
}{})
arg1 bool
}{arg1})
stub := fake.ToProtoStub
fakeReturns := fake.toProtoReturns
fake.recordInvocation("ToProto", []interface{}{})
fake.recordInvocation("ToProto", []interface{}{arg1})
fake.toProtoMutex.Unlock()
if stub != nil {
return stub()
return stub(arg1)
}
if specificReturn {
return ret.result1
@@ -929,12 +931,19 @@ func (fake *FakeParticipant) ToProtoCallCount() int {
return len(fake.toProtoArgsForCall)
}
func (fake *FakeParticipant) ToProtoCalls(stub func() *livekit.ParticipantInfo) {
func (fake *FakeParticipant) ToProtoCalls(stub func(bool) *livekit.ParticipantInfo) {
fake.toProtoMutex.Lock()
defer fake.toProtoMutex.Unlock()
fake.ToProtoStub = stub
}
func (fake *FakeParticipant) ToProtoArgsForCall(i int) bool {
fake.toProtoMutex.RLock()
defer fake.toProtoMutex.RUnlock()
argsForCall := fake.toProtoArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeParticipant) ToProtoReturns(result1 *livekit.ParticipantInfo) {
fake.toProtoMutex.Lock()
defer fake.toProtoMutex.Unlock()
+1 -1
View File
@@ -48,7 +48,7 @@ func UnpackDataTrackLabel(packed string) (peerID livekit.ParticipantID, trackID
func ToProtoParticipants(participants []types.LocalParticipant) []*livekit.ParticipantInfo {
infos := make([]*livekit.ParticipantInfo, 0, len(participants))
for _, op := range participants {
infos = append(infos, op.ToProto())
infos = append(infos, op.ToProto(true))
}
return infos
}
+4 -4
View File
@@ -270,7 +270,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam
_ = participant.Close(true)
return
}
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil {
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto(true)); err != nil {
pLogger.Errorw("could not store participant", err)
}
@@ -287,7 +287,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam
updateParticipantCount()
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id}
r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(), pi.Client, clientMeta)
r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(true), pi.Client, clientMeta)
participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
pLogger.Errorw("could not delete participant", err)
@@ -295,7 +295,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam
// update room store with new numParticipants
updateParticipantCount()
r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto())
r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto(true))
room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions)
})
@@ -359,7 +359,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
newRoom.OnParticipantChanged(func(p types.LocalParticipant) {
if p.State() != livekit.ParticipantInfo_DISCONNECTED {
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil {
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto(true)); err != nil {
logger.Errorw("could not handle participant change", err)
}
}
+1
View File
@@ -558,6 +558,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
// Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end.
// Otherwise, with transceiver re-use last frame from previous stream is held in the
// display buffer and there could be a brief moment where the previous stream is displayed.
d.logger.Infow("close downtrack", "peerID", d.peerID, "trackID", d.id, "flushBlankFrame", flush)
if flush {
_ = d.writeBlankFrameRTP()
}
+2 -2
View File
@@ -140,10 +140,10 @@ func TestMultiNodeMutePublishedTrack(t *testing.T) {
Identity: identity,
})
require.NoError(t, err)
if len(res.Tracks) == 3 {
if len(res.Tracks) == 2 {
return ""
} else {
return fmt.Sprintf("expected three tracks to be published, actual: %d", len(res.Tracks))
return fmt.Sprintf("expected 2 tracks to be published, actual: %d", len(res.Tracks))
}
})
+1 -1
View File
@@ -420,7 +420,7 @@ func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) {
if pubRemote == nil {
return "could not find remote publisher"
}
if len(pubRemote.Tracks) != 3 {
if len(pubRemote.Tracks) != 2 {
return "did not receive metadata for published tracks"
}
return ""