diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index cbf0e25b8..1694ac2b5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -97,7 +97,6 @@ type ParticipantImpl struct { isClosed atomic.Bool state atomic.Value // livekit.ParticipantInfo_State - updateCache *lru.Cache[livekit.ParticipantID, uint32] resSink atomic.Value // routing.MessageSink resSinkValid atomic.Bool grants *auth.ClaimGrants @@ -129,19 +128,27 @@ type ParticipantImpl struct { subscribedTracksSettings map[livekit.TrackID]*livekit.UpdateTrackSettings // keeps track of disallowed tracks disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID - // keep track of other publishers ids that we are subscribed to - subscribedTo map[livekit.ParticipantID]struct{} + // keeps track of other publishers ids that we are subscribed to + subscribedTo map[livekit.ParticipantID]struct{} + // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo + // queued participant updates before join response is sent + // guarded by updateLock + queuedUpdates []*livekit.ParticipantInfo + // cache of recently sent updates, to ensuring ordering by version + // guarded by updateLock + updateCache *lru.Cache[livekit.ParticipantID, uint32] + updateLock sync.Mutex + dataChannelStats *telemetry.BytesTrackStats rttUpdatedAt time.Time lastRTT uint32 - lock sync.RWMutex - once sync.Once - updateLock sync.Mutex - version atomic.Uint32 + lock sync.RWMutex + once sync.Once + version atomic.Uint32 // callbacks & handlers onTrackPublished func(types.LocalParticipant, types.MediaTrack) diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 37f0587e3..c94733785 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -30,25 +30,48 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { } func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error { - // update state prior to sending message, or the message would not be sent - if p.State() == livekit.ParticipantInfo_JOINING { - p.updateState(livekit.ParticipantInfo_JOINED) + // keep track of participant updates and versions + p.updateLock.Lock() + for _, op := range joinResponse.OtherParticipants { + p.updateCache.Add(livekit.ParticipantID(op.Sid), op.Version) } + p.updateLock.Unlock() // send Join response - return p.writeMessage(&livekit.SignalResponse{ + err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Join{ Join: joinResponse, }, }) + if err != nil { + return err + } + + // update state after to sending message, so that no participant updates could slip through before JoinResponse is + // sent + p.updateLock.Lock() + if p.State() == livekit.ParticipantInfo_JOINING { + p.updateState(livekit.ParticipantInfo_JOINED) + } + queuedUpdates := p.queuedUpdates + p.queuedUpdates = nil + p.updateLock.Unlock() + + if len(queuedUpdates) > 0 { + return p.SendParticipantUpdate(queuedUpdates) + } + + return nil } func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error { + p.updateLock.Lock() if !p.IsReady() { - // avoid manipulating cache before it's ready + // queue up updates + p.queuedUpdates = append(p.queuedUpdates, participantsToUpdate...) + p.updateLock.Unlock() return nil } - p.updateLock.Lock() validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate)) for _, pi := range participantsToUpdate { isValid := true @@ -178,7 +201,7 @@ func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) { } func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { - if !p.IsReady() { + if p.State() == livekit.ParticipantInfo_DISCONNECTED || (!p.IsReady() && msg.GetJoin() == nil) { return nil }