Persist participant before firing webhook (#1340)

Fixes #1339
This commit is contained in:
David Zhao
2023-01-27 19:29:38 -08:00
committed by GitHub
parent d83f9fe68b
commit db40272657
4 changed files with 19 additions and 25 deletions
-4
View File
@@ -233,10 +233,6 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c h1:wdzwTJjCpzy2FDmwdyVVGVa4+U9iv3E4Jy9qUDe/ubw=
github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.3.3-0.20230124045313-d208f342983a h1:j30ZpR5TN1XRd4bPiO1xqGfYVVNxqh6QtI6bBSlpk4U=
github.com/livekit/protocol v1.3.3-0.20230124045313-d208f342983a/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/protocol v1.3.3-0.20230127105819-b75d0aed9e9c h1:6/fmFL/CZWexH9hgcn/AJ4M0Xag98cwUXCs9j2hnoJY=
github.com/livekit/protocol v1.3.3-0.20230127105819-b75d0aed9e9c/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e h1:T+qUuDHioL5Q5Gzjun9tB65oaC9+zWmeWlcvpG+iilc=
github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/psrpc v0.2.4 h1:Fdxq56uJAIpRHCTgJsvp7ozw51dKtUmD3nxSXq9pCLs=
+18 -12
View File
@@ -534,9 +534,13 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() {
p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...)
p.pendingTracksLock.Unlock()
for _, t := range addedTracks {
p.handleTrackPublished(t)
}
// launch callbacks in goroutine since they could block
// callbacks handle webhooks as well as db persistence
go func() {
for _, t := range addedTracks {
p.handleTrackPublished(t)
}
}()
}
func (p *ParticipantImpl) removeMutedTrackNotFired(mt *MediaTrack) {
@@ -579,7 +583,7 @@ func (p *ParticipantImpl) SetMigrateInfo(
for _, t := range mediaTracks {
ti := t.GetTrack()
p.supervisor.AddPublication(livekit.TrackID(ti.Sid), nil)
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true}
@@ -1404,14 +1408,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}
p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti)
p.supervisor.AddPublication(livekit.TrackID(ti.Sid), func(t types.LocalMediaTrack) {
p.params.Telemetry.TrackPublished(
context.Background(),
t.PublisherID(),
t.PublisherIdentity(),
t.ToProto(),
)
})
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil {
if p.pendingTracks[req.Cid] == nil {
@@ -1643,6 +1640,15 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) {
if onTrackPublished != nil {
onTrackPublished(p, track)
}
// send webhook after callbacks are complete, persistence and state handling happens
// in `onTrackPublished` cb
p.params.Telemetry.TrackPublished(
context.Background(),
p.ID(),
p.Identity(),
track.ToProto(),
)
}
func (p *ParticipantImpl) hasPendingMigratedTrack() bool {
+1 -2
View File
@@ -75,7 +75,7 @@ func (p *ParticipantSupervisor) SetPublisherPeerConnectionConnected(isConnected
p.lock.Unlock()
}
func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID, onSuccess func(track types.LocalMediaTrack)) {
func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) {
p.lock.Lock()
pm, ok := p.publications[trackID]
if !ok {
@@ -85,7 +85,6 @@ func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID, onSucces
TrackID: trackID,
IsPeerConnectionConnected: p.isPublisherConnected,
Logger: p.params.Logger,
OnSuccess: onSuccess,
},
),
}
@@ -28,7 +28,6 @@ type PublicationMonitorParams struct {
TrackID livekit.TrackID
IsPeerConnectionConnected bool
Logger logger.Logger
OnSuccess func(t types.LocalMediaTrack)
}
type PublicationMonitor struct {
@@ -168,12 +167,6 @@ func (p *PublicationMonitor) update() {
return
}
if pub.isStart && p.publishedTrack != nil {
if p.params.OnSuccess != nil {
p.params.OnSuccess(p.publishedTrack)
}
}
if (pub.isStart && p.publishedTrack == nil) || (!pub.isStart && p.publishedTrack != nil) {
// put it back as the condition is not satisfied
p.desiredPublishes.PushFront(pub)