Cache data synchronously for processing in worker. (#2424)

It is possible that state of underlying object has changed between
event posting and event processing. So, cache data synchronously
and use it during event processing.

This is still not perfect as things like `hidden` and `IsClosed` is
accessed in worker. Ideally, it can be a snapshot of current state of
all required values that can be posted to the worker and the worker just
operates with data.
This commit is contained in:
Raja Subramanian
2024-01-29 10:57:41 +05:30
committed by GitHub
parent 134b6f05b4
commit efbc985c82
3 changed files with 57 additions and 37 deletions
+52 -32
View File
@@ -118,7 +118,7 @@ type Room struct {
trailer []byte
onParticipantChanged func(p types.LocalParticipant)
onParticipantChanged func(p types.LocalParticipant, pi *livekit.ParticipantInfo)
onRoomUpdated func()
onClose func()
@@ -347,11 +347,13 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
pw := r.addParticipantWorkerLocked(participant)
participant.OnStateChange(func(p types.LocalParticipant, state livekit.ParticipantInfo_State) {
ri := r.ToProto()
pi := p.ToProto()
pw.eventsQueue.Enqueue(func() {
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
r.onParticipantChanged(p, pi)
}
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true})
if state == livekit.ParticipantInfo_ACTIVE {
// subscribe participant to existing published tracks
@@ -368,8 +370,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
}
}
r.telemetry.ParticipantActive(context.Background(),
r.ToProto(),
p.ToProto(),
ri,
pi,
meta,
false,
)
@@ -383,23 +385,29 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
})
// it's important to set this before connection, we don't want to miss out on any published tracks
participant.OnTrackPublished(func(p types.LocalParticipant, t types.MediaTrack) {
pi := p.ToProto()
ti := t.ToProto()
pw.eventsQueue.Enqueue(func() {
r.onTrackPublished(p, t)
r.onTrackPublished(p, pi, t, ti)
})
})
participant.OnTrackUpdated(func(p types.LocalParticipant, t types.MediaTrack) {
pi := p.ToProto()
pw.eventsQueue.Enqueue(func() {
r.onTrackUpdated(p, t)
r.onTrackUpdated(p, pi, t)
})
})
participant.OnTrackUnpublished(func(p types.LocalParticipant, t types.MediaTrack) {
pi := p.ToProto()
ti := t.ToProto()
pw.eventsQueue.Enqueue(func() {
r.onTrackUnpublished(p, t)
r.onTrackUnpublished(p, pi, t, ti)
})
})
participant.OnParticipantUpdate(func(p types.LocalParticipant) {
pi := p.ToProto()
pw.eventsQueue.Enqueue(func() {
r.onParticipantUpdate(p)
r.onParticipantUpdate(p, pi)
})
})
participant.OnDataPacket(r.onDataPacket)
@@ -460,7 +468,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
r.participantRequestSources[participant.Identity()] = requestSource
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
r.onParticipantChanged(participant, participant.ToProto())
}
time.AfterFunc(time.Minute, func() {
@@ -637,10 +645,11 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
r.leftAt.Store(time.Now().Unix())
if sendUpdates {
pi := p.ToProto()
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
r.onParticipantChanged(p, pi)
}
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true})
}
}
@@ -826,7 +835,7 @@ func (r *Room) OnClose(f func()) {
r.onClose = f
}
func (r *Room) OnParticipantChanged(f func(participant types.LocalParticipant)) {
func (r *Room) OnParticipantChanged(f func(p types.LocalParticipant, pi *livekit.ParticipantInfo)) {
r.onParticipantChanged = f
}
@@ -988,9 +997,14 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS
}
// a ParticipantImpl in the room added a new track, subscribe other participants to it
func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) {
func (r *Room) onTrackPublished(
participant types.LocalParticipant,
pi *livekit.ParticipantInfo,
track types.MediaTrack,
ti *livekit.TrackInfo,
) {
// publish participant update, since track state is changed
r.broadcastParticipantState(participant, broadcastOptions{skipSource: true})
r.broadcastParticipantState(participant, pi, broadcastOptions{skipSource: true})
r.lock.RLock()
// subscribe all existing participants to this MediaTrack
@@ -1019,7 +1033,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
r.lock.RUnlock()
if onParticipantChanged != nil {
onParticipantChanged(participant)
onParticipantChanged(participant, pi)
}
r.trackManager.AddTrack(track, participant.Identity(), participant.ID())
@@ -1072,42 +1086,51 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
context.Background(),
participant.ID(),
participant.Identity(),
track.ToProto(),
ti,
)
}
func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) {
func (r *Room) onTrackUpdated(
p types.LocalParticipant,
pi *livekit.ParticipantInfo,
_ types.MediaTrack,
) {
// send track updates to everyone, especially if track was updated by admin
r.broadcastParticipantState(p, broadcastOptions{})
r.broadcastParticipantState(p, pi, broadcastOptions{})
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
r.onParticipantChanged(p, pi)
}
}
func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) {
func (r *Room) onTrackUnpublished(
p types.LocalParticipant,
pi *livekit.ParticipantInfo,
track types.MediaTrack,
ti *livekit.TrackInfo,
) {
r.telemetry.TrackUnpublished(
context.Background(),
p.ID(),
p.Identity(),
track.ToProto(),
ti,
!p.IsClosed(),
)
r.trackManager.RemoveTrack(track)
if !p.IsClosed() {
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true})
}
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
r.onParticipantChanged(p, pi)
}
}
func (r *Room) onParticipantUpdate(p types.LocalParticipant) {
func (r *Room) onParticipantUpdate(p types.LocalParticipant, pi *livekit.ParticipantInfo) {
r.protoProxy.MarkDirty(false)
// immediately notify when permissions or metadata changed
r.broadcastParticipantState(p, broadcastOptions{immediate: true})
r.broadcastParticipantState(p, pi, broadcastOptions{immediate: true})
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
r.onParticipantChanged(p, pi)
}
}
@@ -1142,16 +1165,13 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) {
}
// broadcast an update about participant p
func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcastOptions) {
pi := p.ToProto()
func (r *Room) broadcastParticipantState(p types.LocalParticipant, pi *livekit.ParticipantInfo, opts broadcastOptions) {
if p.Hidden() {
if !opts.skipSource {
// send update only to hidden participant
err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
p.GetLogger().Errorw("could not send update to participant", err)
}
}
return
+1 -1
View File
@@ -127,7 +127,7 @@ func TestRoomJoin(t *testing.T) {
t.Run("participant state change is broadcasted to others", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants})
var changedParticipant types.Participant
rm.OnParticipantChanged(func(participant types.LocalParticipant) {
rm.OnParticipantChanged(func(participant types.LocalParticipant, _pi *livekit.ParticipantInfo) {
changedParticipant = participant
})
participants := rm.GetParticipants()
+4 -4
View File
@@ -577,10 +577,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
})
newRoom.OnParticipantChanged(func(p types.LocalParticipant) {
if !p.IsDisconnected() {
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil {
newRoom.Logger.Errorw("could not handle participant change", err)
newRoom.OnParticipantChanged(func(p types.LocalParticipant, pi *livekit.ParticipantInfo) {
if pi.State != livekit.ParticipantInfo_DISCONNECTED {
if err := r.roomStore.StoreParticipant(ctx, roomName, pi); err != nil {
p.GetLogger().Errorw("could not handle participant change", err)
}
}
})