diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 84e1b8bf7..367a68fa7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -329,17 +329,19 @@ func (p *ParticipantImpl) SetMetadata(metadata string) { p.lock.Lock() changed := p.grants.Metadata != metadata p.grants.Metadata = metadata + onParticipantUpdate := p.onParticipantUpdate + onClaimsChanged := p.onClaimsChanged p.lock.Unlock() if !changed { return } - if p.onParticipantUpdate != nil { - p.onParticipantUpdate(p) + if onParticipantUpdate != nil { + onParticipantUpdate(p) } - if p.onClaimsChanged != nil { - p.onClaimsChanged(p) + if onClaimsChanged != nil { + onClaimsChanged(p) } } @@ -374,6 +376,8 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio video.Recorder = permission.Recorder canPublish := video.GetCanPublish() + onParticipantUpdate := p.onParticipantUpdate + onClaimsChanged := p.onClaimsChanged p.lock.Unlock() // publish permission has been revoked then remove all published tracks @@ -391,11 +395,11 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio // update isPublisher attribute p.isPublisher.Store(canPublish && p.publisher.IsEstablished()) - if p.onParticipantUpdate != nil { - p.onParticipantUpdate(p) + if onParticipantUpdate != nil { + onParticipantUpdate(p) } - if p.onClaimsChanged != nil { - p.onClaimsChanged(p) + if onClaimsChanged != nil { + onClaimsChanged(p) } return true } @@ -427,33 +431,45 @@ func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine { // callbacks for clients func (p *ParticipantImpl) OnTrackPublished(callback func(types.LocalParticipant, types.MediaTrack)) { + p.lock.Lock() p.onTrackPublished = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) { p.lock.Lock() - defer p.lock.Unlock() p.onStateChange = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnTrackUpdated(callback func(types.LocalParticipant, types.MediaTrack)) { + p.lock.Lock() p.onTrackUpdated = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnParticipantUpdate(callback func(types.LocalParticipant)) { + p.lock.Lock() p.onParticipantUpdate = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *livekit.DataPacket)) { + p.lock.Lock() p.onDataPacket = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnSubscribedTo(callback func(types.LocalParticipant, livekit.ParticipantID)) { + p.lock.Lock() p.onSubscribedTo = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { + p.lock.Lock() p.onClose = callback + p.lock.Unlock() } func (p *ParticipantImpl) OnClaimsChanged(callback func(types.LocalParticipant)) { @@ -468,6 +484,7 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web p.lock.Unlock() return } + onParticipantUpdate := p.onParticipantUpdate p.lock.Unlock() p.params.Logger.Debugw("answering pub offer", "state", p.State().String(), @@ -509,8 +526,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web if p.isPublisher.Load() != p.CanPublish() { p.isPublisher.Store(p.CanPublish()) // trigger update as well if participant is already fully connected - if p.State() == livekit.ParticipantInfo_ACTIVE && p.onParticipantUpdate != nil { - p.onParticipantUpdate(p) + if p.State() == livekit.ParticipantInfo_ACTIVE && onParticipantUpdate != nil { + onParticipantUpdate(p) } } @@ -834,6 +851,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { "publisherIdentity", subTrack.PublisherIdentity(), "trackID", subTrack.ID()) p.lock.Lock() + onSubscribedTo := p.onSubscribedTo p.subscribedTracks[subTrack.ID()] = subTrack settings := p.subscribedTracksSettings[subTrack.ID()] p.lock.Unlock() @@ -849,8 +867,8 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { publisherID := subTrack.PublisherID() isAlreadySubscribed := p.isSubscribedTo(publisherID) p.subscribedTo.Store(publisherID, struct{}{}) - if !isAlreadySubscribed && p.onSubscribedTo != nil { - p.onSubscribedTo(p, publisherID) + if !isAlreadySubscribed && onSubscribedTo != nil { + onSubscribedTo(p, publisherID) } } @@ -958,8 +976,11 @@ func (p *ParticipantImpl) setupUpTrackManager() { return } - if p.onTrackUpdated != nil { - p.onTrackUpdated(p, track) + p.lock.RLock() + onTrackUpdated := p.onTrackUpdated + p.lock.RUnlock() + if onTrackUpdated != nil { + onTrackUpdated(p, track) } }) @@ -1031,8 +1052,14 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w "rid", track.RID(), "SSRC", track.SSRC()) } - if !isNewTrack && publishedTrack != nil && !publishedTrack.HasPendingCodec() && p.IsReady() && p.onTrackUpdated != nil { - p.onTrackUpdated(p, publishedTrack) + + if !isNewTrack && publishedTrack != nil && !publishedTrack.HasPendingCodec() && p.IsReady() { + p.lock.RLock() + onTrackUpdated := p.onTrackUpdated + p.lock.RUnlock() + if onTrackUpdated != nil { + onTrackUpdated(p, publishedTrack) + } } } @@ -1073,9 +1100,12 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [ // only forward on user payloads switch payload := dp.Value.(type) { case *livekit.DataPacket_User: - if p.onDataPacket != nil { + p.lock.RLock() + onDataPacket := p.onDataPacket + p.lock.RUnlock() + if onDataPacket != nil { payload.User.ParticipantSid = string(p.params.SID) - p.onDataPacket(p, &dp) + onDataPacket(p, &dp) } default: p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload)