From fc52b18776a2f3a62319d83af426caa702dd9d95 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 11 Nov 2021 14:38:38 +0530 Subject: [PATCH] Try sending small key frames to clear decoded buffer (#179) * Try sending small key frames to clear decoded buffer Problem: -------- With transceiver re-use, client disables/enables tracks. With video, the last picture is retained, so when a new track starts there is a brief moment when the old stream is displayed until there is data from the new stream to decode and display. Fix: --- Send small key frames before closing DownTrack to try and clear the decoder display buffer. Testing: -------- Tried with Chrome/Safari/Firefox and they worked. But, very rarely, the last frames do not seem to show up. In fact, 6 frames are sent and WebRTC internals (in Firefox) reports anywhere from 1 to 6 frames at the small resolution. It is unclear why it does not get all the frames or why it reports fewer than 6. In a not-so-small percentage of runs (maybe 1 in 15 to 20), no small frame was reported at all. TODO: ---- - Have to support more video codecs - Would this be an issue for audio also? Should we send something to handle that? Probably not necessary as video is more jarring. * Make VP8 Key Frame a const * Need a packet factory buffer for simple tracks too, as the VP8 munger is now used for simple tracks as well because of the need to send blank frames at the end. Also, make writeBlankFrameRTP a private function and add a check to not send blank frames if nothing has been sent on that DownTrack. 
--- pkg/rtc/mediatrack.go | 1 + pkg/rtc/participant.go | 2 +- pkg/sfu/downtrack.go | 259 +++++++++++++++++++++++++++++++++++------ pkg/sfu/receiver.go | 1 + 4 files changed, 224 insertions(+), 39 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f7463f946..48b5be00d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -263,6 +263,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { "track", t.ID(), "pIDs", []string{t.params.ParticipantID, sub.ID()}, "participant", sub.Identity(), + "kind", t.Kind(), ) if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { if err == webrtc.ErrConnectionClosed { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 40bb2e84a..19142314a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -806,7 +806,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { // RemoveSubscribedTrack removes a track to the participant's subscribed list func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) { p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(), - "participant", p.Identity(), "track", subTrack.ID()) + "participant", p.Identity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind()) p.lock.Lock() delete(p.subscribedTracks, subTrack.ID()) // remove from subscribed map diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c411a9e32..0e71ad1ca 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -27,6 +27,7 @@ const ( const ( RTPPaddingMaxPayloadSize = 255 RTPPaddingEstimatedHeaderSize = 20 + RTPBlankFramesMax = 6 ) var ( @@ -39,6 +40,10 @@ var ( ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer") ) +var ( + VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00} +) + 
type simulcastTrackHelpers struct { switchDelay time.Time temporalSupported bool @@ -122,7 +127,7 @@ type DownTrack struct { // NewDownTrack returns a DownTrack. func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, peerID string, mt int) (*DownTrack, error) { - return &DownTrack{ + d := &DownTrack{ id: r.TrackID(), peerID: peerID, maxTrack: mt, @@ -131,7 +136,12 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p receiver: r, codec: c, munger: NewMunger(), - }, nil + } + if strings.ToLower(c.MimeType) == "video/vp8" { + d.vp8Munger = NewVP8Munger() + } + + return d, nil } // Bind is called by the PeerConnection after negotiation is complete @@ -323,7 +333,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { size = RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize } - sn, ts, err := d.munger.UpdateAndGetPaddingSnTs() + sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(false) if err != nil { return bytesSent } @@ -410,8 +420,15 @@ func (d *DownTrack) Mute(val bool) { // Close track func (d *DownTrack) Close() { d.enabled.set(false) + + // write blank frames after disabling so that other frames do not interfere. + // Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end. + // Otherwise, with transceiver re-use last frame from previous stream is held in the + // display buffer and there could be a brief moment where the previous stream is displayed. 
+ d.writeBlankFrameRTP() + d.closeOnce.Do(func() { - Logger.V(1).Info("Closing sender", "peer_id", d.peerID) + Logger.V(1).Info("Closing sender", "peer_id", d.peerID, "kind", d.Kind()) if d.payload != nil { packetFactory.Put(d.payload) } @@ -791,13 +808,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { d.munger.UpdateSnTsOffsets(extPkt, 1, 1) } else { d.munger.SetLastSnTs(extPkt) - if d.mime == "video/vp8" { - if vp8, ok := extPkt.Payload.(buffer.VP8); ok { - if vp8.TIDPresent == 1 { - d.vp8Munger = NewVP8Munger() - } - } - } if d.vp8Munger != nil { d.vp8Munger.SetLast(extPkt) } @@ -859,7 +869,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head) - if meta != nil && d.vp8Munger != nil { + if meta != nil && translatedVP8 != nil { meta.packVP8(translatedVP8) } } @@ -953,12 +963,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } } else if lTSCalc == 0 { d.munger.SetLastSnTs(extPkt) - if d.mime == "video/vp8" { - if _, ok := extPkt.Payload.(buffer.VP8); ok { - // need a munger for simulcast with or without temporal filtering - d.vp8Munger = NewVP8Munger() - } - } if d.vp8Munger != nil { d.vp8Munger.SetLast(extPkt) } @@ -1017,7 +1021,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head) - if meta != nil && d.vp8Munger != nil { + if meta != nil && translatedVP8 != nil { meta.packVP8(translatedVP8) } } @@ -1048,6 +1052,87 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err return err } +func (d *DownTrack) writeBlankFrameRTP() error { + // don't send if nothing has been sent + if d.packetCount.get() == 0 { + return nil + } + + // LK-TODO: Support other video codecs + if d.Kind() == 
webrtc.RTPCodecTypeAudio || d.mime != "video/vp8" { + return nil + } + + // send a number of blank frames just in case there is loss. + // Intentionally ignoring check for mute or bandwidth constrained mute + // as this is used to clear client side buffer. + i := 0 + for { + frameEndNeeded := false + if !d.munger.IsOnFrameBoundary() { + frameEndNeeded = true + } + + sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(frameEndNeeded) + if err != nil { + return err + } + + adjustedTs := ts + uint32(i+1)*(d.codec.ClockRate/30) // assume 30 fps + if frameEndNeeded { + adjustedTs = ts + } + hdr := rtp.Header{ + Version: 2, + Padding: false, + Marker: true, + PayloadType: d.payloadType, + SequenceNumber: sn, + Timestamp: adjustedTs, + SSRC: d.ssrc, + CSRC: []uint32{}, + } + + err = d.WriteRTPHeaderExtensions(&hdr) + if err != nil { + return err + } + + blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) + if err != nil { + return err + } + + // 1x1 key frame + // Used even when closing out a previous frame. 
Looks like receivers + // do not care about content (it will probably end up being an undecodable + // frame, but that should be okay as there are key frames following) + payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1)) + vp8Header := payload[:blankVP8.HeaderSize] + err = blankVP8.MarshalTo(vp8Header) + if err != nil { + return err + } + + copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) + + _, err = d.writeStream.WriteRTP(&hdr, payload) + if err != nil { + return err + } + if !frameEndNeeded { + i++ + } + + if i >= RTPBlankFramesMax { + break + } + } + + return nil +} + + func (d *DownTrack) handleRTCP(bytes []byte) { // LK-TODO - should probably handle RTCP even if muted if !d.enabled.get() { @@ -1344,17 +1429,15 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err // if there are gaps, record it in missing sequence number cache diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN if diff > 1 { + var lossStartSN, lossEndSN int + lossStartSN = int(m.highestIncomingSN) + 1 if extPkt.Packet.SequenceNumber > m.highestIncomingSN { - for lostSN := m.highestIncomingSN + 1; lostSN < extPkt.Packet.SequenceNumber; lostSN++ { - m.missingSNs[lostSN] = m.snOffset - } + lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 } else { - for lostSN := m.highestIncomingSN + 1; lostSN <= uint16(buffer.MaxSN-1); lostSN++ { - m.missingSNs[lostSN] = m.snOffset - } - for lostSN := uint16(0); lostSN < extPkt.Packet.SequenceNumber; lostSN++ { - m.missingSNs[lostSN] = m.snOffset - } + lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 + buffer.MaxSN + } + for lostSN := lossStartSN; lostSN <= lossEndSN; lostSN++ { + m.missingSNs[uint16(lostSN&0xffff)] = m.snOffset } } else { // can get duplicate packet due to FEC @@ -1390,11 +1473,11 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err return mungedSN, mungedTS, nil } -func (m *Munger) UpdateAndGetPaddingSnTs() (uint16, uint32, error) { +func (m *Munger) 
UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) { m.lock.Lock() defer m.lock.Unlock() - if !m.lastMarker { + if !m.lastMarker && !forceMarker { return 0, 0, ErrPaddingNotOnFrameBoundary } @@ -1404,9 +1487,20 @@ func (m *Munger) UpdateAndGetPaddingSnTs() (uint16, uint32, error) { m.lastSN = sn m.snOffset -= 1 + if forceMarker { + m.lastMarker = true + } + return sn, ts, nil } +func (m *Munger) IsOnFrameBoundary() bool { + m.lock.RLock() + defer m.lock.RUnlock() + + return m.lastMarker +} + // // VP8 munger // @@ -1414,10 +1508,14 @@ type VP8MungerParams struct { pictureIdWrapHandler VP8PictureIdWrapHandler extLastPictureId int32 pictureIdOffset int32 + pictureIdUsed int lastTl0PicIdx uint8 tl0PicIdxOffset uint8 + tl0PicIdxUsed int + tidUsed int lastKeyIdx uint8 keyIdxOffset uint8 + keyIdxUsed int missingPictureIds map[int32]int32 } @@ -1443,10 +1541,23 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { return } - v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) - v.extLastPictureId = int32(vp8.PictureID) - v.lastTl0PicIdx = vp8.TL0PICIDX - v.lastKeyIdx = vp8.KEYIDX + v.pictureIdUsed = vp8.PictureIDPresent + if v.pictureIdUsed == 1 { + v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) + v.extLastPictureId = int32(vp8.PictureID) + } + + v.tl0PicIdxUsed = vp8.TL0PICIDXPresent + if v.tl0PicIdxUsed == 1 { + v.lastTl0PicIdx = vp8.TL0PICIDX + } + + v.tidUsed = vp8.TIDPresent + + v.keyIdxUsed = vp8.KEYIDXPresent + if v.keyIdxUsed == 1 { + v.lastKeyIdx = vp8.KEYIDX + } } func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { @@ -1458,10 +1569,18 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { return } - v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) - v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1 - v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1 - v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f + if v.pictureIdUsed == 1 { + 
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) + v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1 + } + + if v.tl0PicIdxUsed == 1 { + v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1 + } + + if v.keyIdxUsed == 1 { + v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f + } } func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int32) (*buffer.VP8, error) { @@ -1556,6 +1675,70 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3 return vp8Packet, nil } +func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error) { + v.lock.Lock() + defer v.lock.Unlock() + + offset := 0 + if newPicture { + offset = 1 + } + + headerSize := 1 + if (v.pictureIdUsed + v.tl0PicIdxUsed + v.tidUsed + v.keyIdxUsed) != 0 { + headerSize += 1 + } + + extPictureId := v.extLastPictureId + if v.pictureIdUsed == 1 { + extPictureId = v.extLastPictureId + int32(offset) + v.extLastPictureId = extPictureId + v.pictureIdOffset -= int32(offset) + if (extPictureId & 0x7fff) > 127 { + headerSize += 2 + } else { + headerSize += 1 + } + } + pictureId := uint16(extPictureId & 0x7fff) + + tl0PicIdx := uint8(0) + if v.tl0PicIdxUsed == 1 { + tl0PicIdx = v.lastTl0PicIdx + uint8(offset) + v.lastTl0PicIdx = tl0PicIdx + v.tl0PicIdxOffset -= uint8(offset) + headerSize += 1 + } + + if (v.tidUsed + v.keyIdxUsed) != 0 { + headerSize += 1 + } + + keyIdx := uint8(0) + if v.keyIdxUsed == 1 { + keyIdx = (v.lastKeyIdx + uint8(offset)) & 0x1f + v.lastKeyIdx = keyIdx + v.keyIdxOffset -= uint8(offset) + } + + vp8Packet := &buffer.VP8{ + FirstByte: 0x10, // partition 0, start of VP8 Partition, reference frame + PictureIDPresent: v.pictureIdUsed, + PictureID: pictureId, + MBit: pictureId > 127, + TL0PICIDXPresent: v.tl0PicIdxUsed, + TL0PICIDX: tl0PicIdx, + TIDPresent: v.tidUsed, + TID: 0, + Y: 1, + KEYIDXPresent: v.keyIdxUsed, + KEYIDX: keyIdx, + IsKeyFrame: true, + HeaderSize: headerSize, + } + return vp8Packet, nil 
+} + func isWrapping7Bit(val1 int32, val2 int32) bool { return val2 < val1 && (val1-val2) > (1<<6) } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index ab9b0c0be..d9a33b314 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -295,6 +295,7 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { // LK-TODO-END track.SetInitialLayers(0, 0) track.trackType = SimpleDownTrack + track.payload = packetFactory.Get().(*[]byte) } w.storeDownTrack(track)