diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 745ae9f4f..d95044ed9 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -118,7 +118,7 @@ type Room struct { trailer []byte - onParticipantChanged func(p types.LocalParticipant) + onParticipantChanged func(p types.LocalParticipant, pi *livekit.ParticipantInfo) onRoomUpdated func() onClose func() @@ -347,11 +347,13 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me pw := r.addParticipantWorkerLocked(participant) participant.OnStateChange(func(p types.LocalParticipant, state livekit.ParticipantInfo_State) { + ri := r.ToProto() + pi := p.ToProto() pw.eventsQueue.Enqueue(func() { if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true}) if state == livekit.ParticipantInfo_ACTIVE { // subscribe participant to existing published tracks @@ -368,8 +370,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me } } r.telemetry.ParticipantActive(context.Background(), - r.ToProto(), - p.ToProto(), + ri, + pi, meta, false, ) @@ -383,23 +385,29 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me }) // it's important to set this before connection, we don't want to miss out on any published tracks participant.OnTrackPublished(func(p types.LocalParticipant, t types.MediaTrack) { + pi := p.ToProto() + ti := t.ToProto() pw.eventsQueue.Enqueue(func() { - r.onTrackPublished(p, t) + r.onTrackPublished(p, pi, t, ti) }) }) participant.OnTrackUpdated(func(p types.LocalParticipant, t types.MediaTrack) { + pi := p.ToProto() pw.eventsQueue.Enqueue(func() { - r.onTrackUpdated(p, t) + r.onTrackUpdated(p, pi, t) }) }) participant.OnTrackUnpublished(func(p types.LocalParticipant, t types.MediaTrack) { + pi := p.ToProto() + ti := t.ToProto() pw.eventsQueue.Enqueue(func() { - r.onTrackUnpublished(p, t) + r.onTrackUnpublished(p, pi, t, ti) }) }) participant.OnParticipantUpdate(func(p types.LocalParticipant) { + pi := p.ToProto() pw.eventsQueue.Enqueue(func() { - r.onParticipantUpdate(p) + r.onParticipantUpdate(p, pi) }) }) participant.OnDataPacket(r.onDataPacket) @@ -460,7 +468,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me r.participantRequestSources[participant.Identity()] = requestSource if r.onParticipantChanged != nil { - r.onParticipantChanged(participant) + r.onParticipantChanged(participant, participant.ToProto()) } time.AfterFunc(time.Minute, func() { @@ -637,10 +645,11 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek r.leftAt.Store(time.Now().Unix()) if sendUpdates { + pi := p.ToProto() if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true}) } } @@ -826,7 +835,7 @@ func (r *Room) OnClose(f func()) { r.onClose = f } -func (r *Room) OnParticipantChanged(f func(participant types.LocalParticipant)) { +func (r *Room) OnParticipantChanged(f func(p types.LocalParticipant, pi *livekit.ParticipantInfo)) { r.onParticipantChanged = f } @@ -988,9 +997,14 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS } // a ParticipantImpl in the room added a new track, subscribe other participants to it -func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) { +func (r *Room) onTrackPublished( + participant types.LocalParticipant, + pi *livekit.ParticipantInfo, + track types.MediaTrack, + ti *livekit.TrackInfo, +) { // publish participant update, since track state is changed - r.broadcastParticipantState(participant, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(participant, pi, broadcastOptions{skipSource: true}) r.lock.RLock() // subscribe all existing participants to this MediaTrack @@ -1019,7 +1033,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. r.lock.RUnlock() if onParticipantChanged != nil { - onParticipantChanged(participant) + onParticipantChanged(participant, pi) } r.trackManager.AddTrack(track, participant.Identity(), participant.ID()) @@ -1072,42 +1086,51 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. context.Background(), participant.ID(), participant.Identity(), - track.ToProto(), + ti, ) } -func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) { +func (r *Room) onTrackUpdated( + p types.LocalParticipant, + pi *livekit.ParticipantInfo, + _ types.MediaTrack, +) { // send track updates to everyone, especially if track was updated by admin - r.broadcastParticipantState(p, broadcastOptions{}) + r.broadcastParticipantState(p, pi, broadcastOptions{}) if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } } -func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) { +func (r *Room) onTrackUnpublished( + p types.LocalParticipant, + pi *livekit.ParticipantInfo, + track types.MediaTrack, + ti *livekit.TrackInfo, +) { r.telemetry.TrackUnpublished( context.Background(), p.ID(), p.Identity(), - track.ToProto(), + ti, !p.IsClosed(), ) r.trackManager.RemoveTrack(track) if !p.IsClosed() { - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true}) } if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } } -func (r *Room) onParticipantUpdate(p types.LocalParticipant) { +func (r *Room) onParticipantUpdate(p types.LocalParticipant, pi *livekit.ParticipantInfo) { r.protoProxy.MarkDirty(false) // immediately notify when permissions or metadata changed - r.broadcastParticipantState(p, broadcastOptions{immediate: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{immediate: true}) if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } } @@ -1142,16 +1165,13 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) { } // broadcast an update about participant p -func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcastOptions) { - pi := p.ToProto() - +func (r *Room) broadcastParticipantState(p types.LocalParticipant, pi *livekit.ParticipantInfo, opts broadcastOptions) { if p.Hidden() { if !opts.skipSource { // send update only to hidden participant err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}) if err != nil { - r.Logger.Errorw("could not send update to participant", err, - "participant", p.Identity(), "pID", p.ID()) + p.GetLogger().Errorw("could not send update to participant", err) } } return diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 4990b64d4..ec564c4af 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -127,7 +127,7 @@ func TestRoomJoin(t *testing.T) { t.Run("participant state change is broadcasted to others", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) var changedParticipant types.Participant - rm.OnParticipantChanged(func(participant types.LocalParticipant) { + rm.OnParticipantChanged(func(participant types.LocalParticipant, _pi *livekit.ParticipantInfo) { changedParticipant = participant }) participants := rm.GetParticipants() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index abfd29331..1c010acbb 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -577,10 +577,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } }) - newRoom.OnParticipantChanged(func(p types.LocalParticipant) { - if !p.IsDisconnected() { - if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil { - newRoom.Logger.Errorw("could not handle participant change", err) + newRoom.OnParticipantChanged(func(p types.LocalParticipant, pi *livekit.ParticipantInfo) { + if pi.State != livekit.ParticipantInfo_DISCONNECTED { + if err := r.roomStore.StoreParticipant(ctx, roomName, pi); err != nil { + p.GetLogger().Errorw("could not handle participant change", err) } } })