diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 69fa1b8cc..e3a31133d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -707,8 +707,7 @@ func (p *ParticipantImpl) GetConnectionQuality() livekit.ConnectionQuality { if subTrack.IsMuted() { continue } - // LK-TODO: maybe this should check CurrentSpatialLayer as target may not have been achieved - if subTrack.DownTrack().TargetSpatialLayer() < subTrack.DownTrack().MaxSpatialLayer() { + if subTrack.DownTrack().GetForwardingStatus() != sfu.ForwardingStatusOptimal { reducedQualitySub = true } subLoss += subTrack.SubscribeLossPercentage() diff --git a/pkg/sfu/buffer/helpers.go b/pkg/sfu/buffer/helpers.go index aff3efa88..5ff0cc6ec 100644 --- a/pkg/sfu/buffer/helpers.go +++ b/pkg/sfu/buffer/helpers.go @@ -45,17 +45,14 @@ func (a *atomicBool) get() bool { +-+-+-+-+-+-+-+-+ */ type VP8 struct { - TemporalSupported bool // LK-TODO: CLEANUP-REMOVE - FirstByte byte + FirstByte byte PictureIDPresent int PictureID uint16 /* 8 or 16 bits, picture ID */ - PicIDIdx int // LK-TODO: CLEANUP-REMOVE MBit bool TL0PICIDXPresent int TL0PICIDX uint8 /* 8 bits temporal level zero index */ - TlzIdx int // LK-TODO: CLEANUP-REMOVE // Optional Header If either of the T or K bits are set to 1, // the TID/Y/KEYIDX extension field MUST be present. @@ -100,15 +97,12 @@ func (p *VP8) Unmarshal(payload []byte) error { if L && !T { return errInvalidPacket } - // Check if T is present, if not, no temporal layer is available - p.TemporalSupported = payload[idx]&0x20 > 0 // Check for PictureID if I { idx++ if payloadLen < idx+1 { return errShortPacket } - p.PicIDIdx = idx p.PictureIDPresent = 1 pid := payload[idx] & 0x7f // Check if m is 1, then Picture ID is 15 bits @@ -129,7 +123,6 @@ func (p *VP8) Unmarshal(payload []byte) error { if payloadLen < idx+1 { return errShortPacket } - p.TlzIdx = idx p.TL0PICIDXPresent = 1 if int(idx) >= payloadLen { diff --git a/pkg/sfu/buffer/helpers_test.go b/pkg/sfu/buffer/helpers_test.go index 5cdff9359..0386d5530 100644 --- a/pkg/sfu/buffer/helpers_test.go +++ b/pkg/sfu/buffer/helpers_test.go @@ -75,7 +75,7 @@ func TestVP8Helper_Unmarshal(t *testing.T) { t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr) } if tt.checkTemporal { - assert.Equal(t, tt.temporalSupport, p.TemporalSupported) + assert.Equal(t, tt.temporalSupport, p.TIDPresent == 1) } if tt.checkKeyFrame { assert.Equal(t, tt.keyFrame, p.IsKeyFrame) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 23aff14e1..ddd8e7330 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -54,10 +54,19 @@ const ( SequenceNumberOrderingContiguous SequenceNumberOrdering = iota SequenceNumberOrderingOutOfOrder SequenceNumberOrderingGap - SequenceNumberOrderingUnknown + SequenceNumberOrderingDuplicate +) + +type ForwardingStatus int + +const ( + ForwardingStatusOff ForwardingStatus = iota + ForwardingStatusPartial + ForwardingStatusOptimal ) var ( + ErrUnknownKind = errors.New("unknown kind of codec") ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded") ErrDuplicatePacket = errors.New("duplicate packet") @@ -78,11 +87,26 @@ var ( H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR} ) -type simulcastTrackHelpers struct { - switchDelay time.Time - temporalSupported bool - temporalEnabled bool - lTSCalc atomicInt64 +type TranslationParamsRTP struct { + snOrdering SequenceNumberOrdering + sequenceNumber uint16 + timestamp uint32 +} + +type TranslationParamsVP8 struct { + header *buffer.VP8 +} + +type TranslationParams struct { + shouldDrop bool + shouldSendPLI bool + rtp *TranslationParamsRTP + vp8 *TranslationParamsVP8 +} + +type SnTs struct { + sequenceNumber uint16 + timestamp uint32 } type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) @@ -94,6 +118,7 @@ type DownTrack struct { id string peerID string bound atomicBool + kind webrtc.RTPCodecType mime string ssrc uint32 streamID string @@ -104,22 +129,7 @@ type DownTrack struct { bufferFactory *buffer.Factory payload *[]byte - currentSpatialLayer atomicInt32 - targetSpatialLayer atomicInt32 - - currentTemporalLayer atomicInt32 - targetTemporalLayer atomicInt32 - - enabled atomicBool - reSync atomicBool - lastSSRC atomicUint32 - - munger *Munger - vp8Munger *VP8Munger - - simulcast simulcastTrackHelpers - maxSpatialLayer atomicInt32 - maxTemporalLayer atomicInt32 + forwarder *Forwarder codec webrtc.RTPCodecCapability rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter @@ -140,11 +150,8 @@ type DownTrack struct { // Debug info lastPli atomicInt64 lastRTP atomicInt64 - pktsMuted atomicUint32 pktsDropped atomicUint32 - maxPacketTs uint32 - // RTCP callbacks onRTCP func([]rtcp.Packet) onREMB func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) @@ -164,6 +171,16 @@ type DownTrack struct { // NewDownTrack returns a DownTrack. func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Factory, peerID string, mt int) (*DownTrack, error) { + var kind webrtc.RTPCodecType + switch { + case strings.HasPrefix(c.MimeType, "audio/"): + kind = webrtc.RTPCodecTypeAudio + case strings.HasPrefix(c.MimeType, "video/"): + kind = webrtc.RTPCodecTypeVideo + default: + kind = webrtc.RTPCodecType(0) + } + d := &DownTrack{ id: r.TrackID(), peerID: peerID, @@ -172,30 +189,14 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Facto bufferFactory: bf, receiver: r, codec: c, - munger: NewMunger(), + kind: kind, + forwarder: NewForwarder(c, kind), } if strings.ToLower(c.MimeType) == "video/vp8" { - d.vp8Munger = NewVP8Munger() d.payload = packetFactory.Get().(*[]byte) } - if d.Kind() == webrtc.RTPCodecTypeVideo { - d.maxSpatialLayer.set(2) - d.maxTemporalLayer.set(2) - } else { - d.maxSpatialLayer.set(InvalidSpatialLayer) - d.maxTemporalLayer.set(InvalidTemporalLayer) - } - - // start off with nothing, let streamallocator set things - d.currentSpatialLayer.set(InvalidSpatialLayer) - d.targetSpatialLayer.set(InvalidSpatialLayer) - - // start off with nothing, let streamallocator set things - d.currentTemporalLayer.set(InvalidTemporalLayer) - d.targetTemporalLayer.set(InvalidTemporalLayer) - return d, nil } @@ -217,8 +218,6 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.payloadType = uint8(codec.PayloadType) d.writeStream = t.WriteStream() d.mime = strings.ToLower(codec.MimeType) - d.reSync.set(true) - d.enabled.set(true) if rr := d.bufferFactory.GetOrNew(packetio.RTCPBufferPacket, uint32(t.SSRC())).(*buffer.RTCPReader); rr != nil { rr.OnPacket(func(pkt []byte) { d.handleRTCP(pkt) @@ -264,14 +263,7 @@ func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeade // Kind controls if this TrackLocal is audio or video func (d *DownTrack) Kind() webrtc.RTPCodecType { - switch { - case strings.HasPrefix(d.codec.MimeType, "audio/"): - return webrtc.RTPCodecTypeAudio - case strings.HasPrefix(d.codec.MimeType, "video/"): - return webrtc.RTPCodecTypeVideo - default: - return webrtc.RTPCodecType(0) - } + return d.kind } func (d *DownTrack) SSRC() uint32 { @@ -289,85 +281,66 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) { d.transceiver = transceiver } -func (d *DownTrack) maybeTranslateVP8(pkt *rtp.Packet, meta packetMeta) error { - if d.vp8Munger == nil || len(pkt.Payload) == 0 { - return nil - } - - var incomingVP8 buffer.VP8 - if err := incomingVP8.Unmarshal(pkt.Payload); err != nil { - return err - } - - translatedVP8 := meta.unpackVP8() - payload, err := d.translateVP8Packet(pkt, &incomingVP8, translatedVP8, false) - if err != nil { - return err - } - - pkt.Payload = payload - return nil -} - -// Writes RTP header extensions of track -func (d *DownTrack) writeRTPHeaderExtensions(hdr *rtp.Header) error { - // clear out extensions that may have been in the forwarded header - hdr.Extension = false - hdr.ExtensionProfile = 0 - hdr.Extensions = []rtp.Extension{} - - for _, ext := range d.rtpHeaderExtensions { - if ext.URI != sdp.ABSSendTimeURI { - // supporting only abs-send-time - continue - } - - sendTime := rtp.NewAbsSendTimeExtension(time.Now()) - b, err := sendTime.Marshal() - if err != nil { - return err - } - - err = hdr.SetExtension(uint8(ext.ID), b) - if err != nil { - return err - } - } - - return nil -} - // WriteRTP writes a RTP Packet to the DownTrack -func (d *DownTrack) WriteRTP(p *buffer.ExtPacket, layer int32) error { +func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { d.lastRTP.set(time.Now().UnixNano()) if !d.bound.get() { return nil } - if !d.enabled.get() { - d.pktsMuted.add(1) - return nil + + tp, err := d.forwarder.GetTranslationParams(extPkt, layer) + if tp.shouldSendPLI { + d.lastPli.set(time.Now().UnixNano()) + d.receiver.SendPLI(layer) + } + if tp.shouldDrop { + d.pktsDropped.add(1) + return err } - switch d.trackType { - case SimpleDownTrack: - return d.writeSimpleRTP(p) - case SimulcastDownTrack: - return d.writeSimulcastRTP(p, layer) + payload := extPkt.Packet.Payload + if tp.vp8 != nil { + incomingVP8, _ := extPkt.Payload.(buffer.VP8) + payload, err = d.translateVP8Packet(&extPkt.Packet, &incomingVP8, tp.vp8.header) + if err != nil { + d.pktsDropped.add(1) + return err + } } - return nil + + if d.sequencer != nil { + meta := d.sequencer.push(extPkt.Packet.SequenceNumber, tp.rtp.sequenceNumber, tp.rtp.timestamp, 0, extPkt.Head) + if meta != nil && tp.vp8 != nil { + meta.packVP8(tp.vp8.header) + } + } + + hdr, err := d.getTranslatedRTPHeader(extPkt, tp.rtp) + if err != nil { + d.pktsDropped.add(1) + return err + } + + _, err = d.writeStream.WriteRTP(hdr, payload) + if err == nil { + for _, f := range d.onPacketSent { + f(d, hdr.MarshalSize()+len(payload)) + } + } else { + d.pktsDropped.add(1) + } + + // LK-TODO maybe include RTP header size also + d.UpdateStats(uint32(len(payload))) + + return err } // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { - // LK-TODO-START - // Potentially write padding even if muted. Given that padding - // can be sent only on frame boudaries, writing on disabled tracks - // will give more options. But, it is possible that forwarding stopped - // on a non-frame boundary when the track is muted. - // LK-TODO-END - if !d.enabled.get() || d.packetCount.get() == 0 { + if d.packetCount.get() == 0 { return 0 } @@ -378,30 +351,34 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { // padding is mainly used to probe for excess available bandwidth. // So, to be safe, limit to video tracks // LK-TODO-END - if d.Kind() == webrtc.RTPCodecTypeAudio { + if d.kind == webrtc.RTPCodecTypeAudio { + return 0 + } + + // LK-TODO-START + // Potentially write padding even if muted. Given that padding + // can be sent only on frame boudaries, writing on disabled tracks + // will give more options. + // LK-TODO-END + if d.forwarder.Muted() { + return 0 + } + + // RTP padding maximum is 255 bytes. Break it up. + // Use 20 byte as estimate of RTP header size (12 byte header + 8 byte extension) + num := (bytesToSend + RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize - 1) / (RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize) + if num == 0 { + return 0 + } + + snts, err := d.forwarder.GetSnTsForPadding(num) + if err != nil { return 0 } // LK-TODO Look at load balancing a la sfu.Receiver to spread across available CPUs bytesSent := 0 - for { - size := bytesToSend - // RTP padding maximum is 255 bytes. Break it up. - // Use 20 byte as estimate of RTP header size (12 byte header + 8 byte extension) - if size > RTPPaddingMaxPayloadSize+RTPPaddingEstimatedHeaderSize { - size = RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize - } - - // padding is used for probing. Padding packets should be - // at frame boundaries only to ensure decoder sequencer does - // not get out-of-sync. But, when a stream is paused, - // force a frame marker as a restart of the stream will - // start with a key frame which will reset the decoder. - sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(d.TargetSpatialLayer() == InvalidSpatialLayer) - if err != nil { - return bytesSent - } - + for i := 0; i < len(snts); i++ { // LK-TODO-START // Hold sending padding packets till first RTCP-RR is received for this RTP stream. // That is definitive proof that the remote side knows about this RTP stream. @@ -409,18 +386,13 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { // on as yet unstarted streams which is a reasonble check. // LK-TODO-END - // intentionally ignoring check for bandwidth constrained mute - // as padding is typically used to probe for channel capacity - // and sending it on any track achieves the purpose of probing - // the channel. - hdr := rtp.Header{ Version: 2, Padding: true, Marker: false, PayloadType: d.payloadType, - SequenceNumber: sn, - Timestamp: ts, + SequenceNumber: snts[i].sequenceNumber, + Timestamp: snts[i].timestamp, SSRC: d.ssrc, CSRC: []uint32{}, } @@ -430,10 +402,9 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { return bytesSent } - payloadSize := size - RTPPaddingEstimatedHeaderSize - payload := make([]byte, payloadSize) + payload := make([]byte, RTPPaddingMaxPayloadSize) // last byte of padding has padding size including that byte - payload[payloadSize-1] = byte(payloadSize) + payload[RTPPaddingMaxPayloadSize-1] = byte(RTPPaddingMaxPayloadSize) _, err = d.writeStream.WriteRTP(&hdr, payload) if err != nil { @@ -441,7 +412,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { } // LK-TODO - check if we should keep separate padding stats - size = hdr.MarshalSize() + len(payload) + size := hdr.MarshalSize() + len(payload) d.UpdateStats(uint32(size)) // LK-TODO-START @@ -452,28 +423,20 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { // LK-TODO-END bytesSent += size - bytesToSend -= size - if bytesToSend <= 0 { - break - } } return bytesSent } -func (d *DownTrack) Enabled() bool { - return d.enabled.get() -} - // Mute enables or disables media forwarding func (d *DownTrack) Mute(val bool) { - if d.enabled.get() != val { + changed := d.forwarder.Mute(val) + if !changed { return } - d.enabled.set(!val) + if val { d.lossFraction.set(0) - d.reSync.set(val) } if d.onSubscriptionChanged != nil { @@ -483,7 +446,7 @@ func (d *DownTrack) Mute(val bool) { // Close track func (d *DownTrack) Close() { - d.enabled.set(false) + d.forwarder.Mute(true) // write blank frames after disabling so that other frames do not interfere. // Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end. @@ -492,7 +455,7 @@ func (d *DownTrack) Close() { d.writeBlankFrameRTP() d.closeOnce.Do(func() { - Logger.V(1).Info("Closing sender", "peer_id", d.peerID, "kind", d.Kind()) + Logger.V(1).Info("Closing sender", "peer_id", d.peerID, "kind", d.kind) if d.payload != nil { packetFactory.Put(d.payload) } @@ -502,76 +465,42 @@ func (d *DownTrack) Close() { }) } -func (d *DownTrack) CurrentSpatialLayer() int32 { - return d.currentSpatialLayer.get() -} - -func (d *DownTrack) TargetSpatialLayer() int32 { - return d.targetSpatialLayer.get() -} - -func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error { - // LK-TODO: support SVC - if d.trackType != SimulcastDownTrack { - return ErrSpatialNotSupported - } - - if spatialLayer == d.MaxSpatialLayer() { - return nil - } - - d.maxSpatialLayer.set(spatialLayer) - - if d.enabled.get() && d.onSubscribedLayersChanged != nil { - d.onSubscribedLayersChanged(d, spatialLayer, d.MaxTemporalLayer()) - } - - return nil -} - -func (d *DownTrack) MaxSpatialLayer() int32 { - return d.maxSpatialLayer.get() -} - -func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { - if temporalLayer == d.MaxTemporalLayer() { +func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) { + changed := d.forwarder.SetMaxSpatialLayer(spatialLayer) + if !changed { return } - d.maxTemporalLayer.set(temporalLayer) - - if d.onSubscribedLayersChanged != nil { - d.onSubscribedLayersChanged(d, d.MaxSpatialLayer(), temporalLayer) + if !d.forwarder.Muted() && d.onSubscribedLayersChanged != nil { + d.onSubscribedLayersChanged(d, spatialLayer, d.forwarder.MaxTemporalLayer()) } } -func (d *DownTrack) MaxTemporalLayer() int32 { - return d.maxTemporalLayer.get() +func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { + changed := d.forwarder.SetMaxTemporalLayer(temporalLayer) + if !changed { + return + } + + if !d.forwarder.Muted() && d.onSubscribedLayersChanged != nil { + d.onSubscribedLayersChanged(d, d.forwarder.MaxSpatialLayer(), temporalLayer) + } } -// switchSpatialLayer switches the target layer -func (d *DownTrack) switchSpatialLayer(targetLayer int32) { - d.targetSpatialLayer.set(targetLayer) +func (d *DownTrack) MaxLayers() (int32, int32) { + return d.forwarder.MaxLayers() +} + +func (d *DownTrack) GetForwardingStatus() ForwardingStatus { + return d.forwarder.GetForwardingStatus() } func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) { - if d.onAvailableLayersChanged != nil { + if !d.forwarder.Muted() && d.onAvailableLayersChanged != nil { d.onAvailableLayersChanged(d, layerAdded) } } -func (d *DownTrack) switchTemporalLayer(targetLayer int32) { - d.targetTemporalLayer.set(targetLayer) -} - -func (d *DownTrack) disableSend() { - d.currentSpatialLayer.set(InvalidSpatialLayer) - d.targetSpatialLayer.set(InvalidSpatialLayer) - - d.currentTemporalLayer.set(InvalidTemporalLayer) - d.targetTemporalLayer.set(InvalidTemporalLayer) -} - // OnCloseHandler method to be called on remote tracked removed func (d *DownTrack) OnCloseHandler(fn func()) { d.onCloseHandler = fn @@ -615,129 +544,12 @@ func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) { d.onPacketSent = append(d.onPacketSent, fn) } -func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (isPausing, isResuming bool, bandwidthRequested, optimalBandwidthNeeded uint64) { - isPausing = false - isResuming = false - bandwidthRequested = 0 - optimalBandwidthNeeded = 0 - - if d.Kind() == webrtc.RTPCodecTypeAudio || !d.enabled.get() { - return - } - - // LK-TODO for temporal preference, traverse the bitrates array the other way - optimalBandwidthNeeded = uint64(0) - brs := d.receiver.GetBitrateTemporalCumulative() - for i := d.maxSpatialLayer.get(); i >= 0; i-- { - for j := d.maxTemporalLayer.get(); j >= 0; j-- { - if brs[i][j] == 0 { - continue - } - if optimalBandwidthNeeded == 0 { - optimalBandwidthNeeded = brs[i][j] - } - if brs[i][j] < availableChannelCapacity { - isResuming = d.TargetSpatialLayer() == InvalidSpatialLayer - bandwidthRequested = brs[i][j] - - d.switchSpatialLayer(int32(i)) - d.switchTemporalLayer(int32(j)) - - return - } - } - } - - if optimalBandwidthNeeded != 0 { - // no layer fits in the available channel capacity, disable the track - isPausing = d.TargetSpatialLayer() != InvalidSpatialLayer - d.disableSend() - - d.reSync.set(true) // re-sync required on next layer switch - } - return +func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (bool, bool, uint64, uint64) { + return d.forwarder.AdjustAllocation(availableChannelCapacity, d.receiver.GetBitrateTemporalCumulative()) } func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { - // LK-TODO-START - // This is mainly used in probing to try a slightly higher layer. - // But, if down track is not a simulcast track, then the next - // available layer (i. e. the only layer of simple track) may boost - // things by a lot (it could happen in simulcast jumps too). - // May need to take in a layer increase threshold as an argument - // (in terms of bps) and increase layer only if the jump is within - // that threshold. - // LK-TODO-END - if d.Kind() == webrtc.RTPCodecTypeAudio || !d.enabled.get() { - return false, 0, 0 - } - - currentSpatialLayer := d.CurrentSpatialLayer() - targetSpatialLayer := d.TargetSpatialLayer() - - currentTemporalLayer := d.currentTemporalLayer.get() - targetTemporalLayer := d.targetTemporalLayer.get() - - // if targets are still pending, don't increase - if targetSpatialLayer != InvalidSpatialLayer { - if targetSpatialLayer != currentSpatialLayer || targetTemporalLayer != currentTemporalLayer { - return false, 0, 0 - } - } - - // move to the next available layer - optimalBandwidthNeeded := uint64(0) - brs := d.receiver.GetBitrateTemporalCumulative() - for i := d.maxSpatialLayer.get(); i >= 0; i-- { - for j := d.maxTemporalLayer.get(); j >= 0; j-- { - if brs[i][j] == 0 { - continue - } - if optimalBandwidthNeeded == 0 { - optimalBandwidthNeeded = brs[i][j] - break - } - } - - if optimalBandwidthNeeded != 0 { - break - } - } - - if d.TargetSpatialLayer() == InvalidSpatialLayer { - // try the lowest spatial and temporal layer if available - // LK-TODO-START - // note that this will never be zero because we do not track - // layer 0 in available layers. So, this will need fixing. - // LK-TODO-END - if brs[0][0] == 0 { - // no feed available - return false, 0, 0 - } - - d.switchSpatialLayer(int32(0)) - d.switchTemporalLayer(int32(0)) - return true, brs[0][0], optimalBandwidthNeeded - } - - // try moving temporal layer up in the current spatial layer - // LK-TODO currentTemporalLayer may be outside available range because of inital value being out of range, fix it - nextTemporalLayer := currentTemporalLayer + 1 - if nextTemporalLayer <= d.maxTemporalLayer.get() && brs[currentSpatialLayer][nextTemporalLayer] > 0 { - d.switchTemporalLayer(nextTemporalLayer) - return true, brs[currentSpatialLayer][nextTemporalLayer], optimalBandwidthNeeded - } - - // try moving spatial layer up if already at max temporal layer of current spatial layer - // LK-TODO currentSpatialLayer may be outside available range because of inital value being out of range, fix it - nextSpatialLayer := currentSpatialLayer + 1 - if nextSpatialLayer <= d.maxSpatialLayer.get() && brs[nextSpatialLayer][0] > 0 { - d.switchSpatialLayer(nextSpatialLayer) - d.switchTemporalLayer(0) - return true, brs[nextSpatialLayer][0], optimalBandwidthNeeded - } - - return false, 0, 0 + return d.forwarder.IncreaseAllocation(d.receiver.GetBitrateTemporalCumulative()) } func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk { @@ -766,7 +578,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - currentSpatialLayer := d.CurrentSpatialLayer() + currentSpatialLayer := d.forwarder.CurrentSpatialLayer() if currentSpatialLayer == InvalidSpatialLayer { return nil } @@ -798,320 +610,6 @@ func (d *DownTrack) UpdateStats(packetLen uint32) { d.packetCount.add(1) } -func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { - if d.reSync.get() { - if d.Kind() == webrtc.RTPCodecTypeVideo { - if d.TargetSpatialLayer() == InvalidSpatialLayer { - d.pktsDropped.add(1) - return nil - } - - if !extPkt.KeyFrame { - d.lastPli.set(time.Now().UnixNano()) - d.receiver.SendPLI(0) - d.pktsDropped.add(1) - return nil - } else { - // although one spatial layer, this is done so it - // works proper with stream allocator. - d.currentSpatialLayer.set(d.TargetSpatialLayer()) - } - } - if d.packetCount.get() > 0 { - // LK-TODO-START - // TS offset of 1 is not accurate. It should ideally - // be driven by packetization of the incoming track. - // But, this handles track switch on a simple track scenario. - // It is not a supported use case. So, it is okay. But, if - // we support switch track (i. e. same down track switches - // to a different up track), this needs to be looked at. - // LK-TODO-END - d.munger.UpdateSnTsOffsets(extPkt, 1, 1) - } else { - d.munger.SetLastSnTs(extPkt) - if d.vp8Munger != nil { - d.vp8Munger.SetLast(extPkt) - } - } - d.lastSSRC.set(extPkt.Packet.SSRC) - d.reSync.set(false) - } - - newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt) - if err != nil { - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { - return nil - } - - d.pktsDropped.add(1) - return err - } - - payload := extPkt.Packet.Payload - - var translatedVP8 *buffer.VP8 - if d.vp8Munger != nil && len(payload) > 0 { - // LK-TODO-START - // Errors below do not update sequence number. That is a problem if the stream - // is expected to continue past the error. The translation should not error out. - // But, if there is a legitimate error case and the stream can continue beyond - // that, the sequence numbers should be updated to ensure that subsequent packet - // translations works fine and produce proper translated sequence numbers. - // LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.targetTemporalLayer.get()) - if err != nil { - if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { - if err == ErrFilteredVP8TemporalLayer { - // filtered temporal layer, update sequence number offset to prevent holes - d.munger.PacketDropped(extPkt) - } - d.pktsDropped.add(1) - return nil - } - - d.pktsDropped.add(1) - return err - } - - incomingVP8, ok := extPkt.Payload.(buffer.VP8) - if !ok { - d.pktsDropped.add(1) - return ErrNotVP8 - } - - payload, err = d.translateVP8Packet(&extPkt.Packet, &incomingVP8, translatedVP8, ordering != SequenceNumberOrderingOutOfOrder) - if err != nil { - d.pktsDropped.add(1) - return err - } - } - - if d.sequencer != nil { - meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head) - if meta != nil && translatedVP8 != nil { - meta.packVP8(translatedVP8) - } - } - - // LK-TODO maybe include RTP header size also - d.UpdateStats(uint32(len(payload))) - - hdr := extPkt.Packet.Header - hdr.PayloadType = d.payloadType - hdr.Timestamp = newTS - hdr.SequenceNumber = newSN - hdr.SSRC = d.ssrc - - err = d.writeRTPHeaderExtensions(&hdr) - if err != nil { - return err - } - - _, err = d.writeStream.WriteRTP(&hdr, payload) - if err == nil { - for _, f := range d.onPacketSent { - f(d, hdr.MarshalSize()+len(payload)) - } - } - - return err -} - -func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) error { - tsl := d.TargetSpatialLayer() - if tsl == InvalidSpatialLayer { - d.pktsDropped.add(1) - return nil - } - - csl := d.CurrentSpatialLayer() - if tsl == layer && csl != tsl { - if extPkt.KeyFrame { - d.currentSpatialLayer.set(layer) - csl = layer - } else { - d.lastPli.set(time.Now().UnixNano()) - d.receiver.SendPLI(layer) - } - } - - if tsl < csl && tsl < d.MaxSpatialLayer() { - // - // If target layer is lower than both the current and - // maximum subscribed layer, it is due to bandwidth - // constraints that the target layer has been switched down. - // Continuing to send higher layer will only exacerbate the - // situation by putting more stress on the channel. So, drop it. - // - // In the other direction, it is okay to keep forwarding till - // switch point to get a smoother stream till the higher - // layer key frame arrives. - // - // Note that in the case of client subscription layer restriction - // coinciding with server restriction due to bandwidth limitation, - // this will take client subscription as the winning vote and - // continue to stream current spatial layer till switch point. - // That could lead to congesting the channel. - // LK-TODO: Improve the above case, i. e. distinguish server - // applied restriction from client requested restriction. - // - d.pktsDropped.add(1) - return nil - } - - if csl != layer { - d.pktsDropped.add(1) - return nil - } - - // Check if packet SSRC is different from before - // if true, the video source changed - lastSSRC := d.lastSSRC.get() - reSync := d.reSync.get() - if lastSSRC != extPkt.Packet.SSRC || reSync { - // Wait for a keyframe to sync new source - if reSync && !extPkt.KeyFrame { - // Packet is not a keyframe, discard it - // LK-TODO-START - // Some of this happens is happening in sfu.Receiver also. - // If performance is not a concern, sfu.Receiver should send - // all the packets to down tracks and down track should be - // the only one deciding whether to switch/forward/drop - // LK-TODO-END - d.receiver.SendPLI(layer) - d.lastPli.set(time.Now().UnixNano()) - d.pktsDropped.add(1) - return nil - } - - if reSync && d.simulcast.lTSCalc.get() != 0 { - d.simulcast.lTSCalc.set(extPkt.Arrival) - } - - d.lastSSRC.set(extPkt.Packet.SSRC) - d.reSync.set(false) - } - - // LK-TODO-START - // The below offset calculation is not technically correct. - // Timestamps based on the system time of an intermediate box like - // SFU is not going to be accurate. Packets arrival/processing - // are subject to vagaries of network delays, SFU processing etc. - // But, the correct way is a lot harder. Will have to - // look at RTCP SR to get timestamps and figure out alignment - // of layers and use that during layer switch. That can - // get tricky. Given the complexity of that approach, maybe - // this is just fine till it is not :-). - // LK-TODO-END - - // Compute how much time passed between the old RTP extPkt - // and the current packet, and fix timestamp on source change - lTSCalc := d.simulcast.lTSCalc.get() - if lTSCalc != 0 && lastSSRC != extPkt.Packet.SSRC { - tDiff := (extPkt.Arrival - lTSCalc) / 1e6 - // LK-TODO-START - // this is assuming clock rate of 90000. - // Should be fine for video, but ideally should use ClockRate of the track - // LK-TODO-END - td := uint32((tDiff * 90) / 1000) - if td == 0 { - td = 1 - } - d.munger.UpdateSnTsOffsets(extPkt, 1, td) - if d.vp8Munger != nil { - d.vp8Munger.UpdateOffsets(extPkt) - } - } else if lTSCalc == 0 { - d.munger.SetLastSnTs(extPkt) - if d.vp8Munger != nil { - d.vp8Munger.SetLast(extPkt) - } - } - - newSN, newTS, ordering, err := d.munger.UpdateAndGetSnTs(extPkt) - if err != nil { - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { - return nil - } - - d.pktsDropped.add(1) - return err - } - - payload := extPkt.Packet.Payload - - var translatedVP8 *buffer.VP8 - if d.vp8Munger != nil && len(payload) > 0 { - // LK-TODO-START - // Errors below do not update sequence number. That is a problem if the stream - // is expected to continue past the error. The translation should not error out. - // But, if there is a legitimate error case and the stream can continue beyond - // that, the sequence numbers should be updated to ensure that subsequent packet - // translations works fine and produce proper translated sequence numbers. - // LK-TODO-END - translatedVP8, err = d.vp8Munger.UpdateAndGet(extPkt, ordering, d.targetTemporalLayer.get()) - if err != nil { - if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { - if err == ErrFilteredVP8TemporalLayer { - // filtered temporal layer, update sequence number offset to prevent holes - d.munger.PacketDropped(extPkt) - } - d.pktsDropped.add(1) - return nil - } - - d.pktsDropped.add(1) - return err - } - - incomingVP8, ok := extPkt.Payload.(buffer.VP8) - if !ok { - d.pktsDropped.add(1) - return ErrNotVP8 - } - - payload, err = d.translateVP8Packet(&extPkt.Packet, &incomingVP8, translatedVP8, ordering != SequenceNumberOrderingOutOfOrder) - if err != nil { - d.pktsDropped.add(1) - return err - } - } - - if d.sequencer != nil { - meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head) - if meta != nil && translatedVP8 != nil { - meta.packVP8(translatedVP8) - } - } - - // LK-TODO - maybe include RTP header? - d.UpdateStats(uint32(len(payload))) - - // Update base - d.simulcast.lTSCalc.set(extPkt.Arrival) - - // Update extPkt headers - hdr := extPkt.Packet.Header - hdr.SequenceNumber = newSN - hdr.Timestamp = newTS - hdr.SSRC = d.ssrc - hdr.PayloadType = d.payloadType - - err = d.writeRTPHeaderExtensions(&hdr) - if err != nil { - return err - } - - _, err = d.writeStream.WriteRTP(&hdr, payload) - if err == nil { - for _, f := range d.onPacketSent { - f(d, hdr.MarshalSize()+len(payload)) - } - } - - return err -} - func (d *DownTrack) writeBlankFrameRTP() error { // don't send if nothing has been sent if d.packetCount.get() == 0 { @@ -1119,35 +617,26 @@ func (d *DownTrack) writeBlankFrameRTP() error { } // LK-TODO: Support other video codecs - if d.Kind() == webrtc.RTPCodecTypeAudio || (d.mime != "video/vp8" && d.mime != "video/h264") { + if d.kind == webrtc.RTPCodecTypeAudio || (d.mime != "video/vp8" && d.mime != "video/h264") { return nil } + snts, frameEndNeeded, err := d.forwarder.GetSnTsForBlankFrames() + if err != nil { + return err + } + // send a number of blank frames just in case there is loss. // Intentionally ignoring check for mute or bandwidth constrained mute // as this is used to clear client side buffer. - for i := 0; i < RTPBlankFramesMax; { - frameEndNeeded := false - if !d.munger.IsOnFrameBoundary() { - frameEndNeeded = true - } - - sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(frameEndNeeded) - if err != nil { - return err - } - - adjustedTs := ts + uint32(i+1)*(d.codec.ClockRate/30) // assume 30 fps - if frameEndNeeded { - adjustedTs = ts - } + for i := 0; i < len(snts); i++ { hdr := rtp.Header{ Version: 2, Padding: false, Marker: true, PayloadType: d.payloadType, - SequenceNumber: sn, - Timestamp: adjustedTs, + SequenceNumber: snts[i].sequenceNumber, + Timestamp: snts[i].timestamp, SSRC: d.ssrc, CSRC: []uint32{}, } @@ -1166,17 +655,16 @@ func (d *DownTrack) writeBlankFrameRTP() error { return nil } - if !frameEndNeeded { - i++ - } - + // only the first frame will need frameEndNeeded to close out the + // previous picture, rest are small key frames + frameEndNeeded = false } return nil } func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { - blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) + blankVP8, err := d.forwarder.GetPaddingVP8(frameEndNeeded) if err != nil { return err } @@ -1217,43 +705,29 @@ func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) er } func (d *DownTrack) handleRTCP(bytes []byte) { - // LK-TODO - should probably handle RTCP even if muted - enabled := d.enabled.get() - if !enabled && d.onRTCP == nil { - return - } - pkts, err := rtcp.Unmarshal(bytes) if err != nil { Logger.Error(err, "Unmarshal rtcp receiver packets err") + return } if d.onRTCP != nil { d.onRTCP(pkts) - if !enabled { - return - } } pliOnce := true - - var ( - maxRatePacketLoss uint8 - ) - - ssrc := d.lastSSRC.get() - if ssrc == 0 { - return - } - sendPliOnce := func() { - if pliOnce && d.TargetSpatialLayer() != InvalidSpatialLayer { - d.lastPli.set(time.Now().UnixNano()) - d.receiver.SendPLI(d.TargetSpatialLayer()) - pliOnce = false + if pliOnce { + targetSpatialLayer := d.forwarder.TargetSpatialLayer() + if targetSpatialLayer != InvalidSpatialLayer { + d.lastPli.set(time.Now().UnixNano()) + d.receiver.SendPLI(targetSpatialLayer) + pliOnce = false + } } } + maxRatePacketLoss := uint8(0) for _, pkt := range pkts { switch p := pkt.(type) { case *rtcp.PictureLossIndication: @@ -1297,6 +771,26 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } } +func (d *DownTrack) maybeTranslateVP8(pkt *rtp.Packet, meta packetMeta) error { + if d.mime != "video/vp8" || len(pkt.Payload) == 0 { + return nil + } + + var incomingVP8 buffer.VP8 + if err := incomingVP8.Unmarshal(pkt.Payload); err != nil { + return err + } + + translatedVP8 := meta.unpackVP8() + payload, err := d.translateVP8Packet(pkt, &incomingVP8, translatedVP8) + if err != nil { + return err + } + + pkt.Payload = payload + return nil +} + func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) { src := packetFactory.Get().(*[]byte) defer packetFactory.Put(src) @@ -1342,18 +836,50 @@ func (d *DownTrack) getSRStats() (octets, packets uint32) { return d.octetCount.get(), d.packetCount.get() } -func (d *DownTrack) translateVP8Packet(pkt *rtp.Packet, incomingVP8 *buffer.VP8, translatedVP8 *buffer.VP8, adjustTemporal bool) (buf []byte, err error) { - if adjustTemporal { - currentTemporalLayer := d.currentTemporalLayer.get() - targetTemporalLayer := d.targetTemporalLayer.get() - // catch up temporal layer if necessary - if currentTemporalLayer != targetTemporalLayer { - if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(targetTemporalLayer) { - d.currentTemporalLayer.set(targetTemporalLayer) - } +// writes RTP header extensions of track +func (d *DownTrack) writeRTPHeaderExtensions(hdr *rtp.Header) error { + // clear out extensions that may have been in the forwarded header + hdr.Extension = false + hdr.ExtensionProfile = 0 + hdr.Extensions = []rtp.Extension{} + + for _, ext := range d.rtpHeaderExtensions { + if ext.URI != sdp.ABSSendTimeURI { + // supporting only abs-send-time + continue + } + + sendTime := rtp.NewAbsSendTimeExtension(time.Now()) + b, err := sendTime.Marshal() + if err != nil { + return err + } + + err = hdr.SetExtension(uint8(ext.ID), b) + if err != nil { + return err } } + return nil +} + +func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tpRTP *TranslationParamsRTP) (*rtp.Header, error) { + hdr := extPkt.Packet.Header + hdr.PayloadType = d.payloadType + hdr.Timestamp = tpRTP.timestamp + hdr.SequenceNumber = tpRTP.sequenceNumber + hdr.SSRC = d.ssrc + + err := d.writeRTPHeaderExtensions(&hdr) + if err != nil { + return nil, err + } + + return &hdr, nil +} + +func (d *DownTrack) translateVP8Packet(pkt *rtp.Packet, incomingVP8 *buffer.VP8, translatedVP8 *buffer.VP8) (buf []byte, err error) { buf = *d.payload buf = buf[:len(pkt.Payload)+translatedVP8.HeaderSize-incomingVP8.HeaderSize] @@ -1368,18 +894,17 @@ func (d *DownTrack) translateVP8Packet(pkt *rtp.Packet, incomingVP8 *buffer.VP8, } func (d *DownTrack) DebugInfo() map[string]interface{} { - mungerParams := d.munger.getParams() + rtpMungerParams := d.forwarder.GetRTPMungerParams() stats := map[string]interface{}{ - "HighestIncomingSN": mungerParams.highestIncomingSN, - "LastSN": mungerParams.lastSN, - "SNOffset": mungerParams.snOffset, - "LastTS": mungerParams.lastTS, - "TSOffset": mungerParams.tsOffset, - "LastMarker": mungerParams.lastMarker, + "HighestIncomingSN": rtpMungerParams.highestIncomingSN, + "LastSN": rtpMungerParams.lastSN, + "SNOffset": rtpMungerParams.snOffset, + "LastTS": rtpMungerParams.lastTS, + "TSOffset": rtpMungerParams.tsOffset, + "LastMarker": rtpMungerParams.lastMarker, "LastRTP": d.lastRTP.get(), "LastPli": d.lastPli.get(), "PacketsDropped": d.pktsDropped.get(), - "PacketsMuted": d.pktsMuted.get(), } senderReport := d.CreateSenderReport() @@ -1396,17 +921,513 @@ func (d *DownTrack) DebugInfo() map[string]interface{} { "SSRC": d.ssrc, "MimeType": d.codec.MimeType, "Bound": d.bound.get(), - "Enabled": d.enabled.get(), - "Resync": d.reSync.get(), - "CurrentSpatialLayer": d.CurrentSpatialLayer(), + "Muted": d.forwarder.Muted(), + "CurrentSpatialLayer": d.forwarder.CurrentSpatialLayer, "Stats": stats, } } +//--------------------------------------------------- + // -// munger +// Forwarder // -type MungerParams struct { +type Forwarder struct { + lock sync.RWMutex + codec webrtc.RTPCodecCapability + kind webrtc.RTPCodecType + + muted bool + + started bool + lastSSRC uint32 + lTSCalc int64 + + maxSpatialLayer int32 + currentSpatialLayer int32 + targetSpatialLayer int32 + + maxTemporalLayer int32 + currentTemporalLayer int32 + targetTemporalLayer int32 + + rtpMunger *RTPMunger + vp8Munger *VP8Munger +} + +func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Forwarder { + f := &Forwarder{ + codec: codec, + kind: kind, + + // start off with nothing, let streamallocator set things + currentSpatialLayer: InvalidSpatialLayer, + targetSpatialLayer: InvalidSpatialLayer, + currentTemporalLayer: InvalidTemporalLayer, + targetTemporalLayer: InvalidTemporalLayer, + + rtpMunger: NewRTPMunger(), + } + + if strings.ToLower(codec.MimeType) == "video/vp8" { + f.vp8Munger = NewVP8Munger() + } + + if f.kind == webrtc.RTPCodecTypeVideo { + f.maxSpatialLayer = 2 + f.maxTemporalLayer = 2 + } else { + f.maxSpatialLayer = InvalidSpatialLayer + f.maxTemporalLayer = InvalidTemporalLayer + } + + return f +} + +func (f *Forwarder) Mute(val bool) bool { + f.lock.Lock() + defer f.lock.Unlock() + + if f.muted == val { + return false + } + + f.muted = val + return true +} + +func (f *Forwarder) Muted() bool { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.muted +} + +func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) bool { + f.lock.Lock() + defer f.lock.Unlock() + + if spatialLayer == f.maxSpatialLayer { + return false + } + + f.maxSpatialLayer = spatialLayer + return true +} + +func (f *Forwarder) MaxSpatialLayer() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.maxSpatialLayer +} + +func (f *Forwarder) CurrentSpatialLayer() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.currentSpatialLayer +} + +func (f *Forwarder) TargetSpatialLayer() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.targetSpatialLayer +} + +func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) bool { + f.lock.Lock() + defer f.lock.Unlock() + + if temporalLayer == f.maxTemporalLayer { + return false + } + + f.maxTemporalLayer = temporalLayer + return true +} + +func (f *Forwarder) MaxTemporalLayer() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.maxTemporalLayer +} + +func (f *Forwarder) MaxLayers() (int32, int32) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.maxSpatialLayer, f.maxTemporalLayer +} + +func (f *Forwarder) GetForwardingStatus() ForwardingStatus { + f.lock.RLock() + defer f.lock.RUnlock() + + if f.targetSpatialLayer == InvalidSpatialLayer { + return ForwardingStatusOff + } + + if f.targetSpatialLayer < f.maxSpatialLayer { + return ForwardingStatusPartial + } + + return ForwardingStatusOptimal +} + +func (f *Forwarder) AdjustAllocation(availableChannelCapacity uint64, brs [3][4]uint64) (isPausing, isResuming bool, bandwidthRequested, optimalBandwidthNeeded uint64) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.kind == webrtc.RTPCodecTypeAudio || f.muted { + return + } + + optimalBandwidthNeeded = uint64(0) + // LK-TODO for temporal preference, traverse the bitrates array the other way + for i := f.maxSpatialLayer; i >= 0; i-- { + for j := f.maxTemporalLayer; j >= 0; j-- { + if brs[i][j] == 0 { + continue + } + if optimalBandwidthNeeded == 0 { + optimalBandwidthNeeded = brs[i][j] + } + if brs[i][j] < availableChannelCapacity { + isResuming = f.targetSpatialLayer == InvalidSpatialLayer + bandwidthRequested = brs[i][j] + + f.targetSpatialLayer = int32(i) + f.targetTemporalLayer = int32(j) + return + } + } + } + + if optimalBandwidthNeeded != 0 { + // no layer fits in the available channel capacity, disable the track + isPausing = f.targetSpatialLayer != InvalidSpatialLayer + + f.currentSpatialLayer = InvalidSpatialLayer + f.targetSpatialLayer = InvalidSpatialLayer + + f.currentTemporalLayer = InvalidTemporalLayer + f.targetTemporalLayer = InvalidTemporalLayer + } + return +} + +func (f *Forwarder) IncreaseAllocation(brs [3][4]uint64) (increased bool, bandwidthRequested, optimalBandwidthNeeded uint64) { + // LK-TODO-START + // This is mainly used in probing to try a slightly higher layer. + // But, if down track is not a simulcast track, then the next + // available layer (i. e. the only layer of simple track) may boost + // things by a lot (it could happen in simulcast jumps too). + // May need to take in a layer increase threshold as an argument + // (in terms of bps) and increase layer only if the jump is within + // that threshold. + // LK-TODO-END + f.lock.Lock() + defer f.lock.Unlock() + + if f.kind == webrtc.RTPCodecTypeAudio || f.muted { + return + } + + // if targets are still pending, don't increase + if f.targetSpatialLayer != InvalidSpatialLayer { + if f.targetSpatialLayer != f.currentSpatialLayer || f.targetTemporalLayer != f.currentTemporalLayer { + return + } + } + + // move to the next available layer + for i := f.maxSpatialLayer; i >= 0; i-- { + for j := f.maxTemporalLayer; j >= 0; j-- { + if brs[i][j] == 0 { + continue + } + if optimalBandwidthNeeded == 0 { + optimalBandwidthNeeded = brs[i][j] + break + } + } + + if optimalBandwidthNeeded != 0 { + break + } + } + if optimalBandwidthNeeded == 0 { + // feed is dry + return + } + + // try moving temporal layer up in the current spatial layer + nextTemporalLayer := f.currentTemporalLayer + 1 + currentSpatialLayer := f.currentSpatialLayer + if currentSpatialLayer == InvalidSpatialLayer { + currentSpatialLayer = 0 + } + if nextTemporalLayer <= f.maxTemporalLayer && brs[currentSpatialLayer][nextTemporalLayer] > 0 { + f.targetSpatialLayer = currentSpatialLayer + f.targetTemporalLayer = nextTemporalLayer + + increased = true + bandwidthRequested = brs[currentSpatialLayer][nextTemporalLayer] + return + } + + // try moving spatial layer up if already at max temporal layer of current spatial layer + nextSpatialLayer := f.currentSpatialLayer + 1 + if nextSpatialLayer <= f.maxSpatialLayer && brs[nextSpatialLayer][0] > 0 { + f.targetSpatialLayer = nextSpatialLayer + f.targetTemporalLayer = 0 + + increased = true + bandwidthRequested = brs[nextSpatialLayer][0] + return + } + + return +} + +func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.muted { + return &TranslationParams{ + shouldDrop: true, + }, nil + } + + switch f.kind { + case webrtc.RTPCodecTypeAudio: + return f.getTranslationParamsAudio(extPkt) + case webrtc.RTPCodecTypeVideo: + return f.getTranslationParamsVideo(extPkt, layer) + } + + return nil, ErrUnknownKind +} + +// should be called with lock held +func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) { + if f.lastSSRC != extPkt.Packet.SSRC { + if !f.started { + // start of stream + f.started = true + f.rtpMunger.SetLastSnTs(extPkt) + } else { + // LK-TODO-START + // TS offset of 1 is not accurate. It should ideally + // be driven by packetization of the incoming track. + // But, on a track switch, won't have any historic data + // of a new track though. + // LK-TODO-END + f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, 1) + } + + f.lastSSRC = extPkt.Packet.SSRC + } + + tp := &TranslationParams{} + + tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt) + if err != nil { + tp.shouldDrop = true + if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + return tp, nil + } + + return tp, err + } + + tp.rtp = tpRTP + return tp, nil +} + +// should be called with lock held +func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) { + tp := &TranslationParams{} + + if f.targetSpatialLayer == InvalidSpatialLayer { + // stream is paused by streamallocator + tp.shouldDrop = true + return tp, nil + } + + tp.shouldSendPLI = false + if f.targetSpatialLayer != f.currentSpatialLayer { + if f.targetSpatialLayer == layer { + if extPkt.KeyFrame { + // lock to target layer + f.currentSpatialLayer = f.targetSpatialLayer + } else { + tp.shouldSendPLI = true + } + } + } + + if f.currentSpatialLayer != layer { + tp.shouldDrop = true + return tp, nil + } + + if f.targetSpatialLayer < f.currentSpatialLayer && f.targetSpatialLayer < f.maxSpatialLayer { + // + // If target layer is lower than both the current and + // maximum subscribed layer, it is due to bandwidth + // constraints that the target layer has been switched down. + // Continuing to send higher layer will only exacerbate the + // situation by putting more stress on the channel. So, drop it. + // + // In the other direction, it is okay to keep forwarding till + // switch point to get a smoother stream till the higher + // layer key frame arrives. + // + // Note that in the case of client subscription layer restriction + // coinciding with server restriction due to bandwidth limitation, + // this will take client subscription as the winning vote and + // continue to stream current spatial layer till switch point. + // That could lead to congesting the channel. + // LK-TODO: Improve the above case, i. e. distinguish server + // applied restriction from client requested restriction. + // + tp.shouldDrop = true + return tp, nil + } + + if f.lastSSRC != extPkt.Packet.SSRC { + if !f.started { + f.started = true + f.rtpMunger.SetLastSnTs(extPkt) + if f.vp8Munger != nil { + f.vp8Munger.SetLast(extPkt) + } + } else { + // LK-TODO-START + // The below offset calculation is not technically correct. + // Timestamps based on the system time of an intermediate box like + // SFU is not going to be accurate. Packets arrival/processing + // are subject to vagaries of network delays, SFU processing etc. + // But, the correct way is a lot harder. Will have to + // look at RTCP SR to get timestamps and figure out alignment + // of layers and use that during layer switch. That can + // get tricky. Given the complexity of that approach, maybe + // this is just fine till it is not :-). + // LK-TODO-END + + // Compute how much time passed between the old RTP extPkt + // and the current packet, and fix timestamp on source change + tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6 + td := uint32((tDiffMs * (int64(f.codec.ClockRate) / 1000)) / 1000) + if td == 0 { + td = 1 + } + f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td) + if f.vp8Munger != nil { + f.vp8Munger.UpdateOffsets(extPkt) + } + } + + f.lastSSRC = extPkt.Packet.SSRC + } + + f.lTSCalc = extPkt.Arrival + + tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt) + if err != nil { + tp.shouldDrop = true + if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + return tp, nil + } + + return tp, err + } + + if f.vp8Munger == nil { + tp.rtp = tpRTP + return tp, nil + } + + tpVP8, err := f.vp8Munger.UpdateAndGet(extPkt, tpRTP.snOrdering, f.currentTemporalLayer) + if err != nil { + tp.shouldDrop = true + if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { + if err == ErrFilteredVP8TemporalLayer { + // filtered temporal layer, update sequence number offset to prevent holes + f.rtpMunger.PacketDropped(extPkt) + } + return tp, nil + } + + return tp, err + } + + // catch up temporal layer if necessary + if tpVP8 != nil && f.currentTemporalLayer != f.targetTemporalLayer { + if tpVP8.header.TIDPresent == 1 && tpVP8.header.TID <= uint8(f.targetTemporalLayer) { + f.currentTemporalLayer = f.targetTemporalLayer + } + } + + tp.rtp = tpRTP + tp.vp8 = tpVP8 + return tp, nil +} + +func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) { + f.lock.Lock() + defer f.lock.Unlock() + + // padding is used for probing. Padding packets should be + // at frame boundaries only to ensure decoder sequencer does + // not get out-of-sync. But, when a stream is paused, + // force a frame marker as a restart of the stream will + // start with a key frame which will reset the decoder. + forceMarker := false + if f.targetSpatialLayer == InvalidSpatialLayer { + forceMarker = true + } + return f.rtpMunger.UpdateAndGetPaddingSnTs(num, 0, 0, forceMarker) +} + +func (f *Forwarder) GetSnTsForBlankFrames() ([]SnTs, bool, error) { + f.lock.Lock() + defer f.lock.Unlock() + + num := RTPBlankFramesMax + frameEndNeeded := !f.rtpMunger.IsOnFrameBoundary() + if frameEndNeeded { + num++ + } + snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(num, f.codec.ClockRate, 30, frameEndNeeded) + return snts, frameEndNeeded, err +} + +func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) (*buffer.VP8, error) { + f.lock.Lock() + defer f.lock.Unlock() + + return f.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) +} + +func (f *Forwarder) GetRTPMungerParams() RTPMungerParams { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.rtpMunger.GetParams() +} + +//--------------------------------------------------- + +// +// RTPMunger +// +type RTPMungerParams struct { highestIncomingSN uint16 lastSN uint16 snOffset uint16 @@ -1417,100 +1438,85 @@ type MungerParams struct { missingSNs map[uint16]uint16 } -type Munger struct { - lock sync.RWMutex - - MungerParams +type RTPMunger struct { + RTPMungerParams } -func NewMunger() *Munger { - return &Munger{MungerParams: MungerParams{ +func NewRTPMunger() *RTPMunger { + return &RTPMunger{RTPMungerParams: RTPMungerParams{ missingSNs: make(map[uint16]uint16, 10), }} } -func (m *Munger) getParams() MungerParams { - m.lock.RLock() - defer m.lock.RUnlock() - - return MungerParams{ - highestIncomingSN: m.highestIncomingSN, - lastSN: m.lastSN, - snOffset: m.snOffset, - lastTS: m.lastTS, - tsOffset: m.tsOffset, - lastMarker: m.lastMarker, +func (r *RTPMunger) GetParams() RTPMungerParams { + return RTPMungerParams{ + highestIncomingSN: r.highestIncomingSN, + lastSN: r.lastSN, + snOffset: r.snOffset, + lastTS: r.lastTS, + tsOffset: r.tsOffset, + lastMarker: r.lastMarker, } } -func (m *Munger) SetLastSnTs(extPkt *buffer.ExtPacket) { - m.lock.Lock() - defer m.lock.Unlock() - - m.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 - m.lastSN = extPkt.Packet.SequenceNumber - m.lastTS = extPkt.Packet.Timestamp +func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { + r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 + r.lastSN = extPkt.Packet.SequenceNumber + r.lastTS = extPkt.Packet.Timestamp } -func (m *Munger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) { - m.lock.Lock() - defer m.lock.Unlock() +func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) { + r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 + r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust + r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust - m.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 - m.snOffset = extPkt.Packet.SequenceNumber - m.lastSN - snAdjust - m.tsOffset = extPkt.Packet.Timestamp - m.lastTS - tsAdjust - - // clear incoming missing sequence numbers on layer switch - m.missingSNs = make(map[uint16]uint16, 10) + // clear incoming missing sequence numbers on layer/source switch + r.missingSNs = make(map[uint16]uint16, 10) } -func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket) { - m.lock.Lock() - defer m.lock.Unlock() - +func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { if !extPkt.Head { return } - m.highestIncomingSN = extPkt.Packet.SequenceNumber - m.snOffset += 1 + r.highestIncomingSN = extPkt.Packet.SequenceNumber + r.snOffset += 1 } -func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, SequenceNumberOrdering, error) { - m.lock.Lock() - defer m.lock.Unlock() - +func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { // if out-of-order, look up missing sequence number cache if !extPkt.Head { - snOffset, ok := m.missingSNs[extPkt.Packet.SequenceNumber] + snOffset, ok := r.missingSNs[extPkt.Packet.SequenceNumber] if !ok { - return 0, 0, SequenceNumberOrderingOutOfOrder, ErrOutOfOrderSequenceNumberCacheMiss + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingOutOfOrder, + }, ErrOutOfOrderSequenceNumberCacheMiss } - delete(m.missingSNs, extPkt.Packet.SequenceNumber) - return extPkt.Packet.SequenceNumber - snOffset, extPkt.Packet.Timestamp - m.tsOffset, SequenceNumberOrderingOutOfOrder, nil + delete(r.missingSNs, extPkt.Packet.SequenceNumber) + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingOutOfOrder, + sequenceNumber: extPkt.Packet.SequenceNumber - snOffset, + timestamp: extPkt.Packet.Timestamp - r.tsOffset, + }, nil } ordering := SequenceNumberOrderingContiguous // if there are gaps, record it in missing sequence number cache - diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN + diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN if diff > 1 { ordering = SequenceNumberOrderingGap - var lossStartSN, lossEndSN int - lossStartSN = int(m.highestIncomingSN) + 1 - if extPkt.Packet.SequenceNumber > m.highestIncomingSN { - lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 - } else { - lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 + buffer.MaxSN - } - for lostSN := lossStartSN; lostSN <= lossEndSN; lostSN++ { - m.missingSNs[uint16(lostSN&0xffff)] = m.snOffset + + for i := r.highestIncomingSN + 1; i != extPkt.Packet.SequenceNumber; i++ { + r.missingSNs[i] = r.snOffset } } else { // can get duplicate packet due to FEC if diff == 0 { - return 0, 0, SequenceNumberOrderingUnknown, ErrDuplicatePacket + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingDuplicate, + }, ErrDuplicatePacket } // if padding only packet, can be dropped and sequence number adjusted @@ -1518,9 +1524,12 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, Seq // incoming sequence number and it is a good point to adjust // sequence number offset. if len(extPkt.Packet.Payload) == 0 { - m.highestIncomingSN = extPkt.Packet.SequenceNumber - m.snOffset += 1 - return 0, 0, SequenceNumberOrderingContiguous, ErrPaddingOnlyPacket + r.highestIncomingSN = extPkt.Packet.SequenceNumber + r.snOffset += 1 + + return &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + }, ErrPaddingOnlyPacket } } @@ -1530,45 +1539,58 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, Seq // it is unclear if the current packet should be dropped if it is not // contiguous. Hence forward anything that is not contiguous. // Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html - mungedSN := extPkt.Packet.SequenceNumber - m.snOffset - mungedTS := extPkt.Packet.Timestamp - m.tsOffset + mungedSN := extPkt.Packet.SequenceNumber - r.snOffset + mungedTS := extPkt.Packet.Timestamp - r.tsOffset - m.highestIncomingSN = extPkt.Packet.SequenceNumber - m.lastSN = mungedSN - m.lastTS = mungedTS - m.lastMarker = extPkt.Packet.Marker + r.highestIncomingSN = extPkt.Packet.SequenceNumber + r.lastSN = mungedSN + r.lastTS = mungedTS + r.lastMarker = extPkt.Packet.Marker - return mungedSN, mungedTS, ordering, nil + return &TranslationParamsRTP{ + snOrdering: ordering, + sequenceNumber: mungedSN, + timestamp: mungedTS, + }, nil } -func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) { - m.lock.Lock() - defer m.lock.Unlock() - - if !m.lastMarker && !forceMarker { - return 0, 0, ErrPaddingNotOnFrameBoundary +func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool) ([]SnTs, error) { + tsOffset := 0 + if !r.lastMarker { + if !forceMarker { + return nil, ErrPaddingNotOnFrameBoundary + } else { + // if forcing frame end, use timestamp of latest received frame for the first one + tsOffset = 1 + } } - sn := m.lastSN + 1 - ts := m.lastTS + vals := make([]SnTs, num) + for i := 0; i < num; i++ { + vals[i].sequenceNumber = r.lastSN + uint16(i) + 1 + if frameRate != 0 { + vals[i].timestamp = r.lastTS + uint32(i+1-tsOffset)*(clockRate/frameRate) + } else { + vals[i].timestamp = r.lastTS + } + } - m.lastSN = sn - m.snOffset -= 1 + r.lastSN = vals[num-1].sequenceNumber + r.snOffset -= uint16(num) if forceMarker { - m.lastMarker = true + r.lastMarker = true } - return sn, ts, nil + return vals, nil } -func (m *Munger) IsOnFrameBoundary() bool { - m.lock.RLock() - defer m.lock.RUnlock() - - return m.lastMarker +func (r *RTPMunger) IsOnFrameBoundary() bool { + return r.lastMarker } +//--------------------------------------------------- + // // VP8 munger // @@ -1590,8 +1612,6 @@ type VP8MungerParams struct { } type VP8Munger struct { - lock sync.Mutex - VP8MungerParams } @@ -1603,9 +1623,6 @@ func NewVP8Munger() *VP8Munger { } func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { - v.lock.Lock() - defer v.lock.Unlock() - vp8, ok := extPkt.Payload.(buffer.VP8) if !ok { return @@ -1633,9 +1650,6 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { } func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { - v.lock.Lock() - defer v.lock.Unlock() - vp8, ok := extPkt.Payload.(buffer.VP8) if !ok { return @@ -1660,10 +1674,7 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { v.lastDroppedPictureId = -1 } -func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*buffer.VP8, error) { - v.lock.Lock() - defer v.lock.Unlock() - +func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error) { vp8, ok := extPkt.Payload.(buffer.VP8) if !ok { return nil, ErrNotVP8 @@ -1701,7 +1712,9 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumb IsKeyFrame: vp8.IsKeyFrame, HeaderSize: vp8.HeaderSize + buffer.VP8PictureIdSizeDiff(mungedPictureId > 127, vp8.MBit), } - return vp8Packet, nil + return &TranslationParamsVP8{ + header: vp8Packet, + }, nil } prevMaxPictureId := v.pictureIdWrapHandler.MaxPictureId() @@ -1782,13 +1795,12 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumb IsKeyFrame: vp8.IsKeyFrame, HeaderSize: vp8.HeaderSize + buffer.VP8PictureIdSizeDiff(mungedPictureId > 127, vp8.MBit), } - return vp8Packet, nil + return &TranslationParamsVP8{ + header: vp8Packet, + }, nil } func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error) { - v.lock.Lock() - defer v.lock.Unlock() - offset := 0 if newPicture { offset = 1 @@ -1849,6 +1861,11 @@ func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error) { return vp8Packet, nil } +//----------------------------- + +// +// VP8Munger +// func isWrapping7Bit(val1 int32, val2 int32) bool { return val2 < val1 && (val1-val2) > (1<<6) } diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index f51ed2926..e389b5a25 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -1225,11 +1225,13 @@ type Track struct { } func newTrack(downTrack *DownTrack, peerID string) *Track { + maxSpatialLayer, maxTemporalLayer := downTrack.MaxLayers() + return &Track{ downTrack: downTrack, peerID: peerID, - maxSpatialLayer: downTrack.MaxSpatialLayer(), - maxTemporalLayer: downTrack.MaxTemporalLayer(), + maxSpatialLayer: maxSpatialLayer, + maxTemporalLayer: maxTemporalLayer, } }