mirror of
https://github.com/livekit/livekit.git
synced 2026-06-03 21:51:44 +00:00
Let down track do the layer filtering (#203)
This commit is contained in:
+39
-7
@@ -541,10 +541,6 @@ func (d *DownTrack) switchSpatialLayer(targetLayer int32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DownTrack) SwitchSpatialLayerDone(layer int32) {
|
||||
d.currentSpatialLayer.set(layer)
|
||||
}
|
||||
|
||||
func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error) {
|
||||
if d.trackType == SimulcastDownTrack {
|
||||
currentLayer := uint16(d.CurrentSpatialLayer())
|
||||
@@ -944,13 +940,49 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
|
||||
}
|
||||
|
||||
func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
// Check if packet SSRC is different from before
|
||||
// if true, the video source changed
|
||||
tsl := d.TargetSpatialLayer()
|
||||
csl := d.CurrentSpatialLayer()
|
||||
if csl != layer {
|
||||
if tsl == layer && csl != tsl {
|
||||
if extPkt.KeyFrame {
|
||||
d.currentSpatialLayer.set(layer)
|
||||
csl = layer
|
||||
} else {
|
||||
d.lastPli.set(time.Now().UnixNano())
|
||||
d.receiver.SendPLI(layer)
|
||||
}
|
||||
}
|
||||
|
||||
if tsl < csl && tsl < d.MaxSpatialLayer() {
|
||||
//
|
||||
// If target layer is lower than both the current and
|
||||
// maximum subscribed layer, it is due to bandwidth
|
||||
// constraints that the target layer has been switched down.
|
||||
// Continuing to send higher layer will only exacerbate the
|
||||
// situation by putting more stress on the channel. So, drop it.
|
||||
//
|
||||
// In the other direction, it is okay to keep forwarding till
|
||||
// switch point to get a smoother stream till the higher
|
||||
// layer key frame arrives.
|
||||
//
|
||||
// Note that in the case of client subscription layer restriction
|
||||
// coinciding with server restriction due to bandwidth limitation,
|
||||
// this will take client subscription as the winning vote and
|
||||
// continue to stream current spatial layer till switch point.
|
||||
// That could lead to congesting the channel.
|
||||
// LK-TODO: Improve the above case, i. e. distinguish server
|
||||
// applied restriction from client requested restriction.
|
||||
//
|
||||
d.pktsDropped.add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
if csl != layer {
|
||||
d.pktsDropped.add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if packet SSRC is different from before
|
||||
// if true, the video source changed
|
||||
lastSSRC := d.lastSSRC.get()
|
||||
reSync := d.reSync.get()
|
||||
if lastSSRC != extPkt.Packet.SSRC || reSync {
|
||||
|
||||
+12
-40
@@ -37,6 +37,7 @@ type Receiver interface {
|
||||
DeleteDownTrack(peerID string)
|
||||
OnCloseHandler(fn func())
|
||||
SendRTCP(p []rtcp.Packet)
|
||||
SendPLI(layer int32)
|
||||
SetRTCPCh(ch chan []rtcp.Packet)
|
||||
|
||||
GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
|
||||
@@ -470,6 +471,14 @@ func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) {
|
||||
w.rtcpCh <- p
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) SendPLI(layer int32) {
|
||||
pli := []rtcp.Packet{
|
||||
&rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(int(layer))},
|
||||
}
|
||||
|
||||
w.SendRTCP(pli)
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) {
|
||||
w.rtcpCh = ch
|
||||
}
|
||||
@@ -550,10 +559,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
}
|
||||
}()
|
||||
|
||||
pli := []rtcp.Packet{
|
||||
&rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: w.SSRC(int(layer))},
|
||||
}
|
||||
|
||||
for {
|
||||
w.bufferMu.RLock()
|
||||
pkt, err := w.buffers[layer].ReadExtended()
|
||||
@@ -574,7 +579,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
// serial - not enough down tracks for parallelization to outweigh overhead
|
||||
for _, dt := range downTracks {
|
||||
if dt != nil {
|
||||
w.writeRTP(layer, dt, pkt, pli)
|
||||
w.writeRTP(layer, dt, pkt)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -599,7 +604,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
|
||||
for i := n - step; i < n && i < end; i++ {
|
||||
if dt := downTracks[i]; dt != nil {
|
||||
w.writeRTP(layer, dt, pkt, pli)
|
||||
w.writeRTP(layer, dt, pkt)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -610,40 +615,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) writeRTP(layer int32, dt *DownTrack, pkt *buffer.ExtPacket, pli []rtcp.Packet) {
|
||||
// LK-TODO-START
|
||||
// Ideally this code should also be moved into the DownTrack
|
||||
// structure to keep things modular. Let the down track code
|
||||
// make decision on forwarding or not
|
||||
// LK-TODO-END
|
||||
if w.isSimulcast {
|
||||
targetLayer := dt.TargetSpatialLayer()
|
||||
currentLayer := dt.CurrentSpatialLayer()
|
||||
if targetLayer == layer && currentLayer != targetLayer {
|
||||
if pkt.KeyFrame {
|
||||
dt.SwitchSpatialLayerDone(targetLayer)
|
||||
currentLayer = targetLayer
|
||||
} else {
|
||||
dt.lastPli.set(time.Now().UnixNano())
|
||||
w.SendRTCP(pli)
|
||||
}
|
||||
}
|
||||
// LK-TODO-START
|
||||
// Probably need a control here to stop forwarding current layer
|
||||
// if the current layer is higher than target layer, i. e. target layer
|
||||
// could have been switched down due to bandwidth constraints and
|
||||
// continuing to forward higher layer is only going to exacerbate the issue.
|
||||
// Note that the client might have also requested a lower layer. So, it
|
||||
// would nice to distinguish between client requested downgrade vs bandwidth
|
||||
// constrained downgrade and stop higher layer only in the bandwidth
|
||||
// constrained case.
|
||||
// LK-TODO-END
|
||||
if currentLayer != layer {
|
||||
dt.pktsDropped.add(1)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WebRTCReceiver) writeRTP(layer int32, dt *DownTrack, pkt *buffer.ExtPacket) {
|
||||
if err := dt.WriteRTP(pkt, layer); err != nil {
|
||||
log.Error().Err(err).Str("id", dt.id).Msg("Error writing to down track")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user