diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index c0b6032e9..8fe04d027 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -299,7 +299,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio } } -func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { +func (p *ParticipantImpl) ToProto(mediaTrackOnly bool) *livekit.ParticipantInfo { info := &livekit.ParticipantInfo{ Sid: string(p.params.SID), Identity: string(p.params.Identity), @@ -315,7 +315,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { info.Metadata = p.params.Grants.Metadata } - if p.dataTrack != nil { + if !mediaTrackOnly && p.dataTrack != nil { info.Tracks = append(info.Tracks, p.dataTrack.ToProto()) } @@ -607,7 +607,7 @@ func (p *ParticipantImpl) SendJoinResponse( Message: &livekit.SignalResponse_Join{ Join: &livekit.JoinResponse{ Room: roomInfo, - Participant: p.ToProto(), + Participant: p.ToProto(true), OtherParticipants: otherParticipants, ServerVersion: version.Version, ServerRegion: region, diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index c92f83833..9d60e87a7 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -170,9 +170,9 @@ func TestOutOfOrderUpdates(t *testing.T) { p := newParticipantForTest("test") p.SetMetadata("initial metadata") sink := p.GetResponseSink().(*routingfakes.FakeMessageSink) - pi1 := p.ToProto() + pi1 := p.ToProto(true) p.SetMetadata("second update") - pi2 := p.ToProto() + pi2 := p.ToProto(true) require.Greater(t, pi2.Version, pi1.Version) @@ -208,7 +208,7 @@ func TestDisconnectTiming(t *testing.T) { func TestCorrectJoinedAt(t *testing.T) { p := newParticipantForTest("test") - info := p.ToProto() + info := p.ToProto(true) require.NotZero(t, info.JoinedAt) require.True(t, time.Now().Unix()-info.JoinedAt <= 1) } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index b14a089af..bfff00f0e 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -244,7 +244,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants)) for _, p := range r.participants { if p.ID() != participant.ID() && !p.Hidden() { - otherParticipants = append(otherParticipants, p.ToProto()) + otherParticipants = append(otherParticipants, p.ToProto(true)) } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 134fab23c..f953f8b64 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -53,7 +53,7 @@ type Participant interface { ID() livekit.ParticipantID Identity() livekit.ParticipantIdentity - ToProto() *livekit.ParticipantInfo + ToProto(mediaTrackOnly bool) *livekit.ParticipantInfo SetMetadata(metadata string) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 1fc661037..1d468cbb9 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -548,9 +548,10 @@ type FakeLocalParticipant struct { arg2 livekit.TrackID arg3 bool } - ToProtoStub func() *livekit.ParticipantInfo + ToProtoStub func(bool) *livekit.ParticipantInfo toProtoMutex sync.RWMutex toProtoArgsForCall []struct { + arg1 bool } toProtoReturns struct { result1 *livekit.ParticipantInfo @@ -3559,17 +3560,18 @@ func (fake *FakeLocalParticipant) SubscriptionPermissionUpdateArgsForCall(i int) return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeLocalParticipant) ToProto() *livekit.ParticipantInfo { +func (fake *FakeLocalParticipant) ToProto(arg1 bool) *livekit.ParticipantInfo { fake.toProtoMutex.Lock() ret, specificReturn := fake.toProtoReturnsOnCall[len(fake.toProtoArgsForCall)] fake.toProtoArgsForCall = append(fake.toProtoArgsForCall, struct { - }{}) + arg1 bool + }{arg1}) stub := fake.ToProtoStub fakeReturns := fake.toProtoReturns - fake.recordInvocation("ToProto", []interface{}{}) + fake.recordInvocation("ToProto", []interface{}{arg1}) fake.toProtoMutex.Unlock() if stub != nil { - return stub() + return stub(arg1) } if specificReturn { return ret.result1 @@ -3583,12 +3585,19 @@ func (fake *FakeLocalParticipant) ToProtoCallCount() int { return len(fake.toProtoArgsForCall) } -func (fake *FakeLocalParticipant) ToProtoCalls(stub func() *livekit.ParticipantInfo) { +func (fake *FakeLocalParticipant) ToProtoCalls(stub func(bool) *livekit.ParticipantInfo) { fake.toProtoMutex.Lock() defer fake.toProtoMutex.Unlock() fake.ToProtoStub = stub } +func (fake *FakeLocalParticipant) ToProtoArgsForCall(i int) bool { + fake.toProtoMutex.RLock() + defer fake.toProtoMutex.RUnlock() + argsForCall := fake.toProtoArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) ToProtoReturns(result1 *livekit.ParticipantInfo) { fake.toProtoMutex.Lock() defer fake.toProtoMutex.Unlock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index ebb4033cc..4cd71f9ae 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -141,9 +141,10 @@ type FakeParticipant struct { subscriptionPermissionReturnsOnCall map[int]struct { result1 *livekit.SubscriptionPermission } - ToProtoStub func() *livekit.ParticipantInfo + ToProtoStub func(bool) *livekit.ParticipantInfo toProtoMutex sync.RWMutex toProtoArgsForCall []struct { + arg1 bool } toProtoReturns struct { result1 *livekit.ParticipantInfo @@ -905,17 +906,18 @@ func (fake *FakeParticipant) SubscriptionPermissionReturnsOnCall(i int, result1 }{result1} } -func (fake *FakeParticipant) ToProto() *livekit.ParticipantInfo { +func (fake *FakeParticipant) ToProto(arg1 bool) *livekit.ParticipantInfo { fake.toProtoMutex.Lock() ret, specificReturn := fake.toProtoReturnsOnCall[len(fake.toProtoArgsForCall)] fake.toProtoArgsForCall = append(fake.toProtoArgsForCall, struct { - }{}) + arg1 bool + }{arg1}) stub := fake.ToProtoStub fakeReturns := fake.toProtoReturns - fake.recordInvocation("ToProto", []interface{}{}) + fake.recordInvocation("ToProto", []interface{}{arg1}) fake.toProtoMutex.Unlock() if stub != nil { - return stub() + return stub(arg1) } if specificReturn { return ret.result1 @@ -929,12 +931,19 @@ func (fake *FakeParticipant) ToProtoCallCount() int { return len(fake.toProtoArgsForCall) } -func (fake *FakeParticipant) ToProtoCalls(stub func() *livekit.ParticipantInfo) { +func (fake *FakeParticipant) ToProtoCalls(stub func(bool) *livekit.ParticipantInfo) { fake.toProtoMutex.Lock() defer fake.toProtoMutex.Unlock() fake.ToProtoStub = stub } +func (fake *FakeParticipant) ToProtoArgsForCall(i int) bool { + fake.toProtoMutex.RLock() + defer fake.toProtoMutex.RUnlock() + argsForCall := fake.toProtoArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeParticipant) ToProtoReturns(result1 *livekit.ParticipantInfo) { fake.toProtoMutex.Lock() defer fake.toProtoMutex.Unlock() diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index 628e0c076..9616dfb04 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -48,7 +48,7 @@ func UnpackDataTrackLabel(packed string) (peerID livekit.ParticipantID, trackID func ToProtoParticipants(participants []types.LocalParticipant) []*livekit.ParticipantInfo { infos := make([]*livekit.ParticipantInfo, 0, len(participants)) for _, op := range participants { - infos = append(infos, op.ToProto()) + infos = append(infos, op.ToProto(true)) } return infos } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 170fc35fc..7edb044e1 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -270,7 +270,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam _ = participant.Close(true) return } - if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { + if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto(true)); err != nil { pLogger.Errorw("could not store participant", err) } @@ -287,7 +287,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam updateParticipantCount() clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} - r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(), pi.Client, clientMeta) + r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(true), pi.Client, clientMeta) participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { pLogger.Errorw("could not delete participant", err) @@ -295,7 +295,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam // update room store with new numParticipants updateParticipantCount() - r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto()) + r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto(true)) room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions) }) @@ -359,7 +359,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom.OnParticipantChanged(func(p types.LocalParticipant) { if p.State() != livekit.ParticipantInfo_DISCONNECTED { - if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil { + if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto(true)); err != nil { logger.Errorw("could not handle participant change", err) } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c3746e2d7..7f4ac49a5 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -558,6 +558,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { // Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end. // Otherwise, with transceiver re-use last frame from previous stream is held in the // display buffer and there could be a brief moment where the previous stream is displayed. + d.logger.Infow("close downtrack", "peerID", d.peerID, "trackID", d.id, "flushBlankFrame", flush) if flush { _ = d.writeBlankFrameRTP() } diff --git a/test/multinode_roomservice_test.go b/test/multinode_roomservice_test.go index 2c5dd604d..a631a1f9e 100644 --- a/test/multinode_roomservice_test.go +++ b/test/multinode_roomservice_test.go @@ -140,10 +140,10 @@ func TestMultiNodeMutePublishedTrack(t *testing.T) { Identity: identity, }) require.NoError(t, err) - if len(res.Tracks) == 3 { + if len(res.Tracks) == 2 { return "" } else { - return fmt.Sprintf("expected three tracks to be published, actual: %d", len(res.Tracks)) + return fmt.Sprintf("expected 2 tracks to be published, actual: %d", len(res.Tracks)) } }) diff --git a/test/singlenode_test.go b/test/singlenode_test.go index ebcbf6808..17dae069d 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -420,7 +420,7 @@ func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) { if pubRemote == nil { return "could not find remote publisher" } - if len(pubRemote.Tracks) != 3 { + if len(pubRemote.Tracks) != 2 { return "did not receive metadata for published tracks" } return ""