From 469f1cd073b2806c463a94f3831bba5ae4aa6748 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 15 Jul 2023 12:43:05 +0530 Subject: [PATCH 01/10] Minor changes to publisher bool. (#1880) * Minor changes to publisher bool. * address feedback --- pkg/rtc/participant.go | 61 ++++++++++++++++++++++-------------------- pkg/rtc/room.go | 3 +-- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 5de6904c1..e0d5214d4 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -370,13 +370,14 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio return false } - p.GetLogger().Infow("updating participant permission", "permission", permission) + p.params.Logger.Infow("updating participant permission", "permission", permission) video.UpdateFromPermission(permission) p.dirty.Store(true) canPublish := video.GetCanPublish() canSubscribe := video.GetCanSubscribe() + onParticipantUpdate := p.onParticipantUpdate onClaimsChanged := p.onClaimsChanged @@ -387,13 +388,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio // publish permission has been revoked then remove offending tracks for _, track := range p.GetPublishedTracks() { if !video.GetCanPublishSource(track.Source()) { - p.RemovePublishedTrack(track, false, false) - if p.ProtocolVersion().SupportsUnpublish() { - p.sendTrackUnpublished(track.ID()) - } else { - // for older clients that don't support unpublish, mute to avoid them sending data - p.sendTrackMuted(track.ID(), true) - } + p.removePublishedTrack(track) } } @@ -1190,22 +1185,24 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { } func (p *ParticipantImpl) setIsPublisher(isPublisher bool) { - if p.isPublisher.Swap(isPublisher) != isPublisher { - p.lock.Lock() - p.requireBroadcast = true - p.lock.Unlock() + if p.isPublisher.Swap(isPublisher) == isPublisher { + return + } - p.dirty.Store(true) + p.lock.Lock() + p.requireBroadcast = true + p.lock.Unlock() - // trigger update as well if participant is already fully connected - if p.State() == livekit.ParticipantInfo_ACTIVE { - p.lock.RLock() - onParticipantUpdate := p.onParticipantUpdate - p.lock.RUnlock() + p.dirty.Store(true) - if onParticipantUpdate != nil { - onParticipantUpdate(p) - } + // trigger update as well if participant is already fully connected + if p.State() == livekit.ParticipantInfo_ACTIVE { + p.lock.RLock() + onParticipantUpdate := p.onParticipantUpdate + p.lock.RUnlock() + + if onParticipantUpdate != nil { + onParticipantUpdate(p) } } } @@ -1220,6 +1217,16 @@ func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) err }) } +func (p *ParticipantImpl) removePublishedTrack(track types.MediaTrack) { + p.RemovePublishedTrack(track, false, false) + if p.ProtocolVersion().SupportsUnpublish() { + p.sendTrackUnpublished(track.ID()) + } else { + // for older clients that don't support unpublish, mute to avoid them sending data + p.sendTrackMuted(track.ID(), true) + } +} + // when a new remoteTrack is created, creates a Track and adds it to room func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { if p.IsDisconnected() { @@ -1242,12 +1249,12 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w p.params.Logger.Warnw("no permission to publish mediaTrack", nil, "source", publishedTrack.Source(), ) + p.removePublishedTrack(publishedTrack) return } - if !p.IsPublisher() { - p.setIsPublisher(true) - } + p.setIsPublisher(true) + p.dirty.Store(true) p.params.Logger.Infow("mediaTrack published", "kind", track.Kind().String(), @@ -1258,8 +1265,6 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w "mime", track.Codec().MimeType, ) - p.dirty.Store(true) - if !isNewTrack && !publishedTrack.HasPendingCodec() && p.IsReady() { p.lock.RLock() onTrackUpdated := p.onTrackUpdated @@ -1300,9 +1305,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload) } - if !p.IsPublisher() { - p.setIsPublisher(true) - } + p.setIsPublisher(true) } func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index a90b90676..7602659ac 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -828,7 +828,6 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) { // publish participant update, since track state is changed r.broadcastParticipantState(participant, broadcastOptions{skipSource: true}) - r.protoProxy.MarkDirty(false) r.lock.RLock() // subscribe all existing participants to this MediaTrack @@ -888,7 +887,6 @@ func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) { func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) { r.trackManager.RemoveTrack(track) - r.protoProxy.MarkDirty(false) if !p.IsClosed() { r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) } @@ -898,6 +896,7 @@ func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTra } func (r *Room) onParticipantUpdate(p types.LocalParticipant) { + r.protoProxy.MarkDirty(false) // immediately notify when permissions or metadata changed r.broadcastParticipantState(p, broadcastOptions{immediate: true}) if r.onParticipantChanged != nil { From 06d8459234b04a20abe8475ab335a35ac21c0451 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 15 Jul 2023 14:09:22 +0530 Subject: [PATCH 02/10] Pick up proto proxy no update on no change (#1881) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 894e44d78..fad272d8d 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 - github.com/livekit/protocol v1.5.10-0.20230714010226-3c53edc91962 + github.com/livekit/protocol v1.5.10-0.20230715082801-9d0d6c9e9427 github.com/livekit/psrpc v0.3.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index dba27c2ea..1c95ae879 100644 --- a/go.sum +++ b/go.sum @@ -124,8 +124,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 h1:lWYbsondvqG69czxoACDwaJ/BoyD57BahCo70ZH+m4U= github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34= -github.com/livekit/protocol v1.5.10-0.20230714010226-3c53edc91962 h1:y+rtYNMGmvpEgQlNG/wOUO16S497ygh83wQSUBKHpG4= -github.com/livekit/protocol v1.5.10-0.20230714010226-3c53edc91962/go.mod h1:eRzojAYSPJuNgDHMlvLji/CPauj9hrgvb6rVPUj6MoU= +github.com/livekit/protocol v1.5.10-0.20230715082801-9d0d6c9e9427 h1:NyNdCT8+glCnGGdkgbEO2rpX1iRscwp4HCf1u21Clzo= +github.com/livekit/protocol v1.5.10-0.20230715082801-9d0d6c9e9427/go.mod h1:eRzojAYSPJuNgDHMlvLji/CPauj9hrgvb6rVPUj6MoU= github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew= github.com/livekit/psrpc v0.3.2/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= From 11e1eb00fa5d7fedc89ff9b2700738b259707e51 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 16 Jul 2023 23:28:20 +0530 Subject: [PATCH 03/10] Attempt to avoid out-of-order max subscribed layer notifications. (#1882) * Check for request layer lock only in the goroutine * check before sending PLI * max layer notifier worker * test cleanup * clean up * do notification in the callback --- pkg/sfu/downtrack.go | 192 +++++++++++------- pkg/sfu/forwarder.go | 69 ++++--- pkg/sfu/forwarder_test.go | 34 ++-- pkg/sfu/streamallocator/streamallocator.go | 4 + .../dependencydescriptor.go | 20 -- pkg/sfu/videolayerselector/simulcast.go | 13 -- .../videolayerselector/videolayerselector.go | 3 - pkg/sfu/videolayerselector/vp9.go | 20 -- 8 files changed, 183 insertions(+), 172 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c6dbb34fd..e3dd65a04 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -42,6 +42,8 @@ type TrackSender interface { HandleRTCPSenderReportData(payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error } +// ------------------------------------------------------------------- + const ( RTPPaddingMaxPayloadSize = 255 RTPPaddingEstimatedHeaderSize = 20 @@ -60,6 +62,8 @@ const ( maxPaddingOnMuteDuration = 5 * time.Second ) +// ------------------------------------------------------------------- + var ( ErrUnknownKind = errors.New("unknown kind of codec") ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") @@ -197,14 +201,13 @@ type DownTrack struct { transceiver *webrtc.RTPTransceiver writeStream webrtc.TrackLocalWriter rtcpReader *buffer.RTCPReader - onCloseHandler func(willBeResumed bool) - onBinding func(error) listenerLock sync.RWMutex receiverReportListeners []ReceiverReportListener - bindLock sync.Mutex - bound atomic.Bool + bindLock sync.Mutex + bound atomic.Bool + onBinding func(error) isClosed atomic.Bool connected atomic.Bool @@ -235,14 +238,13 @@ type DownTrack struct { pacer pacer.Pacer - // update stats - onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) + maxLayerNotifierCh chan struct{} - // when max subscribed layer changes + cbMu sync.RWMutex + onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32) - - // update rtt - onRttUpdate func(dt *DownTrack, rtt uint32) + onRttUpdate func(dt *DownTrack, rtt uint32) + onCloseHandler func(willBeResumed bool) } // NewDownTrack returns a DownTrack. @@ -266,17 +268,18 @@ func NewDownTrack( } d := &DownTrack{ - logger: logger, - id: r.TrackID(), - subscriberID: subID, - maxTrack: mt, - streamID: r.StreamID(), - bufferFactory: bf, - receiver: r, - upstreamCodecs: codecs, - kind: kind, - codec: codecs[0].RTPCodecCapability, - pacer: pacer, + logger: logger, + id: r.TrackID(), + subscriberID: subID, + maxTrack: mt, + streamID: r.StreamID(), + bufferFactory: bf, + receiver: r, + upstreamCodecs: codecs, + kind: kind, + codec: codecs[0].RTPCodecCapability, + pacer: pacer, + maxLayerNotifierCh: make(chan struct{}, 20), } d.forwarder = NewForwarder( d.kind, @@ -307,11 +310,15 @@ func NewDownTrack( Logger: d.logger.WithValues("direction", "down"), }) d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) { - if d.onStatsUpdate != nil { - d.onStatsUpdate(d, stat) + if onStatsUpdate := d.getOnStatsUpdate(); onStatsUpdate != nil { + onStatsUpdate(d, stat) } }) + if d.kind == webrtc.RTPCodecTypeVideo { + go d.maxLayerNotifierWorker() + } + return d, nil } @@ -541,6 +548,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { if d.IsClosed() || layer == buffer.InvalidLayerSpatial { return } + interval := 2 * d.rtpStats.GetRtt() if interval < keyFrameIntervalMin { interval = keyFrameIntervalMin @@ -550,7 +558,13 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } ticker := time.NewTicker(time.Duration(interval) * time.Millisecond) defer ticker.Stop() + for { + locked, _ := d.forwarder.CheckSync() + if locked { + return + } + if d.connected.Load() { d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer) d.receiver.SendPLI(layer, false) @@ -565,6 +579,34 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } } +func (d *DownTrack) postMaxLayerNotifierEvent() { + if d.IsClosed() { + return + } + + select { + case d.maxLayerNotifierCh <- struct{}{}: + default: + d.logger.Warnw("max layer notifier event queue full", nil) + } +} + +func (d *DownTrack) maxLayerNotifierWorker() { + more := true + for more { + _, more = <-d.maxLayerNotifierCh + + maxLayerSpatial := buffer.InvalidLayerSpatial + if more { + maxLayerSpatial = d.forwarder.GetMaxSubscribedSpatial() + } + if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil { + d.logger.Infow("max subscribed layer changed", maxLayerSpatial) + onMaxSubscribedLayerChanged(d, maxLayerSpatial) + } + } +} + // WriteRTP writes an RTP Packet to the DownTrack func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { if !d.bound.Load() || !d.connected.Load() { @@ -725,17 +767,17 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa // Mute enables or disables media forwarding - subscriber triggered func (d *DownTrack) Mute(muted bool) { - changed, maxLayer := d.forwarder.Mute(muted) - d.handleMute(muted, false, changed, maxLayer) + changed := d.forwarder.Mute(muted) + d.handleMute(muted, changed) } // PubMute enables or disables media forwarding - publisher side func (d *DownTrack) PubMute(pubMuted bool) { - changed, maxLayer := d.forwarder.PubMute(pubMuted) - d.handleMute(pubMuted, true, changed, maxLayer) + changed := d.forwarder.PubMute(pubMuted) + d.handleMute(pubMuted, changed) } -func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayer buffer.VideoLayer) { +func (d *DownTrack) handleMute(muted bool, changed bool) { if !changed { return } @@ -762,18 +804,7 @@ func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayer bu // Note that while publisher mute is active, subscriber changes can also happen // and that could turn on/off layers on publisher side. // - if !isPub && d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { - notifyLayer := buffer.InvalidLayerSpatial - if !muted { - // - // When unmuting, don't wait for layer lock as - // client might need to be notified to start layers - // before locking can happen in the forwarder. - // - notifyLayer = maxLayer.Spatial - } - d.onMaxSubscribedLayerChanged(d, notifyLayer) - } + d.postMaxLayerNotifierEvent() if sal := d.getStreamAllocatorListener(); sal != nil { sal.OnSubscriptionChanged(d) @@ -856,12 +887,10 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtpStats.Stop() d.logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) - if d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { - d.onMaxSubscribedLayerChanged(d, buffer.InvalidLayerSpatial) - } + close(d.maxLayerNotifierCh) - if d.onCloseHandler != nil { - d.onCloseHandler(!flush) + if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { + onCloseHandler(!flush) } d.stopKeyFrameRequester() @@ -869,21 +898,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { - changed, maxLayer, currentLayer := d.forwarder.SetMaxSpatialLayer(spatialLayer) + changed, maxLayer := d.forwarder.SetMaxSpatialLayer(spatialLayer) if !changed { return } - if d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo && maxLayer.SpatialGreaterThanOrEqual(currentLayer) { - // - // Notify when new max is - // 1. Equal to current -> already locked to the new max - // 2. Greater than current -> two scenarios - // a. is higher than previous max -> client may need to start higher layer before forwarder can lock - // b. is lower than previous max -> client can stop higher layer(s) - // - d.onMaxSubscribedLayerChanged(d, maxLayer.Spatial) - } + d.postMaxLayerNotifierEvent() if sal := d.getStreamAllocatorListener(); sal != nil { sal.OnSubscribedLayerChanged(d, maxLayer) @@ -891,7 +911,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { } func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { - changed, maxLayer, _ := d.forwarder.SetMaxTemporalLayer(temporalLayer) + changed, maxLayer := d.forwarder.SetMaxTemporalLayer(temporalLayer) if !changed { return } @@ -973,10 +993,23 @@ func (d *DownTrack) UpTrackBitrateReport(availableLayers []int32, bitrates Bitra // OnCloseHandler method to be called on remote tracked removed func (d *DownTrack) OnCloseHandler(fn func(willBeResumed bool)) { + d.cbMu.Lock() + defer d.cbMu.Unlock() + d.onCloseHandler = fn } +func (d *DownTrack) getOnCloseHandler() func(willBeResumed bool) { + d.cbMu.RLock() + defer d.cbMu.RUnlock() + + return d.onCloseHandler +} + func (d *DownTrack) OnBinding(fn func(error)) { + d.bindLock.Lock() + defer d.bindLock.Unlock() + d.onBinding = fn } @@ -988,17 +1021,47 @@ func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) { } func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) { + d.cbMu.Lock() + defer d.cbMu.Unlock() + d.onStatsUpdate = fn } +func (d *DownTrack) getOnStatsUpdate() func(dt *DownTrack, stat *livekit.AnalyticsStat) { + d.cbMu.RLock() + defer d.cbMu.RUnlock() + + return d.onStatsUpdate +} + func (d *DownTrack) OnRttUpdate(fn func(dt *DownTrack, rtt uint32)) { + d.cbMu.Lock() + defer d.cbMu.Unlock() + d.onRttUpdate = fn } +func (d *DownTrack) getOnRttUpdate() func(dt *DownTrack, rtt uint32) { + d.cbMu.RLock() + defer d.cbMu.RUnlock() + + return d.onRttUpdate +} + func (d *DownTrack) OnMaxLayerChanged(fn func(dt *DownTrack, layer int32)) { + d.cbMu.Lock() + defer d.cbMu.Unlock() + d.onMaxSubscribedLayerChanged = fn } +func (d *DownTrack) getOnMaxLayerChanged() func(dt *DownTrack, layer int32) { + d.cbMu.RLock() + defer d.cbMu.RUnlock() + + return d.onMaxSubscribedLayerChanged +} + func (d *DownTrack) IsDeficient() bool { return d.forwarder.IsDeficient() } @@ -1355,8 +1418,8 @@ func (d *DownTrack) handleRTCP(bytes []byte) { d.sequencer.setRTT(rttToReport) } - if d.onRttUpdate != nil { - d.onRttUpdate(d, rttToReport) + if onRttUpdate := d.getOnRttUpdate(); onRttUpdate != nil { + onRttUpdate(d, rttToReport) } } } @@ -1744,15 +1807,8 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int, } if spmd.tp != nil { - if spmd.tp.isSwitchingToMaxSpatial && d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { - d.onMaxSubscribedLayerChanged(d, spmd.tp.maxSpatialLayer) - } - - if spmd.tp.isSwitchingToRequestSpatial { - locked, _ := d.forwarder.CheckSync() - if locked { - d.stopKeyFrameRequester() - } + if spmd.tp.isSwitching { + d.postMaxLayerNotifierEvent() } if spmd.tp.isResuming { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 1da413ee8..23a8d694d 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -129,15 +129,13 @@ func (v VideoTransition) String() string { // ------------------------------------------------------------------- type TranslationParams struct { - shouldDrop bool - isResuming bool - isSwitchingToRequestSpatial bool - isSwitchingToMaxSpatial bool - maxSpatialLayer int32 - rtp *TranslationParamsRTP - codecBytes []byte - ddBytes []byte - marker bool + shouldDrop bool + isResuming bool + isSwitching bool + rtp *TranslationParamsRTP + codecBytes []byte + ddBytes []byte + marker bool } // ------------------------------------------------------------------- @@ -360,12 +358,12 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.refTSOffset = state.RefTSOffset } -func (f *Forwarder) Mute(muted bool) (bool, buffer.VideoLayer) { +func (f *Forwarder) Mute(muted bool) bool { f.lock.Lock() defer f.lock.Unlock() if f.muted == muted { - return false, f.vls.GetMax() + return false } // Do not mute when paused due to bandwidth limitation. @@ -384,7 +382,7 @@ func (f *Forwarder) Mute(muted bool) (bool, buffer.VideoLayer) { // the case of intentional mute. if muted && f.isDeficientLocked() && f.lastAllocation.PauseReason == VideoPauseReasonBandwidth { f.logger.Infow("ignoring forwarder mute, paused due to congestion") - return false, f.vls.GetMax() + return false } f.logger.Debugw("setting forwarder mute", "muted", muted) @@ -395,7 +393,7 @@ func (f *Forwarder) Mute(muted bool) (bool, buffer.VideoLayer) { f.resyncLocked() } - return true, f.vls.GetMax() + return true } func (f *Forwarder) IsMuted() bool { @@ -405,12 +403,12 @@ func (f *Forwarder) IsMuted() bool { return f.muted } -func (f *Forwarder) PubMute(pubMuted bool) (bool, buffer.VideoLayer) { +func (f *Forwarder) PubMute(pubMuted bool) bool { f.lock.Lock() defer f.lock.Unlock() if f.pubMuted == pubMuted { - return false, f.vls.GetMax() + return false } f.logger.Debugw("setting forwarder pub mute", "pubMuted", pubMuted) @@ -432,7 +430,7 @@ func (f *Forwarder) PubMute(pubMuted bool) (bool, buffer.VideoLayer) { } } - return true, f.vls.GetMax() + return true } func (f *Forwarder) IsPubMuted() bool { @@ -449,17 +447,17 @@ func (f *Forwarder) IsAnyMuted() bool { return f.muted || f.pubMuted } -func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLayer, buffer.VideoLayer) { +func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLayer) { f.lock.Lock() defer f.lock.Unlock() if f.kind == webrtc.RTPCodecTypeAudio { - return false, buffer.InvalidLayer, buffer.InvalidLayer + return false, buffer.InvalidLayer } existingMax := f.vls.GetMax() if spatialLayer == existingMax.Spatial { - return false, existingMax, f.vls.GetCurrent() + return false, existingMax } f.logger.Debugw("setting max spatial layer", "layer", spatialLayer) @@ -467,20 +465,20 @@ func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLa f.clearParkedLayer() - return true, f.vls.GetMax(), f.vls.GetCurrent() + return true, f.vls.GetMax() } -func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.VideoLayer, buffer.VideoLayer) { +func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.VideoLayer) { f.lock.Lock() defer f.lock.Unlock() if f.kind == webrtc.RTPCodecTypeAudio { - return false, buffer.InvalidLayer, buffer.InvalidLayer + return false, buffer.InvalidLayer } existingMax := f.vls.GetMax() if temporalLayer == existingMax.Temporal { - return false, existingMax, f.vls.GetCurrent() + return false, existingMax } f.logger.Debugw("setting max temporal layer", "layer", temporalLayer) @@ -488,7 +486,7 @@ func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.Video f.clearParkedLayer() - return true, f.vls.GetMax(), f.vls.GetCurrent() + return true, f.vls.GetMax() } func (f *Forwarder) MaxLayer() buffer.VideoLayer { @@ -512,6 +510,25 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer { return f.vls.GetTarget() } +func (f *Forwarder) GetMaxSubscribedSpatial() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + layer := buffer.InvalidLayerSpatial // covers muted case + if !f.muted { + layer = f.vls.GetMax().Spatial + + // If current is higher, mark the current layer as max subscribed layer + // to prevent the current layer from stopping before forwarder switches + // to the new and lower max layer, + if layer < f.vls.GetCurrent().Spatial { + layer = f.vls.GetCurrent().Spatial + } + } + + return layer +} + func (f *Forwarder) isDeficientLocked() bool { return f.lastAllocation.IsDeficient } @@ -1690,9 +1707,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, nil } tp.isResuming = result.IsResuming - tp.isSwitchingToRequestSpatial = result.IsSwitchingToRequestSpatial - tp.isSwitchingToMaxSpatial = result.IsSwitchingToMaxSpatial - tp.maxSpatialLayer = result.MaxSpatialLayer + tp.isSwitching = result.IsSwitching tp.ddBytes = result.DependencyDescriptorExtension tp.marker = result.RTPMarker diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index a0e340800..2da1c524b 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -26,13 +26,13 @@ func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo func TestForwarderMute(t *testing.T) { f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) require.False(t, f.IsMuted()) - muted, _ := f.Mute(false) + muted := f.Mute(false) require.False(t, muted) // no change in mute state require.False(t, f.IsMuted()) - muted, _ = f.Mute(true) + muted = f.Mute(true) require.True(t, muted) require.True(t, f.IsMuted()) - muted, _ = f.Mute(false) + muted = f.Mute(false) require.True(t, muted) require.False(t, f.IsMuted()) } @@ -45,15 +45,13 @@ func TestForwarderLayersAudio(t *testing.T) { require.Equal(t, buffer.InvalidLayer, f.CurrentLayer()) require.Equal(t, buffer.InvalidLayer, f.TargetLayer()) - changed, maxLayer, currentLayer := f.SetMaxSpatialLayer(1) + changed, maxLayer := f.SetMaxSpatialLayer(1) require.False(t, changed) require.Equal(t, buffer.InvalidLayer, maxLayer) - require.Equal(t, buffer.InvalidLayer, currentLayer) - changed, maxLayer, currentLayer = f.SetMaxTemporalLayer(1) + changed, maxLayer = f.SetMaxTemporalLayer(1) require.False(t, changed) require.Equal(t, buffer.InvalidLayer, maxLayer) - require.Equal(t, buffer.InvalidLayer, currentLayer) require.Equal(t, buffer.InvalidLayer, f.MaxLayer()) } @@ -72,12 +70,11 @@ func TestForwarderLayersVideo(t *testing.T) { Spatial: buffer.DefaultMaxLayerSpatial, Temporal: buffer.DefaultMaxLayerTemporal, } - changed, maxLayer, currentLayer := f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial) + changed, maxLayer := f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial) require.True(t, changed) require.Equal(t, expectedLayers, maxLayer) - require.Equal(t, buffer.InvalidLayer, currentLayer) - changed, maxLayer, currentLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1) + changed, maxLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1) require.True(t, changed) expectedLayers = buffer.VideoLayer{ Spatial: buffer.DefaultMaxLayerSpatial - 1, @@ -85,21 +82,18 @@ func TestForwarderLayersVideo(t *testing.T) { } require.Equal(t, expectedLayers, maxLayer) require.Equal(t, expectedLayers, f.MaxLayer()) - require.Equal(t, buffer.InvalidLayer, currentLayer) f.vls.SetCurrent(buffer.VideoLayer{Spatial: 0, Temporal: 1}) - changed, maxLayer, currentLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1) + changed, maxLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1) require.False(t, changed) require.Equal(t, expectedLayers, maxLayer) require.Equal(t, expectedLayers, f.MaxLayer()) - require.Equal(t, buffer.VideoLayer{Spatial: 0, Temporal: 1}, currentLayer) - changed, maxLayer, currentLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal) + changed, maxLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal) require.False(t, changed) require.Equal(t, expectedLayers, maxLayer) - require.Equal(t, buffer.VideoLayer{Spatial: 0, Temporal: 1}, currentLayer) - changed, maxLayer, currentLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal - 1) + changed, maxLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal - 1) require.True(t, changed) expectedLayers = buffer.VideoLayer{ Spatial: buffer.DefaultMaxLayerSpatial - 1, @@ -107,7 +101,6 @@ func TestForwarderLayersVideo(t *testing.T) { } require.Equal(t, expectedLayers, maxLayer) require.Equal(t, expectedLayers, f.MaxLayer()) - require.Equal(t, buffer.VideoLayer{Spatial: 0, Temporal: 1}, currentLayer) } func TestForwarderAllocateOptimal(t *testing.T) { @@ -1404,8 +1397,8 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { marshalledVP8, err := expectedVP8.Marshal() require.NoError(t, err) expectedTP = TranslationParams{ - isSwitchingToMaxSpatial: true, - isResuming: true, + isSwitching: true, + isResuming: true, rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, sequenceNumber: 23333, @@ -1716,8 +1709,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { marshalledVP8, err = expectedVP8.Marshal() require.NoError(t, err) expectedTP = TranslationParams{ - isSwitchingToMaxSpatial: true, - maxSpatialLayer: 1, + isSwitching: true, rtp: &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, sequenceNumber: 23339, diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 475b38e94..f0c17db9e 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -548,6 +548,10 @@ func (s *StreamAllocator) postEvent(event Event) { func (s *StreamAllocator) processEvents() { for event := range s.eventCh { + if s.isStopped.Load() { + break + } + s.handleEvent(&event) } diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 7b8c95310..34f8b0b55 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -204,26 +204,6 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r d.previousActiveDecodeTargetsBitmask = d.activeDecodeTargetsBitmask d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.currentLayer, ddwdt.DecodeTargets) - - if d.currentLayer.Spatial == d.requestSpatial { - result.IsSwitchingToRequestSpatial = true - } - if d.currentLayer.Spatial == d.maxLayer.Spatial { - result.IsSwitchingToMaxSpatial = true - result.MaxSpatialLayer = d.currentLayer.Spatial - d.logger.Infow( - "reached max layer", - "previous", d.previousLayer, - "current", d.currentLayer, - "previousTarget", d.previousTargetLayer, - "target", d.targetLayer, - "max", d.maxLayer, - "layer", fd.SpatialId, - "req", d.requestSpatial, - "maxSeen", d.maxSeenLayer, - "feed", extPkt.Packet.SSRC, - ) - } } ddExtension := &dede.DependencyDescriptorExtension{ diff --git a/pkg/sfu/videolayerselector/simulcast.go b/pkg/sfu/videolayerselector/simulcast.go index 4d0c9294d..06e0bad72 100644 --- a/pkg/sfu/videolayerselector/simulcast.go +++ b/pkg/sfu/videolayerselector/simulcast.go @@ -35,19 +35,6 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL result.IsResuming = true } - if s.currentLayer.Spatial == s.requestSpatial { - result.IsSwitchingToRequestSpatial = true - } - - if s.currentLayer.Spatial >= s.maxLayer.Spatial { - result.IsSwitchingToMaxSpatial = true - result.MaxSpatialLayer = s.currentLayer.Spatial - if reason != "" { - reason += ", " - } - reason += "reached max layer" - } - if reason != "" { s.logger.Infow( reason, diff --git a/pkg/sfu/videolayerselector/videolayerselector.go b/pkg/sfu/videolayerselector/videolayerselector.go index ffbb9f42c..f17d745d1 100644 --- a/pkg/sfu/videolayerselector/videolayerselector.go +++ b/pkg/sfu/videolayerselector/videolayerselector.go @@ -10,9 +10,6 @@ type VideoLayerSelectorResult struct { IsRelevant bool IsSwitching bool IsResuming bool - IsSwitchingToRequestSpatial bool - IsSwitchingToMaxSpatial bool - MaxSpatialLayer int32 RTPMarker bool DependencyDescriptorExtension []byte } diff --git a/pkg/sfu/videolayerselector/vp9.go b/pkg/sfu/videolayerselector/vp9.go index ed1a165b6..508cdf289 100644 --- a/pkg/sfu/videolayerselector/vp9.go +++ b/pkg/sfu/videolayerselector/vp9.go @@ -80,26 +80,6 @@ func (v *VP9) Select(extPkt *buffer.ExtPacket, _layer int32) (result VideoLayerS result.IsResuming = true } - if v.currentLayer.Spatial != v.requestSpatial && updatedLayer.Spatial == v.requestSpatial { - result.IsSwitchingToRequestSpatial = true - } - - if v.currentLayer.Spatial != v.maxLayer.Spatial && updatedLayer.Spatial == v.maxLayer.Spatial { - result.IsSwitchingToMaxSpatial = true - result.MaxSpatialLayer = updatedLayer.Spatial - v.logger.Infow( - "reached max layer", - "current", v.currentLayer, - "updated", updatedLayer, - "target", v.targetLayer, - "max", v.maxLayer, - "layer", extPkt.VideoLayer.Spatial, - "req", v.requestSpatial, - "maxSeen", v.maxSeenLayer, - "feed", extPkt.Packet.SSRC, - ) - } - v.previousLayer = v.currentLayer v.currentLayer = updatedLayer } From e6a47a24a726caca1aab58b45fa4df20c39f821d Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 16 Jul 2023 13:24:06 -0700 Subject: [PATCH 04/10] Update livekit deps (#1869) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index fad272d8d..34d36beea 100644 --- a/go.mod +++ b/go.mod @@ -17,8 +17,8 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.4 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 - github.com/livekit/protocol v1.5.10-0.20230715082801-9d0d6c9e9427 + github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a + github.com/livekit/protocol v1.5.10 github.com/livekit/psrpc v0.3.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 1c95ae879..417006f39 100644 --- a/go.sum +++ b/go.sum @@ -122,10 +122,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 h1:lWYbsondvqG69czxoACDwaJ/BoyD57BahCo70ZH+m4U= -github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34= -github.com/livekit/protocol v1.5.10-0.20230715082801-9d0d6c9e9427 h1:NyNdCT8+glCnGGdkgbEO2rpX1iRscwp4HCf1u21Clzo= -github.com/livekit/protocol v1.5.10-0.20230715082801-9d0d6c9e9427/go.mod h1:eRzojAYSPJuNgDHMlvLji/CPauj9hrgvb6rVPUj6MoU= +github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a h1:JWpPHcMFuw0fP4swE89CfMgeUXiSN5IKvCJL/5HLI3A= +github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= +github.com/livekit/protocol v1.5.10 h1:lnaHMa27cbRkHybi/jvOVuRSaLsho2wCLRjKiC6ce2Y= +github.com/livekit/protocol v1.5.10/go.mod h1:eRzojAYSPJuNgDHMlvLji/CPauj9hrgvb6rVPUj6MoU= github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew= github.com/livekit/psrpc v0.3.2/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= From 7dc60bb1bf59394dcd05d5f8ff9046aabd9c4834 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 16 Jul 2023 13:40:53 -0700 Subject: [PATCH 05/10] start reading signal messages before session handler finishes (#1883) * start reading signal messages before session handler finishes * fix err scope --- pkg/service/signal.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/service/signal.go b/pkg/service/signal.go index c6907dadb..aa6ab5578 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -139,9 +139,6 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe "connID", ss.ConnectionId, ) - reqChan := routing.NewDefaultMessageChannel(livekit.ConnectionID(ss.ConnectionId)) - defer reqChan.Close() - sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{ Logger: l, Stream: stream, @@ -149,6 +146,19 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe Writer: signalResponseMessageWriter{}, ConnectionID: livekit.ConnectionID(ss.ConnectionId), }) + reqChan := routing.NewDefaultMessageChannel(livekit.ConnectionID(ss.ConnectionId)) + + go func() { + err := routing.CopySignalStreamToMessageChannel[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]( + stream, + reqChan, + signalRequestMessageReader{}, + r.config, + ) + l.Infow("signal stream closed", "error", err) + + reqChan.Close() + }() err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) if err != nil { @@ -156,9 +166,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe return } - err = routing.CopySignalStreamToMessageChannel[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest](stream, reqChan, signalRequestMessageReader{}, r.config) - l.Infow("signal stream closed", "error", err) - + stream.Hijack() return } From 8784449fc6c476f45889189e85478e15139a998e Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 16 Jul 2023 15:03:47 -0700 Subject: [PATCH 06/10] manually cancel signal relay context (#1884) --- pkg/service/signal.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/service/signal.go b/pkg/service/signal.go index aa6ab5578..2c28465d8 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -112,12 +112,6 @@ type signalService struct { } func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]) (err error) { - // copy the context to prevent a race between the session handler closing - // and the delivery of any parting messages from the client. take care to - // copy the incoming rpc headers to avoid dropping any session vars. - ctx, cancel := context.WithCancel(metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context()))) - defer cancel() - req, ok := <-stream.Channel() if !ok { return nil @@ -139,6 +133,11 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe "connID", ss.ConnectionId, ) + // copy the context to prevent a race between the session handler closing + // and the delivery of any parting messages from the client. take care to + // copy the incoming rpc headers to avoid dropping any session vars. + ctx, cancel := context.WithCancel(metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context()))) + sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{ Logger: l, Stream: stream, @@ -158,11 +157,13 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe l.Infow("signal stream closed", "error", err) reqChan.Close() + cancel() }() err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) if err != nil { l.Errorw("could not handle new participant", err) + cancel() return } From 5535916ff27d985430cf95acae1f4007c3676d1e Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 16 Jul 2023 19:01:53 -0700 Subject: [PATCH 07/10] prevent signal context from closing before room setup finishes (#1885) --- pkg/service/signal.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 2c28465d8..299df055a 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -138,6 +138,12 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe // copy the incoming rpc headers to avoid dropping any session vars. ctx, cancel := context.WithCancel(metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context()))) + // wait until room setup finishes to cancel the session context even if the + // stream has closed. this is required to complete setup i/o after room api + // has discarded the temp client. + sessionHandlerDone := make(chan struct{}) + defer close(sessionHandlerDone) + sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{ Logger: l, Stream: stream, @@ -157,13 +163,14 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe l.Infow("signal stream closed", "error", err) reqChan.Close() + + <-sessionHandlerDone cancel() }() err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) if err != nil { l.Errorw("could not handle new participant", err) - cancel() return } From 5d1d454a9861226dace41f2ee8b7ea86e361b9f2 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 16 Jul 2023 20:05:41 -0700 Subject: [PATCH 08/10] Fix missed label arg in logger (#1886) --- pkg/sfu/downtrack.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e3dd65a04..e21a001f9 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -601,7 +601,7 @@ func (d *DownTrack) maxLayerNotifierWorker() { maxLayerSpatial = d.forwarder.GetMaxSubscribedSpatial() } if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil { - d.logger.Infow("max subscribed layer changed", maxLayerSpatial) + d.logger.Infow("max subscribed layer changed", "maxLayerSpatial", maxLayerSpatial) onMaxSubscribedLayerChanged(d, maxLayerSpatial) } } From 9f3c975b1c415b4bd16310a77c12ed4c0c814c7b Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 16 Jul 2023 20:24:11 -0700 Subject: [PATCH 09/10] leave signal context open after stream closes (#1887) --- pkg/service/signal.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 299df055a..3de03c1ae 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -133,17 +133,6 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe "connID", ss.ConnectionId, ) - // copy the context to prevent a race between the session handler closing - // and the delivery of any parting messages from the client. take care to - // copy the incoming rpc headers to avoid dropping any session vars. - ctx, cancel := context.WithCancel(metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context()))) - - // wait until room setup finishes to cancel the session context even if the - // stream has closed. this is required to complete setup i/o after room api - // has discarded the temp client. - sessionHandlerDone := make(chan struct{}) - defer close(sessionHandlerDone) - sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{ Logger: l, Stream: stream, @@ -163,11 +152,13 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe l.Infow("signal stream closed", "error", err) reqChan.Close() - - <-sessionHandlerDone - cancel() }() + // copy the context to prevent a race between the session handler closing + // and the delivery of any parting messages from the client. take care to + // copy the incoming rpc headers to avoid dropping any session vars. + ctx := metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context())) + err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) if err != nil { l.Errorw("could not handle new participant", err) From f41b93657e6e9c3663445ea9563adb5a806d4e29 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 18 Jul 2023 09:14:41 +0530 Subject: [PATCH 10/10] Log a bit more in sender report warp report. (#1888) --- pkg/sfu/buffer/rtpstats.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 8bf34b958..1774eb28c 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -888,8 +888,9 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) tsCycles++ } nowRTPExt := getExtTS(nowRTP, tsCycles) + var nowRTPExtUsingRate uint64 if calculatedClockRate != 0 { - nowRTPExtUsingRate := r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds()) + nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds()) if nowRTPExtUsingRate > nowRTPExt { nowRTPExt = nowRTPExtUsingRate nowRTP = uint32(nowRTPExtUsingRate) @@ -939,6 +940,9 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) "reportDrift", reportDriftResult.String(), "highestTS", r.highestTS, "highestTime", r.highestTime.String(), + "calculatedClockRate", calculatedClockRate, + "nowRTPExt", nowRTPExt, + "nowRTPExtUsingRate", nowRTPExtUsingRate, ) }