Improve thread safety with participant callbacks (#728)

This commit is contained in:
David Zhao
2022-05-27 12:26:07 -07:00
committed by GitHub
parent cb9f0d37c2
commit a4882b9866
+49 -19
View File
@@ -329,17 +329,19 @@ func (p *ParticipantImpl) SetMetadata(metadata string) {
p.lock.Lock()
changed := p.grants.Metadata != metadata
p.grants.Metadata = metadata
onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()
if !changed {
return
}
if p.onParticipantUpdate != nil {
p.onParticipantUpdate(p)
if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
if p.onClaimsChanged != nil {
p.onClaimsChanged(p)
if onClaimsChanged != nil {
onClaimsChanged(p)
}
}
@@ -374,6 +376,8 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
video.Recorder = permission.Recorder
canPublish := video.GetCanPublish()
onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()
// publish permission has been revoked then remove all published tracks
@@ -391,11 +395,11 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
// update isPublisher attribute
p.isPublisher.Store(canPublish && p.publisher.IsEstablished())
if p.onParticipantUpdate != nil {
p.onParticipantUpdate(p)
if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
if p.onClaimsChanged != nil {
p.onClaimsChanged(p)
if onClaimsChanged != nil {
onClaimsChanged(p)
}
return true
}
@@ -427,33 +431,45 @@ func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine {
// callbacks for clients
func (p *ParticipantImpl) OnTrackPublished(callback func(types.LocalParticipant, types.MediaTrack)) {
p.lock.Lock()
p.onTrackPublished = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) {
p.lock.Lock()
defer p.lock.Unlock()
p.onStateChange = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnTrackUpdated(callback func(types.LocalParticipant, types.MediaTrack)) {
p.lock.Lock()
p.onTrackUpdated = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnParticipantUpdate(callback func(types.LocalParticipant)) {
p.lock.Lock()
p.onParticipantUpdate = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *livekit.DataPacket)) {
p.lock.Lock()
p.onDataPacket = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnSubscribedTo(callback func(types.LocalParticipant, livekit.ParticipantID)) {
p.lock.Lock()
p.onSubscribedTo = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) {
p.lock.Lock()
p.onClose = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnClaimsChanged(callback func(types.LocalParticipant)) {
@@ -468,6 +484,7 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
p.lock.Unlock()
return
}
onParticipantUpdate := p.onParticipantUpdate
p.lock.Unlock()
p.params.Logger.Debugw("answering pub offer",
"state", p.State().String(),
@@ -509,8 +526,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
if p.isPublisher.Load() != p.CanPublish() {
p.isPublisher.Store(p.CanPublish())
// trigger update as well if participant is already fully connected
if p.State() == livekit.ParticipantInfo_ACTIVE && p.onParticipantUpdate != nil {
p.onParticipantUpdate(p)
if p.State() == livekit.ParticipantInfo_ACTIVE && onParticipantUpdate != nil {
onParticipantUpdate(p)
}
}
@@ -834,6 +851,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
"publisherIdentity", subTrack.PublisherIdentity(),
"trackID", subTrack.ID())
p.lock.Lock()
onSubscribedTo := p.onSubscribedTo
p.subscribedTracks[subTrack.ID()] = subTrack
settings := p.subscribedTracksSettings[subTrack.ID()]
p.lock.Unlock()
@@ -849,8 +867,8 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
publisherID := subTrack.PublisherID()
isAlreadySubscribed := p.isSubscribedTo(publisherID)
p.subscribedTo.Store(publisherID, struct{}{})
if !isAlreadySubscribed && p.onSubscribedTo != nil {
p.onSubscribedTo(p, publisherID)
if !isAlreadySubscribed && onSubscribedTo != nil {
onSubscribedTo(p, publisherID)
}
}
@@ -958,8 +976,11 @@ func (p *ParticipantImpl) setupUpTrackManager() {
return
}
if p.onTrackUpdated != nil {
p.onTrackUpdated(p, track)
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
if onTrackUpdated != nil {
onTrackUpdated(p, track)
}
})
@@ -1031,8 +1052,14 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
"rid", track.RID(),
"SSRC", track.SSRC())
}
if !isNewTrack && publishedTrack != nil && !publishedTrack.HasPendingCodec() && p.IsReady() && p.onTrackUpdated != nil {
p.onTrackUpdated(p, publishedTrack)
if !isNewTrack && publishedTrack != nil && !publishedTrack.HasPendingCodec() && p.IsReady() {
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
if onTrackUpdated != nil {
onTrackUpdated(p, publishedTrack)
}
}
}
@@ -1073,9 +1100,12 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [
// only forward on user payloads
switch payload := dp.Value.(type) {
case *livekit.DataPacket_User:
if p.onDataPacket != nil {
p.lock.RLock()
onDataPacket := p.onDataPacket
p.lock.RUnlock()
if onDataPacket != nil {
payload.User.ParticipantSid = string(p.params.SID)
p.onDataPacket(p, &dp)
onDataPacket(p, &dp)
}
default:
p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload)