diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index f6838c2a6..034a6617b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -541,10 +541,6 @@ func (d *DownTrack) switchSpatialLayer(targetLayer int32) error { return nil } -func (d *DownTrack) SwitchSpatialLayerDone(layer int32) { - d.currentSpatialLayer.set(layer) -} - func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error) { if d.trackType == SimulcastDownTrack { currentLayer := uint16(d.CurrentSpatialLayer()) @@ -944,13 +940,49 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { } func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) error { - // Check if packet SSRC is different from before - // if true, the video source changed + tsl := d.TargetSpatialLayer() csl := d.CurrentSpatialLayer() - if csl != layer { + if tsl == layer && csl != tsl { + if extPkt.KeyFrame { + d.currentSpatialLayer.set(layer) + csl = layer + } else { + d.lastPli.set(time.Now().UnixNano()) + d.receiver.SendPLI(layer) + } + } + + if tsl < csl && tsl < d.MaxSpatialLayer() { + // + // If target layer is lower than both the current and + // maximum subscribed layer, it is due to bandwidth + // constraints that the target layer has been switched down. + // Continuing to send higher layer will only exacerbate the + // situation by putting more stress on the channel. So, drop it. + // + // In the other direction, it is okay to keep forwarding till + // switch point to get a smoother stream till the higher + // layer key frame arrives. + // + // Note that in the case of client subscription layer restriction + // coinciding with server restriction due to bandwidth limitation, + // this will take client subscription as the winning vote and + // continue to stream current spatial layer till switch point. + // That could lead to congesting the channel. + // LK-TODO: Improve the above case, i. e. distinguish server + // applied restriction from client requested restriction. + // + d.pktsDropped.add(1) return nil } + if csl != layer { + d.pktsDropped.add(1) + return nil + } + + // Check if packet SSRC is different from before + // if true, the video source changed lastSSRC := d.lastSSRC.get() reSync := d.reSync.get() if lastSSRC != extPkt.Packet.SSRC || reSync { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 70c1d25d0..69d6dcf85 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -37,6 +37,7 @@ type Receiver interface { DeleteDownTrack(peerID string) OnCloseHandler(fn func()) SendRTCP(p []rtcp.Packet) + SendPLI(layer int32) SetRTCPCh(ch chan []rtcp.Packet) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64) @@ -470,6 +471,14 @@ func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) { w.rtcpCh <- p } +func (w *WebRTCReceiver) SendPLI(layer int32) { + pli := []rtcp.Packet{ + &rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(int(layer))}, + } + + w.SendRTCP(pli) +} + func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) { w.rtcpCh = ch } @@ -550,10 +559,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } }() - pli := []rtcp.Packet{ - &rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(int(layer))}, - } - for { w.bufferMu.RLock() pkt, err := w.buffers[layer].ReadExtended() @@ -574,7 +579,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { // serial - not enough down tracks for parallelization to outweigh overhead for _, dt := range downTracks { if dt != nil { - w.writeRTP(layer, dt, pkt, pli) + w.writeRTP(layer, dt, pkt) } } } else { @@ -599,7 +604,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { for i := n - step; i < n && i < end; i++ { if dt := downTracks[i]; dt != nil { - w.writeRTP(layer, dt, pkt, pli) + w.writeRTP(layer, dt, pkt) } } } @@ -610,40 +615,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } } -func (w *WebRTCReceiver) writeRTP(layer int32, dt *DownTrack, pkt *buffer.ExtPacket, pli []rtcp.Packet) { - // LK-TODO-START - // Ideally this code should also be moved into the DownTrack - // structure to keep things modular. Let the down track code - // make decision on forwarding or not - // LK-TODO-END - if w.isSimulcast { - targetLayer := dt.TargetSpatialLayer() - currentLayer := dt.CurrentSpatialLayer() - if targetLayer == layer && currentLayer != targetLayer { - if pkt.KeyFrame { - dt.SwitchSpatialLayerDone(targetLayer) - currentLayer = targetLayer - } else { - dt.lastPli.set(time.Now().UnixNano()) - w.SendRTCP(pli) - } - } - // LK-TODO-START - // Probably need a control here to stop forwarding current layer - // if the current layer is higher than target layer, i. e. target layer - // could have been switched down due to bandwidth constraints and - // continuing to forward higher layer is only going to exacerbate the issue. - // Note that the client might have also requested a lower layer. So, it - // would nice to distinguish between client requested downgrade vs bandwidth - // constrained downgrade and stop higher layer only in the bandwidth - // constrained case. - // LK-TODO-END - if currentLayer != layer { - dt.pktsDropped.add(1) - return - } - } - +func (w *WebRTCReceiver) writeRTP(layer int32, dt *DownTrack, pkt *buffer.ExtPacket) { if err := dt.WriteRTP(pkt, layer); err != nil { log.Error().Err(err).Str("id", dt.id).Msg("Error writing to down track") }