Improve robustness of JoinResponse and ParticipantUpdate race handling (#1272)

This commit is contained in:
David Zhao
2022-12-29 21:52:55 -08:00
committed by GitHub
parent 86bf5cb62e
commit fe6234329d
2 changed files with 44 additions and 14 deletions

View File

@@ -97,7 +97,6 @@ type ParticipantImpl struct {
isClosed atomic.Bool
state atomic.Value // livekit.ParticipantInfo_State
updateCache *lru.Cache[livekit.ParticipantID, uint32]
resSink atomic.Value // routing.MessageSink
resSinkValid atomic.Bool
grants *auth.ClaimGrants
@@ -129,19 +128,27 @@ type ParticipantImpl struct {
subscribedTracksSettings map[livekit.TrackID]*livekit.UpdateTrackSettings
// keeps track of disallowed tracks
disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID
// keep track of other publishers ids that we are subscribed to
subscribedTo map[livekit.ParticipantID]struct{}
// keeps track of other publishers ids that we are subscribed to
subscribedTo map[livekit.ParticipantID]struct{}
// keeps track of unpublished tracks in order to reuse trackID
unpublishedTracks []*livekit.TrackInfo
// queued participant updates before join response is sent
// guarded by updateLock
queuedUpdates []*livekit.ParticipantInfo
// cache of recently sent updates, to ensuring ordering by version
// guarded by updateLock
updateCache *lru.Cache[livekit.ParticipantID, uint32]
updateLock sync.Mutex
dataChannelStats *telemetry.BytesTrackStats
rttUpdatedAt time.Time
lastRTT uint32
lock sync.RWMutex
once sync.Once
updateLock sync.Mutex
version atomic.Uint32
lock sync.RWMutex
once sync.Once
version atomic.Uint32
// callbacks & handlers
onTrackPublished func(types.LocalParticipant, types.MediaTrack)

View File

@@ -30,25 +30,48 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) {
}
func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error {
// update state prior to sending message, or the message would not be sent
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
// keep track of participant updates and versions
p.updateLock.Lock()
for _, op := range joinResponse.OtherParticipants {
p.updateCache.Add(livekit.ParticipantID(op.Sid), op.Version)
}
p.updateLock.Unlock()
// send Join response
return p.writeMessage(&livekit.SignalResponse{
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Join{
Join: joinResponse,
},
})
if err != nil {
return err
}
// update state after to sending message, so that no participant updates could slip through before JoinResponse is
// sent
p.updateLock.Lock()
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
}
queuedUpdates := p.queuedUpdates
p.queuedUpdates = nil
p.updateLock.Unlock()
if len(queuedUpdates) > 0 {
return p.SendParticipantUpdate(queuedUpdates)
}
return nil
}
func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error {
p.updateLock.Lock()
if !p.IsReady() {
// avoid manipulating cache before it's ready
// queue up updates
p.queuedUpdates = append(p.queuedUpdates, participantsToUpdate...)
p.updateLock.Unlock()
return nil
}
p.updateLock.Lock()
validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate))
for _, pi := range participantsToUpdate {
isValid := true
@@ -178,7 +201,7 @@ func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) {
}
func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
if !p.IsReady() {
if p.State() == livekit.ParticipantInfo_DISCONNECTED || (!p.IsReady() && msg.GetJoin() == nil) {
return nil
}