diff --git a/go.sum b/go.sum index 2abd27364..87149ee3b 100644 --- a/go.sum +++ b/go.sum @@ -233,10 +233,6 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c h1:wdzwTJjCpzy2FDmwdyVVGVa4+U9iv3E4Jy9qUDe/ubw= github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.3.3-0.20230124045313-d208f342983a h1:j30ZpR5TN1XRd4bPiO1xqGfYVVNxqh6QtI6bBSlpk4U= -github.com/livekit/protocol v1.3.3-0.20230124045313-d208f342983a/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= -github.com/livekit/protocol v1.3.3-0.20230127105819-b75d0aed9e9c h1:6/fmFL/CZWexH9hgcn/AJ4M0Xag98cwUXCs9j2hnoJY= -github.com/livekit/protocol v1.3.3-0.20230127105819-b75d0aed9e9c/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e h1:T+qUuDHioL5Q5Gzjun9tB65oaC9+zWmeWlcvpG+iilc= github.com/livekit/protocol v1.3.3-0.20230127213545-10b378e3bc1e/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= github.com/livekit/psrpc v0.2.4 h1:Fdxq56uJAIpRHCTgJsvp7ozw51dKtUmD3nxSXq9pCLs= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index b4a33dd0b..5a371edde 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -534,9 +534,13 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() { p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...) p.pendingTracksLock.Unlock() - for _, t := range addedTracks { - p.handleTrackPublished(t) - } + // launch callbacks in goroutine since they could block + // callbacks handle webhooks as well as db persistence + go func() { + for _, t := range addedTracks { + p.handleTrackPublished(t) + } + }() } func (p *ParticipantImpl) removeMutedTrackNotFired(mt *MediaTrack) { @@ -579,7 +583,7 @@ func (p *ParticipantImpl) SetMigrateInfo( for _, t := range mediaTracks { ti := t.GetTrack() - p.supervisor.AddPublication(livekit.TrackID(ti.Sid), nil) + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} @@ -1404,14 +1408,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } p.params.Telemetry.TrackPublishRequested(context.Background(), p.ID(), p.Identity(), ti) - p.supervisor.AddPublication(livekit.TrackID(ti.Sid), func(t types.LocalMediaTrack) { - p.params.Telemetry.TrackPublished( - context.Background(), - t.PublisherID(), - t.PublisherIdentity(), - t.ToProto(), - ) - }) + p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { if p.pendingTracks[req.Cid] == nil { @@ -1643,6 +1640,15 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) { if onTrackPublished != nil { onTrackPublished(p, track) } + + // send webhook after callbacks are complete, persistence and state handling happens + // in `onTrackPublished` cb + p.params.Telemetry.TrackPublished( + context.Background(), + p.ID(), + p.Identity(), + track.ToProto(), + ) } func (p *ParticipantImpl) hasPendingMigratedTrack() bool { diff --git a/pkg/rtc/supervisor/participant_supervisor.go b/pkg/rtc/supervisor/participant_supervisor.go index 0d9991294..99126739c 100644 --- a/pkg/rtc/supervisor/participant_supervisor.go +++ b/pkg/rtc/supervisor/participant_supervisor.go @@ -75,7 +75,7 @@ func (p *ParticipantSupervisor) SetPublisherPeerConnectionConnected(isConnected p.lock.Unlock() } -func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID, onSuccess func(track types.LocalMediaTrack)) { +func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) { p.lock.Lock() pm, ok := p.publications[trackID] if !ok { @@ -85,7 +85,6 @@ func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID, onSucces TrackID: trackID, IsPeerConnectionConnected: p.isPublisherConnected, Logger: p.params.Logger, - OnSuccess: onSuccess, }, ), } diff --git a/pkg/rtc/supervisor/publication_monitor.go b/pkg/rtc/supervisor/publication_monitor.go index 7bc013510..b51e8efd1 100644 --- a/pkg/rtc/supervisor/publication_monitor.go +++ b/pkg/rtc/supervisor/publication_monitor.go @@ -28,7 +28,6 @@ type PublicationMonitorParams struct { TrackID livekit.TrackID IsPeerConnectionConnected bool Logger logger.Logger - OnSuccess func(t types.LocalMediaTrack) } type PublicationMonitor struct { @@ -168,12 +167,6 @@ func (p *PublicationMonitor) update() { return } - if pub.isStart && p.publishedTrack != nil { - if p.params.OnSuccess != nil { - p.params.OnSuccess(p.publishedTrack) - } - } - if (pub.isStart && p.publishedTrack == nil) || (!pub.isStart && p.publishedTrack != nil) { // put it back as the condition is not satisfied p.desiredPublishes.PushFront(pub)