From bcf9fe3f0f31a68a76517d63e560a9dbcc174f15 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 28 Jan 2024 22:10:35 +0530 Subject: [PATCH] Use a participant worker queue in room. (#2420) * Use a participant worker queue in room. Removes selectively needing to call things in goroutine from participant. Also, a bit of drive-by clean up. * spelling * prevent race * don't need to remove in goroutine as it is already running in the worker * worker will get cleaned up in state change callback * create participant worker only if not created already * ref count participant worker * maintain participant list * clean up oldState --- pkg/rtc/mediatracksubscriptions.go | 2 +- pkg/rtc/participant.go | 120 ++++------ pkg/rtc/room.go | 226 ++++++++++++------ pkg/rtc/room_test.go | 14 +- pkg/rtc/subscriptionmanager.go | 2 +- pkg/rtc/testutils.go | 4 + pkg/rtc/transport.go | 2 - pkg/rtc/transportmanager.go | 6 +- pkg/rtc/types/interfaces.go | 6 +- .../typesfakes/fake_local_participant.go | 124 ++++------ pkg/rtc/types/typesfakes/fake_participant.go | 30 --- pkg/rtc/uptrackmanager.go | 3 - 12 files changed, 265 insertions(+), 274 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 0a619bb99..e074c2577 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -196,7 +196,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * }) downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { - sub.OnReceiverReport(dt, report) + sub.HandleReceiverReport(dt, report) }) var transceiver *webrtc.RTPTransceiver diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ff6758d80..0aa98e198 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -191,7 +191,6 @@ type ParticipantImpl struct { lastRTT uint32 lock utils.RWMutex - once sync.Once dirty atomic.Bool version atomic.Uint32 @@ -201,7 +200,7 @@ type ParticipantImpl struct { onTrackPublished func(types.LocalParticipant, types.MediaTrack) onTrackUpdated func(types.LocalParticipant, types.MediaTrack) onTrackUnpublished func(types.LocalParticipant, types.MediaTrack) - onStateChange func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) + onStateChange func(p types.LocalParticipant, state livekit.ParticipantInfo_State) onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState) onParticipantUpdate func(types.LocalParticipant) onDataPacket func(types.LocalParticipant, *livekit.DataPacket) @@ -525,18 +524,36 @@ func (p *ParticipantImpl) OnTrackPublished(callback func(types.LocalParticipant, p.lock.Unlock() } +func (p *ParticipantImpl) getOnTrackPublished() func(types.LocalParticipant, types.MediaTrack) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onTrackPublished +} + func (p *ParticipantImpl) OnTrackUnpublished(callback func(types.LocalParticipant, types.MediaTrack)) { p.lock.Lock() p.onTrackUnpublished = callback p.lock.Unlock() } -func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) { +func (p *ParticipantImpl) getOnTrackUnpublished() func(types.LocalParticipant, types.MediaTrack) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onTrackUnpublished +} + +func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, state livekit.ParticipantInfo_State)) { p.lock.Lock() p.onStateChange = callback p.lock.Unlock() } +func (p *ParticipantImpl) getOnStateChange() func(p types.LocalParticipant, state livekit.ParticipantInfo_State) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onStateChange +} + func (p *ParticipantImpl) OnMigrateStateChange(callback func(p types.LocalParticipant, state types.MigrateState)) { p.lock.Lock() p.onMigrateStateChange = callback @@ -546,7 +563,6 @@ func (p *ParticipantImpl) OnMigrateStateChange(callback func(p types.LocalPartic func (p *ParticipantImpl) getOnMigrateStateChange() func(p types.LocalParticipant, state types.MigrateState) { p.lock.RLock() defer p.lock.RUnlock() - return p.onMigrateStateChange } @@ -556,6 +572,12 @@ func (p *ParticipantImpl) OnTrackUpdated(callback func(types.LocalParticipant, t p.lock.Unlock() } +func (p *ParticipantImpl) getOnTrackUpdated() func(types.LocalParticipant, types.MediaTrack) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onTrackUpdated +} + func (p *ParticipantImpl) OnParticipantUpdate(callback func(types.LocalParticipant)) { p.lock.Lock() p.onParticipantUpdate = callback @@ -667,13 +689,9 @@ func (p *ParticipantImpl) handleMigrateTracks() { } p.pendingTracksLock.Unlock() - // launch callbacks in goroutine since they could block. - // callbacks handle webhooks as well as db persistence - go func() { - for _, t := range addedTracks { - p.handleTrackPublished(t) - } - }() + for _, t := range addedTracks { + p.handleTrackPublished(t) + } } func (p *ParticipantImpl) removePendingMigratedTrack(mt *MediaTrack) { @@ -728,12 +746,6 @@ func (p *ParticipantImpl) SetMigrateInfo( p.TransportManager.SetMigrateInfo(previousOffer, previousAnswer, dataChannels) } -func (p *ParticipantImpl) Start() { - p.once.Do(func() { - p.UpTrackManager.Start() - }) -} - func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason, isExpectedToResume bool) error { if p.isClosed.Swap(true) { // already closed @@ -916,7 +928,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) { } if onMigrateStateChange := p.getOnMigrateStateChange(); onMigrateStateChange != nil { - go onMigrateStateChange(p, s) + onMigrateStateChange(p, s) } } @@ -1146,7 +1158,6 @@ func (p *ParticipantImpl) setupTransportManager() error { SubscriberAsPrimary: p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe(), Config: p.params.Config, ProtocolVersion: p.params.ProtocolVersion, - Telemetry: p.params.Telemetry, CongestionControlConfig: p.params.CongestionControlConfig, EnabledPublishCodecs: p.enabledPublishCodecs, EnabledSubscribeCodecs: p.enabledSubscribeCodecs, @@ -1225,12 +1236,8 @@ func (p *ParticipantImpl) setupUpTrackManager() { }) p.UpTrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack) { - p.lock.RLock() - onTrackUpdated := p.onTrackUpdated - p.lock.RUnlock() - p.dirty.Store(true) - if onTrackUpdated != nil { + if onTrackUpdated := p.getOnTrackUpdated(); onTrackUpdated != nil { onTrackUpdated(p, track) } }) @@ -1261,26 +1268,16 @@ func (p *ParticipantImpl) setupParticipantTrafficLoad() { } func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { - oldState := p.State() - if !(p.state.Swap(state) != state) { + oldState := p.state.Swap(state).(livekit.ParticipantInfo_State) + if oldState == state { return } p.params.Logger.Debugw("updating participant state", "state", state.String()) p.dirty.Store(true) - p.lock.RLock() - onStateChange := p.onStateChange - p.lock.RUnlock() - if onStateChange != nil { - go func() { - defer func() { - if r := Recover(p.GetLogger()); r != nil { - os.Exit(1) - } - }() - onStateChange(p, oldState) - }() + if onStateChange := p.getOnStateChange(); onStateChange != nil { + onStateChange(p, state) } } @@ -1367,10 +1364,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w ) if !isNewTrack && !publishedTrack.HasPendingCodec() && p.IsReady() { - p.lock.RLock() - onTrackUpdated := p.onTrackUpdated - p.lock.RUnlock() - if onTrackUpdated != nil { + if onTrackUpdated := p.getOnTrackUpdated(); onTrackUpdated != nil { onTrackUpdated(p, publishedTrack) } } @@ -1885,14 +1879,12 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei } if newTrack { - go func() { - p.pubLogger.Debugw( - "track published", - "trackID", mt.ID(), - "track", logger.Proto(mt.ToProto()), - ) - p.handleTrackPublished(mt) - }() + p.pubLogger.Debugw( + "track published", + "trackID", mt.ID(), + "track", logger.Proto(mt.ToProto()), + ) + p.handleTrackPublished(mt) } return mt, newTrack @@ -2000,15 +1992,6 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv p.supervisor.ClearPublishedTrack(trackID, mt) } - // not logged when closing - p.params.Telemetry.TrackUnpublished( - context.Background(), - p.ID(), - p.Identity(), - mt.ToProto(), - !p.IsClosed(), - ) - // re-use Track sid p.pendingTracksLock.Lock() if pti := p.pendingTracks[signalCid]; pti != nil { @@ -2023,10 +2006,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv if !p.IsClosed() { // unpublished events aren't necessary when participant is closed p.pubLogger.Debugw("track unpublished", "trackID", ti.Sid, "track", logger.Proto(ti)) - p.lock.RLock() - onTrackUnpublished := p.onTrackUnpublished - p.lock.RUnlock() - if onTrackUnpublished != nil { + if onTrackUnpublished := p.getOnTrackUnpublished(); onTrackUnpublished != nil { onTrackUnpublished(p, mt) } } @@ -2036,22 +2016,10 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv } func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) { - p.lock.RLock() - onTrackPublished := p.onTrackPublished - p.lock.RUnlock() - if onTrackPublished != nil { + if onTrackPublished := p.getOnTrackPublished(); onTrackPublished != nil { onTrackPublished(p, track) } - // send webhook after callbacks are complete, persistence and state handling happens - // in `onTrackPublished` cb - p.params.Telemetry.TrackPublished( - context.Background(), - p.ID(), - p.Identity(), - track.ToProto(), - ) - p.pendingTracksLock.Lock() delete(p.pendingPublishingTracks, track.ID()) p.pendingTracksLock.Unlock() diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 1bb1f671c..c6c7609f5 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -73,6 +73,11 @@ type disconnectSignalOnResumeNoMessages struct { closedCount int } +type participantWorker struct { + eventsQueue *sutils.OpsQueue + participants []types.LocalParticipant +} + type Room struct { lock sync.RWMutex @@ -94,6 +99,7 @@ type Room struct { // map of identity -> Participant participants map[livekit.ParticipantIdentity]types.LocalParticipant + participantWorkers map[livekit.ParticipantIdentity]*participantWorker participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions participantRequestSources map[livekit.ParticipantIdentity]routing.MessageSource hasPublished map[livekit.ParticipantIdentity]bool @@ -151,6 +157,7 @@ func NewRoom( trackManager: NewRoomTrackManager(), serverInfo: serverInfo, participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), + participantWorkers: make(map[livekit.ParticipantIdentity]*participantWorker), participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions), participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource), hasPublished: make(map[livekit.ParticipantIdentity]bool), @@ -321,7 +328,6 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me if r.participants[participant.Identity()] != nil { return ErrAlreadyJoined } - if r.protoRoom.MaxParticipants > 0 && !participant.IsRecorder() { numParticipants := uint32(0) for _, p := range r.participants { @@ -338,84 +344,102 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me r.joinedAt.Store(time.Now().Unix()) } - // it's important to set this before connection, we don't want to miss out on any published tracks - participant.OnTrackPublished(r.onTrackPublished) - participant.OnStateChange(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) { - if r.onParticipantChanged != nil { - r.onParticipantChanged(participant) - } - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + pw := r.addParticipantWorkerLocked(participant) - state := p.State() - if state == livekit.ParticipantInfo_ACTIVE { - // subscribe participant to existing published tracks - r.subscribeToExistingTracks(p) - - // start the workers once connectivity is established - p.Start() - - meta := &livekit.AnalyticsClientMeta{ - ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds()), + participant.OnStateChange(func(p types.LocalParticipant, state livekit.ParticipantInfo_State) { + pw.eventsQueue.Enqueue(func() { + if r.onParticipantChanged != nil { + r.onParticipantChanged(p) } - cds := participant.GetICEConnectionDetails() - for _, cd := range cds { - if cd.Type != types.ICEConnectionTypeUnknown { - meta.ConnectionType = string(cd.Type) - break + r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + + if state == livekit.ParticipantInfo_ACTIVE { + // subscribe participant to existing published tracks + r.subscribeToExistingTracks(p) + + meta := &livekit.AnalyticsClientMeta{ + ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds()), } - } - r.telemetry.ParticipantActive(context.Background(), - r.ToProto(), - p.ToProto(), - meta, - false, - ) + cds := p.GetICEConnectionDetails() + for _, cd := range cds { + if cd.Type != types.ICEConnectionTypeUnknown { + meta.ConnectionType = string(cd.Type) + break + } + } + r.telemetry.ParticipantActive(context.Background(), + r.ToProto(), + p.ToProto(), + meta, + false, + ) - p.GetLogger().Infow("participant active", connectionDetailsFields(cds)...) - } else if state == livekit.ParticipantInfo_DISCONNECTED { - // remove participant from room - go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) - } + p.GetLogger().Infow("participant active", connectionDetailsFields(cds)...) + } else if state == livekit.ParticipantInfo_DISCONNECTED { + // remove participant from room + r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected) + } + }) + }) + // it's important to set this before connection, we don't want to miss out on any published tracks + participant.OnTrackPublished(func(p types.LocalParticipant, t types.MediaTrack) { + pw.eventsQueue.Enqueue(func() { + r.onTrackPublished(p, t) + }) + }) + participant.OnTrackUpdated(func(p types.LocalParticipant, t types.MediaTrack) { + pw.eventsQueue.Enqueue(func() { + r.onTrackUpdated(p, t) + }) + }) + participant.OnTrackUnpublished(func(p types.LocalParticipant, t types.MediaTrack) { + pw.eventsQueue.Enqueue(func() { + r.onTrackUnpublished(p, t) + }) + }) + participant.OnParticipantUpdate(func(p types.LocalParticipant) { + pw.eventsQueue.Enqueue(func() { + r.onParticipantUpdate(p) + }) }) - participant.OnTrackUpdated(r.onTrackUpdated) - participant.OnTrackUnpublished(r.onTrackUnpublished) - participant.OnParticipantUpdate(r.onParticipantUpdate) participant.OnDataPacket(r.onDataPacket) participant.OnSubscribeStatusChanged(func(publisherID livekit.ParticipantID, subscribed bool) { - if subscribed { - pub := r.GetParticipantByID(publisherID) - if pub != nil && pub.State() == livekit.ParticipantInfo_ACTIVE { - // when a participant subscribes to another participant, - // send speaker update if the subscribed to participant is active. - level, active := pub.GetAudioLevel() - if active { - _ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{ - { - Sid: string(pub.ID()), - Level: float32(level), - Active: active, - }, - }, false) - } + pw.eventsQueue.Enqueue(func() { + if subscribed { + pub := r.GetParticipantByID(publisherID) + if pub != nil && pub.State() == livekit.ParticipantInfo_ACTIVE { + // when a participant subscribes to another participant, + // send speaker update if the subscribed to participant is active. + level, active := pub.GetAudioLevel() + if active { + _ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{ + { + Sid: string(pub.ID()), + Level: float32(level), + Active: active, + }, + }, false) + } - if cq := pub.GetConnectionQuality(); cq != nil { - update := &livekit.ConnectionQualityUpdate{} - update.Updates = append(update.Updates, cq) - _ = participant.SendConnectionQualityUpdate(update) + if cq := pub.GetConnectionQuality(); cq != nil { + update := &livekit.ConnectionQualityUpdate{} + update.Updates = append(update.Updates, cq) + _ = participant.SendConnectionQualityUpdate(update) + } } + } else { + // no longer subscribed to the publisher, clear speaker status + _ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{ + { + Sid: string(publisherID), + Level: 0, + Active: false, + }, + }, true) } - } else { - // no longer subscribed to the publisher, clear speaker status - _ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{ - { - Sid: string(publisherID), - Level: 0, - Active: false, - }, - }, true) - } - + }) }) + r.Logger.Debugw("new participant joined", "pID", participant.ID(), "participant", participant.Identity(), @@ -558,6 +582,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek } delete(r.participants, identity) + r.removeParticipantWorkerLocked(p) delete(r.participantOpts, identity) delete(r.participantRequestSources, identity) delete(r.hasPublished, identity) @@ -784,11 +809,14 @@ func (r *Room) Close() { } close(r.closed) r.lock.Unlock() + r.Logger.Infow("closing room") for _, p := range r.GetParticipants() { _ = p.Close(true, types.ParticipantCloseReasonRoomClose, false) } + r.protoProxy.Stop() + if r.onClose != nil { r.onClose() } @@ -1038,6 +1066,14 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. } }() } + + // send webhook after callbacks are complete, i.e. after persistence and state handling + r.telemetry.TrackPublished( + context.Background(), + participant.ID(), + participant.Identity(), + track.ToProto(), + ) } func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) { @@ -1049,6 +1085,14 @@ func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) { } func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) { + r.telemetry.TrackUnpublished( + context.Background(), + p.ID(), + p.Identity(), + track.ToProto(), + !p.IsClosed(), + ) + r.trackManager.RemoveTrack(track) if !p.IsClosed() { r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) @@ -1452,6 +1496,52 @@ func (r *Room) DebugInfo() map[string]interface{} { return info } +func (r *Room) addParticipantWorkerLocked(p types.LocalParticipant) *participantWorker { + identity := p.Identity() + pw := r.participantWorkers[identity] + if pw != nil { + found := false + for _, participant := range pw.participants { + if p == participant { + found = true + break + } + } + if !found { + pw.participants = append(pw.participants, p) + } + return pw + } + + pw = &participantWorker{ + eventsQueue: sutils.NewOpsQueue(fmt.Sprintf("participant-worker-%s-%s", r.Name(), identity), 0, true), + participants: []types.LocalParticipant{p}, + } + pw.eventsQueue.Start() + r.participantWorkers[identity] = pw + return pw +} + +func (r *Room) removeParticipantWorkerLocked(p types.LocalParticipant) { + identity := p.Identity() + if pw, ok := r.participantWorkers[identity]; ok { + n := len(pw.participants) + for idx, participant := range pw.participants { + if p == participant { + pw.participants[idx] = pw.participants[n-1] + pw.participants = pw.participants[:n-1] + break + } + } + if len(pw.participants) == 0 { + pw.eventsQueue.Stop() + delete(r.participantWorkers, identity) + } + } +} + +// ------------------------------------------------------------ + func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp *livekit.DataPacket, logger logger.Logger) { dest := dp.GetUser().GetDestinationSids() var dpData []byte diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 81dfe90c8..4990b64d4 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -110,8 +110,7 @@ func TestRoomJoin(t *testing.T) { stateChangeCB := p.OnStateChangeArgsForCall(0) require.NotNil(t, stateChangeCB) - p.StateReturns(livekit.ParticipantInfo_ACTIVE) - stateChangeCB(p, livekit.ParticipantInfo_JOINED) + stateChangeCB(p, livekit.ParticipantInfo_ACTIVE) // it should become a subscriber when connectivity changes numTracks := 0 @@ -122,7 +121,7 @@ func TestRoomJoin(t *testing.T) { numTracks += len(op.GetPublishedTracks()) } - require.Equal(t, numTracks, p.SubscribeToTrackCallCount()) + require.Eventually(t, func() bool { return p.SubscribeToTrackCallCount() == numTracks }, 5*time.Second, 10*time.Millisecond) }) t.Run("participant state change is broadcasted to others", func(t *testing.T) { @@ -218,7 +217,7 @@ func TestParticipantUpdate(t *testing.T) { expected += 1 } fp := p.(*typesfakes.FakeLocalParticipant) - require.Equal(t, expected, fp.SendParticipantUpdateCallCount()) + require.Eventually(t, func() bool { return fp.SendParticipantUpdateCallCount() == expected }, 5*time.Second, 10*time.Millisecond) } }) } @@ -424,8 +423,8 @@ func TestNewTrack(t *testing.T) { require.NotNil(t, trackCB) trackCB(pub, track) // only p1 should've been subscribed to + require.Eventually(t, func() bool { return p1.SubscribeToTrackCallCount() == 1 }, 5*time.Second, 10*time.Millisecond) require.Equal(t, 0, p0.SubscribeToTrackCallCount()) - require.Equal(t, 1, p1.SubscribeToTrackCallCount()) }) } @@ -679,10 +678,9 @@ func TestHiddenParticipants(t *testing.T) { stateChangeCB := hidden.OnStateChangeArgsForCall(0) require.NotNil(t, stateChangeCB) - hidden.StateReturns(livekit.ParticipantInfo_ACTIVE) - stateChangeCB(hidden, livekit.ParticipantInfo_JOINED) + stateChangeCB(hidden, livekit.ParticipantInfo_ACTIVE) - require.Equal(t, 2, hidden.SubscribeToTrackCallCount()) + require.Eventually(t, func() bool { return hidden.SubscribeToTrackCallCount() == 2 }, 5*time.Second, 10*time.Millisecond) }) } diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 5cbbb216f..c53791aa5 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -573,7 +573,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { m.lock.Unlock() if changedCB != nil && firstSubscribe { - go changedCB(publisherID, true) + changedCB(publisherID, true) } return nil } diff --git a/pkg/rtc/testutils.go b/pkg/rtc/testutils.go index f80230f0c..b63c372a8 100644 --- a/pkg/rtc/testutils.go +++ b/pkg/rtc/testutils.go @@ -83,5 +83,9 @@ func NewMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrac t.IDReturns(livekit.TrackID(utils.NewGuid(utils.TrackPrefix))) t.KindReturns(kind) t.NameReturns(name) + t.ToProtoReturns(&livekit.TrackInfo{ + Type: kind, + Name: name, + }) return t } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 953b14518..aa416b50c 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -39,7 +39,6 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/pacer" "github.com/livekit/livekit-server/pkg/sfu/rtpextension" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/utils" sutils "github.com/livekit/livekit-server/pkg/utils" @@ -238,7 +237,6 @@ type TransportParams struct { Config *WebRTCConfig DirectionConfig DirectionConfig CongestionControlConfig config.CongestionControlConfig - Telemetry telemetry.TelemetryService EnabledCodecs []*livekit.Codec Logger logger.Logger Transport livekit.SignalTarget diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index ec3b10600..7207ace3e 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -33,7 +33,6 @@ import ( "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/pacer" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) @@ -54,7 +53,6 @@ type TransportManagerParams struct { SubscriberAsPrimary bool Config *WebRTCConfig ProtocolVersion types.ProtocolVersion - Telemetry telemetry.TelemetryService CongestionControlConfig config.CongestionControlConfig EnabledSubscribeCodecs []*livekit.Codec EnabledPublishCodecs []*livekit.Codec @@ -119,7 +117,6 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro Config: params.Config, DirectionConfig: params.Config.Publisher, CongestionControlConfig: params.CongestionControlConfig, - Telemetry: params.Telemetry, EnabledCodecs: params.EnabledPublishCodecs, Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER), SimTracks: params.SimTracks, @@ -152,7 +149,6 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro Config: params.Config, DirectionConfig: params.Config.Subscriber, CongestionControlConfig: params.CongestionControlConfig, - Telemetry: params.Telemetry, EnabledCodecs: params.EnabledSubscribeCodecs, Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER), ClientInfo: params.ClientInfo, @@ -722,7 +718,7 @@ func (t *TransportManager) ProcessPendingPublisherDataChannels() { } } -func (t *TransportManager) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { +func (t *TransportManager) HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { t.mediaLossProxy.HandleMaxLossFeedback(dt, report) } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 466615c8a..4cbcb117a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -273,7 +273,6 @@ type Participant interface { IsRecorder() bool IsAgent() bool - Start() Close(sendLeave bool, reason ParticipantCloseReason, isExpectedToResume bool) error SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion) @@ -379,7 +378,7 @@ type LocalParticipant interface { IssueFullReconnect(reason ParticipantCloseReason) // callbacks - OnStateChange(func(p LocalParticipant, oldState livekit.ParticipantInfo_State)) + OnStateChange(func(p LocalParticipant, state livekit.ParticipantInfo_State)) OnMigrateStateChange(func(p LocalParticipant, migrateState MigrateState)) // OnTrackPublished - remote added a track OnTrackPublished(func(LocalParticipant, MediaTrack)) @@ -393,9 +392,10 @@ type LocalParticipant interface { OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) - OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) OnTrafficLoad(callback func(trafficLoad *TrafficLoad)) + HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) + // session migration MaybeStartMigration(force bool, onStart func()) bool SetMigrateState(s MigrateState) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 756099d2c..32c1c2869 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -355,6 +355,12 @@ type FakeLocalParticipant struct { handleOfferArgsForCall []struct { arg1 webrtc.SessionDescription } + HandleReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport) + handleReceiverReportMutex sync.RWMutex + handleReceiverReportArgsForCall []struct { + arg1 *sfu.DownTrack + arg2 *rtcp.ReceiverReport + } HandleReconnectAndSendResponseStub func(livekit.ReconnectReason, *livekit.ReconnectResponse) error handleReconnectAndSendResponseMutex sync.RWMutex handleReconnectAndSendResponseArgsForCall []struct { @@ -571,16 +577,10 @@ type FakeLocalParticipant struct { onParticipantUpdateArgsForCall []struct { arg1 func(types.LocalParticipant) } - OnReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport) - onReceiverReportMutex sync.RWMutex - onReceiverReportArgsForCall []struct { - arg1 *sfu.DownTrack - arg2 *rtcp.ReceiverReport - } - OnStateChangeStub func(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) + OnStateChangeStub func(func(p types.LocalParticipant, state livekit.ParticipantInfo_State)) onStateChangeMutex sync.RWMutex onStateChangeArgsForCall []struct { - arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) + arg1 func(p types.LocalParticipant, state livekit.ParticipantInfo_State) } OnSubscribeStatusChangedStub func(func(publisherID livekit.ParticipantID, subscribed bool)) onSubscribeStatusChangedMutex sync.RWMutex @@ -791,10 +791,6 @@ type FakeLocalParticipant struct { setTrackMutedReturnsOnCall map[int]struct { result1 *livekit.TrackInfo } - StartStub func() - startMutex sync.RWMutex - startArgsForCall []struct { - } StateStub func() livekit.ParticipantInfo_State stateMutex sync.RWMutex stateArgsForCall []struct { @@ -2740,6 +2736,39 @@ func (fake *FakeLocalParticipant) HandleOfferArgsForCall(i int) webrtc.SessionDe return argsForCall.arg1 } +func (fake *FakeLocalParticipant) HandleReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) { + fake.handleReceiverReportMutex.Lock() + fake.handleReceiverReportArgsForCall = append(fake.handleReceiverReportArgsForCall, struct { + arg1 *sfu.DownTrack + arg2 *rtcp.ReceiverReport + }{arg1, arg2}) + stub := fake.HandleReceiverReportStub + fake.recordInvocation("HandleReceiverReport", []interface{}{arg1, arg2}) + fake.handleReceiverReportMutex.Unlock() + if stub != nil { + fake.HandleReceiverReportStub(arg1, arg2) + } +} + +func (fake *FakeLocalParticipant) HandleReceiverReportCallCount() int { + fake.handleReceiverReportMutex.RLock() + defer fake.handleReceiverReportMutex.RUnlock() + return len(fake.handleReceiverReportArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleReceiverReportCalls(stub func(*sfu.DownTrack, *rtcp.ReceiverReport)) { + fake.handleReceiverReportMutex.Lock() + defer fake.handleReceiverReportMutex.Unlock() + fake.HandleReceiverReportStub = stub +} + +func (fake *FakeLocalParticipant) HandleReceiverReportArgsForCall(i int) (*sfu.DownTrack, *rtcp.ReceiverReport) { + fake.handleReceiverReportMutex.RLock() + defer fake.handleReceiverReportMutex.RUnlock() + argsForCall := fake.handleReceiverReportArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeLocalParticipant) HandleReconnectAndSendResponse(arg1 livekit.ReconnectReason, arg2 *livekit.ReconnectResponse) error { fake.handleReconnectAndSendResponseMutex.Lock() ret, specificReturn := fake.handleReconnectAndSendResponseReturnsOnCall[len(fake.handleReconnectAndSendResponseArgsForCall)] @@ -3935,43 +3964,10 @@ func (fake *FakeLocalParticipant) OnParticipantUpdateArgsForCall(i int) func(typ return argsForCall.arg1 } -func (fake *FakeLocalParticipant) OnReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) { - fake.onReceiverReportMutex.Lock() - fake.onReceiverReportArgsForCall = append(fake.onReceiverReportArgsForCall, struct { - arg1 *sfu.DownTrack - arg2 *rtcp.ReceiverReport - }{arg1, arg2}) - stub := fake.OnReceiverReportStub - fake.recordInvocation("OnReceiverReport", []interface{}{arg1, arg2}) - fake.onReceiverReportMutex.Unlock() - if stub != nil { - fake.OnReceiverReportStub(arg1, arg2) - } -} - -func (fake *FakeLocalParticipant) OnReceiverReportCallCount() int { - fake.onReceiverReportMutex.RLock() - defer fake.onReceiverReportMutex.RUnlock() - return len(fake.onReceiverReportArgsForCall) -} - -func (fake *FakeLocalParticipant) OnReceiverReportCalls(stub func(*sfu.DownTrack, *rtcp.ReceiverReport)) { - fake.onReceiverReportMutex.Lock() - defer fake.onReceiverReportMutex.Unlock() - fake.OnReceiverReportStub = stub -} - -func (fake *FakeLocalParticipant) OnReceiverReportArgsForCall(i int) (*sfu.DownTrack, *rtcp.ReceiverReport) { - fake.onReceiverReportMutex.RLock() - defer fake.onReceiverReportMutex.RUnlock() - argsForCall := fake.onReceiverReportArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) { +func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant, state livekit.ParticipantInfo_State)) { fake.onStateChangeMutex.Lock() fake.onStateChangeArgsForCall = append(fake.onStateChangeArgsForCall, struct { - arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) + arg1 func(p types.LocalParticipant, state livekit.ParticipantInfo_State) }{arg1}) stub := fake.OnStateChangeStub fake.recordInvocation("OnStateChange", []interface{}{arg1}) @@ -3987,13 +3983,13 @@ func (fake *FakeLocalParticipant) OnStateChangeCallCount() int { return len(fake.onStateChangeArgsForCall) } -func (fake *FakeLocalParticipant) OnStateChangeCalls(stub func(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State))) { +func (fake *FakeLocalParticipant) OnStateChangeCalls(stub func(func(p types.LocalParticipant, state livekit.ParticipantInfo_State))) { fake.onStateChangeMutex.Lock() defer fake.onStateChangeMutex.Unlock() fake.OnStateChangeStub = stub } -func (fake *FakeLocalParticipant) OnStateChangeArgsForCall(i int) func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) { +func (fake *FakeLocalParticipant) OnStateChangeArgsForCall(i int) func(p types.LocalParticipant, state livekit.ParticipantInfo_State) { fake.onStateChangeMutex.RLock() defer fake.onStateChangeMutex.RUnlock() argsForCall := fake.onStateChangeArgsForCall[i] @@ -5209,30 +5205,6 @@ func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *liv }{result1} } -func (fake *FakeLocalParticipant) Start() { - fake.startMutex.Lock() - fake.startArgsForCall = append(fake.startArgsForCall, struct { - }{}) - stub := fake.StartStub - fake.recordInvocation("Start", []interface{}{}) - fake.startMutex.Unlock() - if stub != nil { - fake.StartStub() - } -} - -func (fake *FakeLocalParticipant) StartCallCount() int { - fake.startMutex.RLock() - defer fake.startMutex.RUnlock() - return len(fake.startArgsForCall) -} - -func (fake *FakeLocalParticipant) StartCalls(stub func()) { - fake.startMutex.Lock() - defer fake.startMutex.Unlock() - fake.StartStub = stub -} - func (fake *FakeLocalParticipant) State() livekit.ParticipantInfo_State { fake.stateMutex.Lock() ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)] @@ -6282,6 +6254,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.handleAnswerMutex.RUnlock() fake.handleOfferMutex.RLock() defer fake.handleOfferMutex.RUnlock() + fake.handleReceiverReportMutex.RLock() + defer fake.handleReceiverReportMutex.RUnlock() fake.handleReconnectAndSendResponseMutex.RLock() defer fake.handleReconnectAndSendResponseMutex.RUnlock() fake.handleSignalSourceCloseMutex.RLock() @@ -6334,8 +6308,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onMigrateStateChangeMutex.RUnlock() fake.onParticipantUpdateMutex.RLock() defer fake.onParticipantUpdateMutex.RUnlock() - fake.onReceiverReportMutex.RLock() - defer fake.onReceiverReportMutex.RUnlock() fake.onStateChangeMutex.RLock() defer fake.onStateChangeMutex.RUnlock() fake.onSubscribeStatusChangedMutex.RLock() @@ -6392,8 +6364,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.setSubscriberChannelCapacityMutex.RUnlock() fake.setTrackMutedMutex.RLock() defer fake.setTrackMutedMutex.RUnlock() - fake.startMutex.RLock() - defer fake.startMutex.RUnlock() fake.stateMutex.RLock() defer fake.stateMutex.RUnlock() fake.subscribeToTrackMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 3f46bdbba..0c5c3929a 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -165,10 +165,6 @@ type FakeParticipant struct { setNameArgsForCall []struct { arg1 string } - StartStub func() - startMutex sync.RWMutex - startArgsForCall []struct { - } StateStub func() livekit.ParticipantInfo_State stateMutex sync.RWMutex stateArgsForCall []struct { @@ -1047,30 +1043,6 @@ func (fake *FakeParticipant) SetNameArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeParticipant) Start() { - fake.startMutex.Lock() - fake.startArgsForCall = append(fake.startArgsForCall, struct { - }{}) - stub := fake.StartStub - fake.recordInvocation("Start", []interface{}{}) - fake.startMutex.Unlock() - if stub != nil { - fake.StartStub() - } -} - -func (fake *FakeParticipant) StartCallCount() int { - fake.startMutex.RLock() - defer fake.startMutex.RUnlock() - return len(fake.startArgsForCall) -} - -func (fake *FakeParticipant) StartCalls(stub func()) { - fake.startMutex.Lock() - defer fake.startMutex.Unlock() - fake.StartStub = stub -} - func (fake *FakeParticipant) State() livekit.ParticipantInfo_State { fake.stateMutex.Lock() ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)] @@ -1393,8 +1365,6 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.setMetadataMutex.RUnlock() fake.setNameMutex.RLock() defer fake.setNameMutex.RUnlock() - fake.startMutex.RLock() - defer fake.startMutex.RUnlock() fake.stateMutex.RLock() defer fake.stateMutex.RUnlock() fake.subscriptionPermissionMutex.RLock() diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index ce5177661..ed01e1f79 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -62,9 +62,6 @@ func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager { } } -func (u *UpTrackManager) Start() { -} - func (u *UpTrackManager) Close(willBeResumed bool) { u.lock.Lock() u.closed = true