Use a participant worker queue in room. (#2420)

* Use a participant worker queue in room.

Removes the need to selectively invoke callbacks in a goroutine from
participant.

Also, a bit of drive-by clean up.

* spelling

* prevent race

* don't need to remove in goroutine as it is already running in the worker

* worker will get cleaned up in state change callback

* create participant worker only if not created already

* ref count participant worker

* maintain participant list

* clean up oldState
This commit is contained in:
Raja Subramanian
2024-01-28 22:10:35 +05:30
committed by GitHub
parent 38352b6125
commit bcf9fe3f0f
12 changed files with 265 additions and 274 deletions
+1 -1
View File
@@ -196,7 +196,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
})
downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
sub.OnReceiverReport(dt, report)
sub.HandleReceiverReport(dt, report)
})
var transceiver *webrtc.RTPTransceiver
+44 -76
View File
@@ -191,7 +191,6 @@ type ParticipantImpl struct {
lastRTT uint32
lock utils.RWMutex
once sync.Once
dirty atomic.Bool
version atomic.Uint32
@@ -201,7 +200,7 @@ type ParticipantImpl struct {
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
onTrackUpdated func(types.LocalParticipant, types.MediaTrack)
onTrackUnpublished func(types.LocalParticipant, types.MediaTrack)
onStateChange func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)
onStateChange func(p types.LocalParticipant, state livekit.ParticipantInfo_State)
onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState)
onParticipantUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, *livekit.DataPacket)
@@ -525,18 +524,36 @@ func (p *ParticipantImpl) OnTrackPublished(callback func(types.LocalParticipant,
p.lock.Unlock()
}
// getOnTrackPublished returns the onTrackPublished callback under a read
// lock so it can be invoked safely without holding p.lock.
func (p *ParticipantImpl) getOnTrackPublished() func(types.LocalParticipant, types.MediaTrack) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.onTrackPublished
}
// OnTrackUnpublished registers the callback invoked when one of this
// participant's media tracks is unpublished. Stored under p.lock.
func (p *ParticipantImpl) OnTrackUnpublished(callback func(types.LocalParticipant, types.MediaTrack)) {
	p.lock.Lock()
	p.onTrackUnpublished = callback
	p.lock.Unlock()
}
func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) {
// getOnTrackUnpublished returns the onTrackUnpublished callback under a
// read lock so it can be invoked safely without holding p.lock.
func (p *ParticipantImpl) getOnTrackUnpublished() func(types.LocalParticipant, types.MediaTrack) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.onTrackUnpublished
}
// OnStateChange registers the callback invoked when the participant's
// state changes; it receives the new state. Stored under p.lock.
func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, state livekit.ParticipantInfo_State)) {
	p.lock.Lock()
	p.onStateChange = callback
	p.lock.Unlock()
}
// getOnStateChange returns the onStateChange callback under a read lock
// so it can be invoked safely without holding p.lock.
func (p *ParticipantImpl) getOnStateChange() func(p types.LocalParticipant, state livekit.ParticipantInfo_State) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.onStateChange
}
func (p *ParticipantImpl) OnMigrateStateChange(callback func(p types.LocalParticipant, state types.MigrateState)) {
p.lock.Lock()
p.onMigrateStateChange = callback
@@ -546,7 +563,6 @@ func (p *ParticipantImpl) OnMigrateStateChange(callback func(p types.LocalPartic
// getOnMigrateStateChange returns the onMigrateStateChange callback under
// a read lock so it can be invoked safely without holding p.lock.
func (p *ParticipantImpl) getOnMigrateStateChange() func(p types.LocalParticipant, state types.MigrateState) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.onMigrateStateChange
}
@@ -556,6 +572,12 @@ func (p *ParticipantImpl) OnTrackUpdated(callback func(types.LocalParticipant, t
p.lock.Unlock()
}
// getOnTrackUpdated returns the onTrackUpdated callback under a read lock
// so it can be invoked safely without holding p.lock.
func (p *ParticipantImpl) getOnTrackUpdated() func(types.LocalParticipant, types.MediaTrack) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.onTrackUpdated
}
func (p *ParticipantImpl) OnParticipantUpdate(callback func(types.LocalParticipant)) {
p.lock.Lock()
p.onParticipantUpdate = callback
@@ -667,13 +689,9 @@ func (p *ParticipantImpl) handleMigrateTracks() {
}
p.pendingTracksLock.Unlock()
// launch callbacks in goroutine since they could block.
// callbacks handle webhooks as well as db persistence
go func() {
for _, t := range addedTracks {
p.handleTrackPublished(t)
}
}()
for _, t := range addedTracks {
p.handleTrackPublished(t)
}
}
func (p *ParticipantImpl) removePendingMigratedTrack(mt *MediaTrack) {
@@ -728,12 +746,6 @@ func (p *ParticipantImpl) SetMigrateInfo(
p.TransportManager.SetMigrateInfo(previousOffer, previousAnswer, dataChannels)
}
func (p *ParticipantImpl) Start() {
p.once.Do(func() {
p.UpTrackManager.Start()
})
}
func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason, isExpectedToResume bool) error {
if p.isClosed.Swap(true) {
// already closed
@@ -916,7 +928,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {
}
if onMigrateStateChange := p.getOnMigrateStateChange(); onMigrateStateChange != nil {
go onMigrateStateChange(p, s)
onMigrateStateChange(p, s)
}
}
@@ -1146,7 +1158,6 @@ func (p *ParticipantImpl) setupTransportManager() error {
SubscriberAsPrimary: p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe(),
Config: p.params.Config,
ProtocolVersion: p.params.ProtocolVersion,
Telemetry: p.params.Telemetry,
CongestionControlConfig: p.params.CongestionControlConfig,
EnabledPublishCodecs: p.enabledPublishCodecs,
EnabledSubscribeCodecs: p.enabledSubscribeCodecs,
@@ -1225,12 +1236,8 @@ func (p *ParticipantImpl) setupUpTrackManager() {
})
p.UpTrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack) {
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
p.dirty.Store(true)
if onTrackUpdated != nil {
if onTrackUpdated := p.getOnTrackUpdated(); onTrackUpdated != nil {
onTrackUpdated(p, track)
}
})
@@ -1261,26 +1268,16 @@ func (p *ParticipantImpl) setupParticipantTrafficLoad() {
}
func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
oldState := p.State()
if !(p.state.Swap(state) != state) {
oldState := p.state.Swap(state).(livekit.ParticipantInfo_State)
if oldState == state {
return
}
p.params.Logger.Debugw("updating participant state", "state", state.String())
p.dirty.Store(true)
p.lock.RLock()
onStateChange := p.onStateChange
p.lock.RUnlock()
if onStateChange != nil {
go func() {
defer func() {
if r := Recover(p.GetLogger()); r != nil {
os.Exit(1)
}
}()
onStateChange(p, oldState)
}()
if onStateChange := p.getOnStateChange(); onStateChange != nil {
onStateChange(p, state)
}
}
@@ -1367,10 +1364,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
)
if !isNewTrack && !publishedTrack.HasPendingCodec() && p.IsReady() {
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
if onTrackUpdated != nil {
if onTrackUpdated := p.getOnTrackUpdated(); onTrackUpdated != nil {
onTrackUpdated(p, publishedTrack)
}
}
@@ -1885,14 +1879,12 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
}
if newTrack {
go func() {
p.pubLogger.Debugw(
"track published",
"trackID", mt.ID(),
"track", logger.Proto(mt.ToProto()),
)
p.handleTrackPublished(mt)
}()
p.pubLogger.Debugw(
"track published",
"trackID", mt.ID(),
"track", logger.Proto(mt.ToProto()),
)
p.handleTrackPublished(mt)
}
return mt, newTrack
@@ -2000,15 +1992,6 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
p.supervisor.ClearPublishedTrack(trackID, mt)
}
// not logged when closing
p.params.Telemetry.TrackUnpublished(
context.Background(),
p.ID(),
p.Identity(),
mt.ToProto(),
!p.IsClosed(),
)
// re-use Track sid
p.pendingTracksLock.Lock()
if pti := p.pendingTracks[signalCid]; pti != nil {
@@ -2023,10 +2006,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
if !p.IsClosed() {
// unpublished events aren't necessary when participant is closed
p.pubLogger.Debugw("track unpublished", "trackID", ti.Sid, "track", logger.Proto(ti))
p.lock.RLock()
onTrackUnpublished := p.onTrackUnpublished
p.lock.RUnlock()
if onTrackUnpublished != nil {
if onTrackUnpublished := p.getOnTrackUnpublished(); onTrackUnpublished != nil {
onTrackUnpublished(p, mt)
}
}
@@ -2036,22 +2016,10 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
}
func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) {
p.lock.RLock()
onTrackPublished := p.onTrackPublished
p.lock.RUnlock()
if onTrackPublished != nil {
if onTrackPublished := p.getOnTrackPublished(); onTrackPublished != nil {
onTrackPublished(p, track)
}
// send webhook after callbacks are complete, persistence and state handling happens
// in `onTrackPublished` cb
p.params.Telemetry.TrackPublished(
context.Background(),
p.ID(),
p.Identity(),
track.ToProto(),
)
p.pendingTracksLock.Lock()
delete(p.pendingPublishingTracks, track.ID())
p.pendingTracksLock.Unlock()
+158 -68
View File
@@ -73,6 +73,11 @@ type disconnectSignalOnResumeNoMessages struct {
closedCount int
}
type participantWorker struct {
eventsQueue *sutils.OpsQueue
participants []types.LocalParticipant
}
type Room struct {
lock sync.RWMutex
@@ -94,6 +99,7 @@ type Room struct {
// map of identity -> Participant
participants map[livekit.ParticipantIdentity]types.LocalParticipant
participantWorkers map[livekit.ParticipantIdentity]*participantWorker
participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions
participantRequestSources map[livekit.ParticipantIdentity]routing.MessageSource
hasPublished map[livekit.ParticipantIdentity]bool
@@ -151,6 +157,7 @@ func NewRoom(
trackManager: NewRoomTrackManager(),
serverInfo: serverInfo,
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
participantWorkers: make(map[livekit.ParticipantIdentity]*participantWorker),
participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions),
participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource),
hasPublished: make(map[livekit.ParticipantIdentity]bool),
@@ -321,7 +328,6 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
if r.participants[participant.Identity()] != nil {
return ErrAlreadyJoined
}
if r.protoRoom.MaxParticipants > 0 && !participant.IsRecorder() {
numParticipants := uint32(0)
for _, p := range r.participants {
@@ -338,84 +344,102 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
r.joinedAt.Store(time.Now().Unix())
}
// it's important to set this before connection, we don't want to miss out on any published tracks
participant.OnTrackPublished(r.onTrackPublished)
participant.OnStateChange(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) {
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
}
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
pw := r.addParticipantWorkerLocked(participant)
state := p.State()
if state == livekit.ParticipantInfo_ACTIVE {
// subscribe participant to existing published tracks
r.subscribeToExistingTracks(p)
// start the workers once connectivity is established
p.Start()
meta := &livekit.AnalyticsClientMeta{
ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds()),
participant.OnStateChange(func(p types.LocalParticipant, state livekit.ParticipantInfo_State) {
pw.eventsQueue.Enqueue(func() {
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
}
cds := participant.GetICEConnectionDetails()
for _, cd := range cds {
if cd.Type != types.ICEConnectionTypeUnknown {
meta.ConnectionType = string(cd.Type)
break
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
if state == livekit.ParticipantInfo_ACTIVE {
// subscribe participant to existing published tracks
r.subscribeToExistingTracks(p)
meta := &livekit.AnalyticsClientMeta{
ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds()),
}
}
r.telemetry.ParticipantActive(context.Background(),
r.ToProto(),
p.ToProto(),
meta,
false,
)
cds := p.GetICEConnectionDetails()
for _, cd := range cds {
if cd.Type != types.ICEConnectionTypeUnknown {
meta.ConnectionType = string(cd.Type)
break
}
}
r.telemetry.ParticipantActive(context.Background(),
r.ToProto(),
p.ToProto(),
meta,
false,
)
p.GetLogger().Infow("participant active", connectionDetailsFields(cds)...)
} else if state == livekit.ParticipantInfo_DISCONNECTED {
// remove participant from room
go r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected)
}
p.GetLogger().Infow("participant active", connectionDetailsFields(cds)...)
} else if state == livekit.ParticipantInfo_DISCONNECTED {
// remove participant from room
r.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonStateDisconnected)
}
})
})
// it's important to set this before connection, we don't want to miss out on any published tracks
participant.OnTrackPublished(func(p types.LocalParticipant, t types.MediaTrack) {
pw.eventsQueue.Enqueue(func() {
r.onTrackPublished(p, t)
})
})
participant.OnTrackUpdated(func(p types.LocalParticipant, t types.MediaTrack) {
pw.eventsQueue.Enqueue(func() {
r.onTrackUpdated(p, t)
})
})
participant.OnTrackUnpublished(func(p types.LocalParticipant, t types.MediaTrack) {
pw.eventsQueue.Enqueue(func() {
r.onTrackUnpublished(p, t)
})
})
participant.OnParticipantUpdate(func(p types.LocalParticipant) {
pw.eventsQueue.Enqueue(func() {
r.onParticipantUpdate(p)
})
})
participant.OnTrackUpdated(r.onTrackUpdated)
participant.OnTrackUnpublished(r.onTrackUnpublished)
participant.OnParticipantUpdate(r.onParticipantUpdate)
participant.OnDataPacket(r.onDataPacket)
participant.OnSubscribeStatusChanged(func(publisherID livekit.ParticipantID, subscribed bool) {
if subscribed {
pub := r.GetParticipantByID(publisherID)
if pub != nil && pub.State() == livekit.ParticipantInfo_ACTIVE {
// when a participant subscribes to another participant,
// send speaker update if the subscribed to participant is active.
level, active := pub.GetAudioLevel()
if active {
_ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{
{
Sid: string(pub.ID()),
Level: float32(level),
Active: active,
},
}, false)
}
pw.eventsQueue.Enqueue(func() {
if subscribed {
pub := r.GetParticipantByID(publisherID)
if pub != nil && pub.State() == livekit.ParticipantInfo_ACTIVE {
// when a participant subscribes to another participant,
// send speaker update if the subscribed to participant is active.
level, active := pub.GetAudioLevel()
if active {
_ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{
{
Sid: string(pub.ID()),
Level: float32(level),
Active: active,
},
}, false)
}
if cq := pub.GetConnectionQuality(); cq != nil {
update := &livekit.ConnectionQualityUpdate{}
update.Updates = append(update.Updates, cq)
_ = participant.SendConnectionQualityUpdate(update)
if cq := pub.GetConnectionQuality(); cq != nil {
update := &livekit.ConnectionQualityUpdate{}
update.Updates = append(update.Updates, cq)
_ = participant.SendConnectionQualityUpdate(update)
}
}
} else {
// no longer subscribed to the publisher, clear speaker status
_ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{
{
Sid: string(publisherID),
Level: 0,
Active: false,
},
}, true)
}
} else {
// no longer subscribed to the publisher, clear speaker status
_ = participant.SendSpeakerUpdate([]*livekit.SpeakerInfo{
{
Sid: string(publisherID),
Level: 0,
Active: false,
},
}, true)
}
})
})
r.Logger.Debugw("new participant joined",
"pID", participant.ID(),
"participant", participant.Identity(),
@@ -558,6 +582,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
}
delete(r.participants, identity)
r.removeParticipantWorkerLocked(p)
delete(r.participantOpts, identity)
delete(r.participantRequestSources, identity)
delete(r.hasPublished, identity)
@@ -784,11 +809,14 @@ func (r *Room) Close() {
}
close(r.closed)
r.lock.Unlock()
r.Logger.Infow("closing room")
for _, p := range r.GetParticipants() {
_ = p.Close(true, types.ParticipantCloseReasonRoomClose, false)
}
r.protoProxy.Stop()
if r.onClose != nil {
r.onClose()
}
@@ -1038,6 +1066,14 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
}
}()
}
// send webhook after callbacks are complete, i.e. after persistence and state handling
r.telemetry.TrackPublished(
context.Background(),
participant.ID(),
participant.Identity(),
track.ToProto(),
)
}
func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) {
@@ -1049,6 +1085,14 @@ func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) {
}
func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) {
r.telemetry.TrackUnpublished(
context.Background(),
p.ID(),
p.Identity(),
track.ToProto(),
!p.IsClosed(),
)
r.trackManager.RemoveTrack(track)
if !p.IsClosed() {
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
@@ -1452,6 +1496,52 @@ func (r *Room) DebugInfo() map[string]interface{} {
return info
}
// addParticipantWorkerLocked returns the event worker for p's identity,
// creating one (and starting its ops queue) if none exists yet. Workers
// are reference counted via their participants slice so that a second
// session with the same identity (e.g. a resume) reuses the same queue;
// p is appended only if it is not tracked already.
// Caller is expected to hold r.lock (per the *Locked naming convention —
// confirm against call sites).
func (r *Room) addParticipantWorkerLocked(p types.LocalParticipant) *participantWorker {
	identity := p.Identity()
	pw := r.participantWorkers[identity]
	if pw != nil {
		// worker already exists for this identity; ref-count p in
		found := false
		for _, participant := range pw.participants {
			if p == participant {
				found = true
				break
			}
		}
		if !found {
			pw.participants = append(pw.participants, p)
		}
		return pw
	}
	// first participant for this identity: create and start a worker
	pw = &participantWorker{
		eventsQueue: sutils.NewOpsQueue(fmt.Sprintf("participant-worker-%s-%s", r.Name(), identity), 0, true),
		participants: []types.LocalParticipant{p},
	}
	pw.eventsQueue.Start()
	r.participantWorkers[identity] = pw
	return pw
}
// removeParticipantWorkerLocked drops p from its identity's worker and,
// once no participants remain for that identity, stops the worker's ops
// queue and deletes it from the map.
// Caller is expected to hold r.lock (per the *Locked naming convention —
// confirm against call sites).
func (r *Room) removeParticipantWorkerLocked(p types.LocalParticipant) {
	identity := p.Identity()
	if pw, ok := r.participantWorkers[identity]; ok {
		n := len(pw.participants)
		for idx, participant := range pw.participants {
			if p == participant {
				// swap-delete: move last element into idx and
				// truncate; slice order is not preserved
				pw.participants[idx] = pw.participants[n-1]
				pw.participants = pw.participants[:n-1]
				break
			}
		}
		if len(pw.participants) == 0 {
			pw.eventsQueue.Stop()
			delete(r.participantWorkers, identity)
		}
	}
}
// ------------------------------------------------------------
func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp *livekit.DataPacket, logger logger.Logger) {
dest := dp.GetUser().GetDestinationSids()
var dpData []byte
+6 -8
View File
@@ -110,8 +110,7 @@ func TestRoomJoin(t *testing.T) {
stateChangeCB := p.OnStateChangeArgsForCall(0)
require.NotNil(t, stateChangeCB)
p.StateReturns(livekit.ParticipantInfo_ACTIVE)
stateChangeCB(p, livekit.ParticipantInfo_JOINED)
stateChangeCB(p, livekit.ParticipantInfo_ACTIVE)
// it should become a subscriber when connectivity changes
numTracks := 0
@@ -122,7 +121,7 @@ func TestRoomJoin(t *testing.T) {
numTracks += len(op.GetPublishedTracks())
}
require.Equal(t, numTracks, p.SubscribeToTrackCallCount())
require.Eventually(t, func() bool { return p.SubscribeToTrackCallCount() == numTracks }, 5*time.Second, 10*time.Millisecond)
})
t.Run("participant state change is broadcasted to others", func(t *testing.T) {
@@ -218,7 +217,7 @@ func TestParticipantUpdate(t *testing.T) {
expected += 1
}
fp := p.(*typesfakes.FakeLocalParticipant)
require.Equal(t, expected, fp.SendParticipantUpdateCallCount())
require.Eventually(t, func() bool { return fp.SendParticipantUpdateCallCount() == expected }, 5*time.Second, 10*time.Millisecond)
}
})
}
@@ -424,8 +423,8 @@ func TestNewTrack(t *testing.T) {
require.NotNil(t, trackCB)
trackCB(pub, track)
// only p1 should've been subscribed to
require.Eventually(t, func() bool { return p1.SubscribeToTrackCallCount() == 1 }, 5*time.Second, 10*time.Millisecond)
require.Equal(t, 0, p0.SubscribeToTrackCallCount())
require.Equal(t, 1, p1.SubscribeToTrackCallCount())
})
}
@@ -679,10 +678,9 @@ func TestHiddenParticipants(t *testing.T) {
stateChangeCB := hidden.OnStateChangeArgsForCall(0)
require.NotNil(t, stateChangeCB)
hidden.StateReturns(livekit.ParticipantInfo_ACTIVE)
stateChangeCB(hidden, livekit.ParticipantInfo_JOINED)
stateChangeCB(hidden, livekit.ParticipantInfo_ACTIVE)
require.Equal(t, 2, hidden.SubscribeToTrackCallCount())
require.Eventually(t, func() bool { return hidden.SubscribeToTrackCallCount() == 2 }, 5*time.Second, 10*time.Millisecond)
})
}
+1 -1
View File
@@ -573,7 +573,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
m.lock.Unlock()
if changedCB != nil && firstSubscribe {
go changedCB(publisherID, true)
changedCB(publisherID, true)
}
return nil
}
+4
View File
@@ -83,5 +83,9 @@ func NewMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrac
t.IDReturns(livekit.TrackID(utils.NewGuid(utils.TrackPrefix)))
t.KindReturns(kind)
t.NameReturns(name)
t.ToProtoReturns(&livekit.TrackInfo{
Type: kind,
Name: name,
})
return t
}
-2
View File
@@ -39,7 +39,6 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/sfu/rtpextension"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
@@ -238,7 +237,6 @@ type TransportParams struct {
Config *WebRTCConfig
DirectionConfig DirectionConfig
CongestionControlConfig config.CongestionControlConfig
Telemetry telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
Transport livekit.SignalTarget
+1 -5
View File
@@ -33,7 +33,6 @@ import (
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
@@ -54,7 +53,6 @@ type TransportManagerParams struct {
SubscriberAsPrimary bool
Config *WebRTCConfig
ProtocolVersion types.ProtocolVersion
Telemetry telemetry.TelemetryService
CongestionControlConfig config.CongestionControlConfig
EnabledSubscribeCodecs []*livekit.Codec
EnabledPublishCodecs []*livekit.Codec
@@ -119,7 +117,6 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
Config: params.Config,
DirectionConfig: params.Config.Publisher,
CongestionControlConfig: params.CongestionControlConfig,
Telemetry: params.Telemetry,
EnabledCodecs: params.EnabledPublishCodecs,
Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER),
SimTracks: params.SimTracks,
@@ -152,7 +149,6 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
Config: params.Config,
DirectionConfig: params.Config.Subscriber,
CongestionControlConfig: params.CongestionControlConfig,
Telemetry: params.Telemetry,
EnabledCodecs: params.EnabledSubscribeCodecs,
Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER),
ClientInfo: params.ClientInfo,
@@ -722,7 +718,7 @@ func (t *TransportManager) ProcessPendingPublisherDataChannels() {
}
}
func (t *TransportManager) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
func (t *TransportManager) HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
t.mediaLossProxy.HandleMaxLossFeedback(dt, report)
}
+3 -3
View File
@@ -273,7 +273,6 @@ type Participant interface {
IsRecorder() bool
IsAgent() bool
Start()
Close(sendLeave bool, reason ParticipantCloseReason, isExpectedToResume bool) error
SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion)
@@ -379,7 +378,7 @@ type LocalParticipant interface {
IssueFullReconnect(reason ParticipantCloseReason)
// callbacks
OnStateChange(func(p LocalParticipant, oldState livekit.ParticipantInfo_State))
OnStateChange(func(p LocalParticipant, state livekit.ParticipantInfo_State))
OnMigrateStateChange(func(p LocalParticipant, migrateState MigrateState))
// OnTrackPublished - remote added a track
OnTrackPublished(func(LocalParticipant, MediaTrack))
@@ -393,9 +392,10 @@ type LocalParticipant interface {
OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool))
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
OnTrafficLoad(callback func(trafficLoad *TrafficLoad))
HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
// session migration
MaybeStartMigration(force bool, onStart func()) bool
SetMigrateState(s MigrateState)
@@ -355,6 +355,12 @@ type FakeLocalParticipant struct {
handleOfferArgsForCall []struct {
arg1 webrtc.SessionDescription
}
HandleReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport)
handleReceiverReportMutex sync.RWMutex
handleReceiverReportArgsForCall []struct {
arg1 *sfu.DownTrack
arg2 *rtcp.ReceiverReport
}
HandleReconnectAndSendResponseStub func(livekit.ReconnectReason, *livekit.ReconnectResponse) error
handleReconnectAndSendResponseMutex sync.RWMutex
handleReconnectAndSendResponseArgsForCall []struct {
@@ -571,16 +577,10 @@ type FakeLocalParticipant struct {
onParticipantUpdateArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport)
onReceiverReportMutex sync.RWMutex
onReceiverReportArgsForCall []struct {
arg1 *sfu.DownTrack
arg2 *rtcp.ReceiverReport
}
OnStateChangeStub func(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State))
OnStateChangeStub func(func(p types.LocalParticipant, state livekit.ParticipantInfo_State))
onStateChangeMutex sync.RWMutex
onStateChangeArgsForCall []struct {
arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)
arg1 func(p types.LocalParticipant, state livekit.ParticipantInfo_State)
}
OnSubscribeStatusChangedStub func(func(publisherID livekit.ParticipantID, subscribed bool))
onSubscribeStatusChangedMutex sync.RWMutex
@@ -791,10 +791,6 @@ type FakeLocalParticipant struct {
setTrackMutedReturnsOnCall map[int]struct {
result1 *livekit.TrackInfo
}
StartStub func()
startMutex sync.RWMutex
startArgsForCall []struct {
}
StateStub func() livekit.ParticipantInfo_State
stateMutex sync.RWMutex
stateArgsForCall []struct {
@@ -2740,6 +2736,39 @@ func (fake *FakeLocalParticipant) HandleOfferArgsForCall(i int) webrtc.SessionDe
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) HandleReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) {
fake.handleReceiverReportMutex.Lock()
fake.handleReceiverReportArgsForCall = append(fake.handleReceiverReportArgsForCall, struct {
arg1 *sfu.DownTrack
arg2 *rtcp.ReceiverReport
}{arg1, arg2})
stub := fake.HandleReceiverReportStub
fake.recordInvocation("HandleReceiverReport", []interface{}{arg1, arg2})
fake.handleReceiverReportMutex.Unlock()
if stub != nil {
fake.HandleReceiverReportStub(arg1, arg2)
}
}
func (fake *FakeLocalParticipant) HandleReceiverReportCallCount() int {
fake.handleReceiverReportMutex.RLock()
defer fake.handleReceiverReportMutex.RUnlock()
return len(fake.handleReceiverReportArgsForCall)
}
func (fake *FakeLocalParticipant) HandleReceiverReportCalls(stub func(*sfu.DownTrack, *rtcp.ReceiverReport)) {
fake.handleReceiverReportMutex.Lock()
defer fake.handleReceiverReportMutex.Unlock()
fake.HandleReceiverReportStub = stub
}
func (fake *FakeLocalParticipant) HandleReceiverReportArgsForCall(i int) (*sfu.DownTrack, *rtcp.ReceiverReport) {
fake.handleReceiverReportMutex.RLock()
defer fake.handleReceiverReportMutex.RUnlock()
argsForCall := fake.handleReceiverReportArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) HandleReconnectAndSendResponse(arg1 livekit.ReconnectReason, arg2 *livekit.ReconnectResponse) error {
fake.handleReconnectAndSendResponseMutex.Lock()
ret, specificReturn := fake.handleReconnectAndSendResponseReturnsOnCall[len(fake.handleReconnectAndSendResponseArgsForCall)]
@@ -3935,43 +3964,10 @@ func (fake *FakeLocalParticipant) OnParticipantUpdateArgsForCall(i int) func(typ
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) {
fake.onReceiverReportMutex.Lock()
fake.onReceiverReportArgsForCall = append(fake.onReceiverReportArgsForCall, struct {
arg1 *sfu.DownTrack
arg2 *rtcp.ReceiverReport
}{arg1, arg2})
stub := fake.OnReceiverReportStub
fake.recordInvocation("OnReceiverReport", []interface{}{arg1, arg2})
fake.onReceiverReportMutex.Unlock()
if stub != nil {
fake.OnReceiverReportStub(arg1, arg2)
}
}
func (fake *FakeLocalParticipant) OnReceiverReportCallCount() int {
fake.onReceiverReportMutex.RLock()
defer fake.onReceiverReportMutex.RUnlock()
return len(fake.onReceiverReportArgsForCall)
}
func (fake *FakeLocalParticipant) OnReceiverReportCalls(stub func(*sfu.DownTrack, *rtcp.ReceiverReport)) {
fake.onReceiverReportMutex.Lock()
defer fake.onReceiverReportMutex.Unlock()
fake.OnReceiverReportStub = stub
}
func (fake *FakeLocalParticipant) OnReceiverReportArgsForCall(i int) (*sfu.DownTrack, *rtcp.ReceiverReport) {
fake.onReceiverReportMutex.RLock()
defer fake.onReceiverReportMutex.RUnlock()
argsForCall := fake.onReceiverReportArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) {
func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant, state livekit.ParticipantInfo_State)) {
fake.onStateChangeMutex.Lock()
fake.onStateChangeArgsForCall = append(fake.onStateChangeArgsForCall, struct {
arg1 func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)
arg1 func(p types.LocalParticipant, state livekit.ParticipantInfo_State)
}{arg1})
stub := fake.OnStateChangeStub
fake.recordInvocation("OnStateChange", []interface{}{arg1})
@@ -3987,13 +3983,13 @@ func (fake *FakeLocalParticipant) OnStateChangeCallCount() int {
return len(fake.onStateChangeArgsForCall)
}
func (fake *FakeLocalParticipant) OnStateChangeCalls(stub func(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State))) {
func (fake *FakeLocalParticipant) OnStateChangeCalls(stub func(func(p types.LocalParticipant, state livekit.ParticipantInfo_State))) {
fake.onStateChangeMutex.Lock()
defer fake.onStateChangeMutex.Unlock()
fake.OnStateChangeStub = stub
}
func (fake *FakeLocalParticipant) OnStateChangeArgsForCall(i int) func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) {
func (fake *FakeLocalParticipant) OnStateChangeArgsForCall(i int) func(p types.LocalParticipant, state livekit.ParticipantInfo_State) {
fake.onStateChangeMutex.RLock()
defer fake.onStateChangeMutex.RUnlock()
argsForCall := fake.onStateChangeArgsForCall[i]
@@ -5209,30 +5205,6 @@ func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *liv
}{result1}
}
func (fake *FakeLocalParticipant) Start() {
fake.startMutex.Lock()
fake.startArgsForCall = append(fake.startArgsForCall, struct {
}{})
stub := fake.StartStub
fake.recordInvocation("Start", []interface{}{})
fake.startMutex.Unlock()
if stub != nil {
fake.StartStub()
}
}
func (fake *FakeLocalParticipant) StartCallCount() int {
fake.startMutex.RLock()
defer fake.startMutex.RUnlock()
return len(fake.startArgsForCall)
}
func (fake *FakeLocalParticipant) StartCalls(stub func()) {
fake.startMutex.Lock()
defer fake.startMutex.Unlock()
fake.StartStub = stub
}
func (fake *FakeLocalParticipant) State() livekit.ParticipantInfo_State {
fake.stateMutex.Lock()
ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)]
@@ -6282,6 +6254,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.handleAnswerMutex.RUnlock()
fake.handleOfferMutex.RLock()
defer fake.handleOfferMutex.RUnlock()
fake.handleReceiverReportMutex.RLock()
defer fake.handleReceiverReportMutex.RUnlock()
fake.handleReconnectAndSendResponseMutex.RLock()
defer fake.handleReconnectAndSendResponseMutex.RUnlock()
fake.handleSignalSourceCloseMutex.RLock()
@@ -6334,8 +6308,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onMigrateStateChangeMutex.RUnlock()
fake.onParticipantUpdateMutex.RLock()
defer fake.onParticipantUpdateMutex.RUnlock()
fake.onReceiverReportMutex.RLock()
defer fake.onReceiverReportMutex.RUnlock()
fake.onStateChangeMutex.RLock()
defer fake.onStateChangeMutex.RUnlock()
fake.onSubscribeStatusChangedMutex.RLock()
@@ -6392,8 +6364,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setSubscriberChannelCapacityMutex.RUnlock()
fake.setTrackMutedMutex.RLock()
defer fake.setTrackMutedMutex.RUnlock()
fake.startMutex.RLock()
defer fake.startMutex.RUnlock()
fake.stateMutex.RLock()
defer fake.stateMutex.RUnlock()
fake.subscribeToTrackMutex.RLock()
@@ -165,10 +165,6 @@ type FakeParticipant struct {
setNameArgsForCall []struct {
arg1 string
}
StartStub func()
startMutex sync.RWMutex
startArgsForCall []struct {
}
StateStub func() livekit.ParticipantInfo_State
stateMutex sync.RWMutex
stateArgsForCall []struct {
@@ -1047,30 +1043,6 @@ func (fake *FakeParticipant) SetNameArgsForCall(i int) string {
return argsForCall.arg1
}
func (fake *FakeParticipant) Start() {
fake.startMutex.Lock()
fake.startArgsForCall = append(fake.startArgsForCall, struct {
}{})
stub := fake.StartStub
fake.recordInvocation("Start", []interface{}{})
fake.startMutex.Unlock()
if stub != nil {
fake.StartStub()
}
}
func (fake *FakeParticipant) StartCallCount() int {
fake.startMutex.RLock()
defer fake.startMutex.RUnlock()
return len(fake.startArgsForCall)
}
func (fake *FakeParticipant) StartCalls(stub func()) {
fake.startMutex.Lock()
defer fake.startMutex.Unlock()
fake.StartStub = stub
}
func (fake *FakeParticipant) State() livekit.ParticipantInfo_State {
fake.stateMutex.Lock()
ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)]
@@ -1393,8 +1365,6 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.setMetadataMutex.RUnlock()
fake.setNameMutex.RLock()
defer fake.setNameMutex.RUnlock()
fake.startMutex.RLock()
defer fake.startMutex.RUnlock()
fake.stateMutex.RLock()
defer fake.stateMutex.RUnlock()
fake.subscriptionPermissionMutex.RLock()
-3
View File
@@ -62,9 +62,6 @@ func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
}
}
func (u *UpTrackManager) Start() {
}
func (u *UpTrackManager) Close(willBeResumed bool) {
u.lock.Lock()
u.closed = true