Try sending small key frames to clear decoded buffer (#179)

* Try sending small key frames to clear decoded buffer

Problem:
--------
With transceiver re-use, the client disables/enables tracks.
For video, the decoder retains the last picture, so when a new track
starts, there is a brief moment when the old stream is displayed
until there is data from the new stream to decode and display.

Fix:
---
Send small key frames before closing DownTrack to try and clear
the decoder display buffer.

Testing:
--------
Tried with Chrome/Safari/Firefox and they worked. But, very rarely,
the last frames do not seem to show up. In fact, 6 frames are sent,
but webrtc internals (in Firefox) reports anywhere from 1 to 6 frames
at the small resolution. It is unclear why it does not get all the
frames or why it reports fewer than 6. A not-so-small percentage of the
time (maybe 1 in 15 - 20), no small frame was reported at all.

TODO:
----
- Have to support more video codecs
- Would this be an issue for audio also? Should we send something to handle that?
  Probably not necessary as video is more jarring.

* Make VP8 Key Frame a const

* Need a packet factory buffer for simple tracks too,
as the VP8 munger is now also used for simple tracks in order
to send blank frames at the end.

Also, making the writeBlankFrameRTP a private function.
And adding a check to not send blank frames if nothing has been sent
on that DownTrack.
This commit is contained in:
Raja Subramanian
2021-11-11 14:38:38 +05:30
committed by GitHub
parent 0a769662aa
commit fc52b18776
4 changed files with 224 additions and 39 deletions
+1
View File
@@ -263,6 +263,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
"track", t.ID(),
"pIDs", []string{t.params.ParticipantID, sub.ID()},
"participant", sub.Identity(),
"kind", t.Kind(),
)
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
+1 -1
View File
@@ -806,7 +806,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
// RemoveSubscribedTrack removes a track from the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(),
"participant", p.Identity(), "track", subTrack.ID())
"participant", p.Identity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind())
p.lock.Lock()
delete(p.subscribedTracks, subTrack.ID())
// remove from subscribed map
+221 -38
View File
@@ -27,6 +27,7 @@ const (
const (
// RTPPaddingMaxPayloadSize and RTPPaddingEstimatedHeaderSize are summed in
// WritePaddingRTP to produce a padding packet size.
// NOTE(review): that sum looks like an upper bound on a single padding packet - confirm.
RTPPaddingMaxPayloadSize = 255
RTPPaddingEstimatedHeaderSize = 20
// RTPBlankFramesMax is the number of new blank frames writeBlankFrameRTP writes
// before it stops; a packet that only closes out an unfinished frame is not counted.
RTPBlankFramesMax = 6
)
var (
@@ -39,6 +40,10 @@ var (
ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer")
)
var (
// VP8KeyFrame1x1 is the fixed byte sequence writeBlankFrameRTP copies into each
// blank packet right after the marshalled VP8 header. The comment in that function
// describes it as a 1x1 key frame.
VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00}
)
type simulcastTrackHelpers struct {
switchDelay time.Time
temporalSupported bool
@@ -122,7 +127,7 @@ type DownTrack struct {
// NewDownTrack returns a DownTrack.
func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, peerID string, mt int) (*DownTrack, error) {
return &DownTrack{
d := &DownTrack{
id: r.TrackID(),
peerID: peerID,
maxTrack: mt,
@@ -131,7 +136,12 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p
receiver: r,
codec: c,
munger: NewMunger(),
}, nil
}
if strings.ToLower(c.MimeType) == "video/vp8" {
d.vp8Munger = NewVP8Munger()
}
return d, nil
}
// Bind is called by the PeerConnection after negotiation is complete
@@ -323,7 +333,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
size = RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize
}
sn, ts, err := d.munger.UpdateAndGetPaddingSnTs()
sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(false)
if err != nil {
return bytesSent
}
@@ -410,8 +420,15 @@ func (d *DownTrack) Mute(val bool) {
// Close track
func (d *DownTrack) Close() {
d.enabled.set(false)
// write blank frames after disabling so that other frames do not interfere.
// Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end.
// Otherwise, with transceiver re-use last frame from previous stream is held in the
// display buffer and there could be a brief moment where the previous stream is displayed.
d.writeBlankFrameRTP()
d.closeOnce.Do(func() {
Logger.V(1).Info("Closing sender", "peer_id", d.peerID)
Logger.V(1).Info("Closing sender", "peer_id", d.peerID, "kind", d.Kind())
if d.payload != nil {
packetFactory.Put(d.payload)
}
@@ -791,13 +808,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
d.munger.UpdateSnTsOffsets(extPkt, 1, 1)
} else {
d.munger.SetLastSnTs(extPkt)
if d.mime == "video/vp8" {
if vp8, ok := extPkt.Payload.(buffer.VP8); ok {
if vp8.TIDPresent == 1 {
d.vp8Munger = NewVP8Munger()
}
}
}
if d.vp8Munger != nil {
d.vp8Munger.SetLast(extPkt)
}
@@ -859,7 +869,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
}
if d.sequencer != nil {
meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head)
if meta != nil && d.vp8Munger != nil {
if meta != nil && translatedVP8 != nil {
meta.packVP8(translatedVP8)
}
}
@@ -953,12 +963,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
}
} else if lTSCalc == 0 {
d.munger.SetLastSnTs(extPkt)
if d.mime == "video/vp8" {
if _, ok := extPkt.Payload.(buffer.VP8); ok {
// need a munger for simulcast with or without temporal filtering
d.vp8Munger = NewVP8Munger()
}
}
if d.vp8Munger != nil {
d.vp8Munger.SetLast(extPkt)
}
@@ -1017,7 +1021,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
}
if d.sequencer != nil {
meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head)
if meta != nil && d.vp8Munger != nil {
if meta != nil && translatedVP8 != nil {
meta.packVP8(translatedVP8)
}
}
@@ -1048,6 +1052,87 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err
return err
}
// writeBlankFrameRTP writes RTPBlankFramesMax blank VP8 frames (header from the VP8
// munger followed by VP8KeyFrame1x1) to the write stream. If the munger is not on a
// frame boundary, one extra packet is written first to close the pending frame; that
// packet reuses the current timestamp and does not count towards the total.
//
// It is a no-op (returns nil) when no packet has been sent on this DownTrack yet,
// for audio tracks, and for any mime type other than "video/vp8".
// The first error from the mungers, the header extension writer, the VP8 header
// marshalling or the write stream aborts the sequence and is returned.
func (d *DownTrack) writeBlankFrameRTP() error {
	// nothing went out on this DownTrack, so there is nothing to flush at the remote end
	if d.packetCount.get() == 0 {
		return nil
	}

	// LK-TODO: Support other video codecs
	if d.Kind() == webrtc.RTPCodecTypeAudio || d.mime != "video/vp8" {
		return nil
	}

	// Several blank frames are sent in case some are lost.
	// Mute and bandwidth constrained mute are deliberately not checked here,
	// as the point is to clear the client side buffer.
	for framesSent := 0; ; {
		// true when the previous packet did not end a frame, i.e. this packet has to finish it
		closingFrame := !d.munger.IsOnFrameBoundary()

		sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(closingFrame)
		if err != nil {
			return err
		}

		// a closing packet stays on the pending frame's timestamp,
		// new blank frames are spaced assuming 30 fps
		frameTs := ts
		if !closingFrame {
			frameTs = ts + uint32(framesSent+1)*(d.codec.ClockRate/30)
		}

		hdr := rtp.Header{
			Version:        2,
			Padding:        false,
			Marker:         true,
			PayloadType:    d.payloadType,
			SequenceNumber: sn,
			Timestamp:      frameTs,
			SSRC:           d.ssrc,
			CSRC:           []uint32{},
		}
		if err = d.WriteRTPHeaderExtensions(&hdr); err != nil {
			return err
		}

		blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!closingFrame)
		if err != nil {
			return err
		}

		// 1x1 key frame.
		// The same content is used when closing out a previous frame. Receivers do not
		// seem to care (it will probably be undecodable), which should be fine because
		// key frames follow.
		payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1))
		if err = blankVP8.MarshalTo(payload[:blankVP8.HeaderSize]); err != nil {
			return err
		}
		copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1)

		if _, err = d.writeStream.WriteRTP(&hdr, payload); err != nil {
			return err
		}

		if !closingFrame {
			framesSent++
		}
		if framesSent >= RTPBlankFramesMax {
			return nil
		}
	}
}
func (d *DownTrack) handleRTCP(bytes []byte) {
// LK-TODO - should probably handle RTCP even if muted
if !d.enabled.get() {
@@ -1344,17 +1429,15 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err
// if there are gaps, record it in missing sequence number cache
diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN
if diff > 1 {
var lossStartSN, lossEndSN int
lossStartSN = int(m.highestIncomingSN) + 1
if extPkt.Packet.SequenceNumber > m.highestIncomingSN {
for lostSN := m.highestIncomingSN + 1; lostSN < extPkt.Packet.SequenceNumber; lostSN++ {
m.missingSNs[lostSN] = m.snOffset
}
lossEndSN = int(extPkt.Packet.SequenceNumber) - 1
} else {
for lostSN := m.highestIncomingSN + 1; lostSN <= uint16(buffer.MaxSN-1); lostSN++ {
m.missingSNs[lostSN] = m.snOffset
}
for lostSN := uint16(0); lostSN < extPkt.Packet.SequenceNumber; lostSN++ {
m.missingSNs[lostSN] = m.snOffset
}
lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 + buffer.MaxSN
}
for lostSN := lossStartSN; lostSN <= lossEndSN; lostSN++ {
m.missingSNs[uint16(lostSN&0xffff)] = m.snOffset
}
} else {
// can get duplicate packet due to FEC
@@ -1390,11 +1473,11 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err
return mungedSN, mungedTS, nil
}
func (m *Munger) UpdateAndGetPaddingSnTs() (uint16, uint32, error) {
func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) {
m.lock.Lock()
defer m.lock.Unlock()
if !m.lastMarker {
if !m.lastMarker && !forceMarker {
return 0, 0, ErrPaddingNotOnFrameBoundary
}
@@ -1404,9 +1487,20 @@ func (m *Munger) UpdateAndGetPaddingSnTs() (uint16, uint32, error) {
m.lastSN = sn
m.snOffset -= 1
if forceMarker {
m.lastMarker = true
}
return sn, ts, nil
}
// IsOnFrameBoundary returns the munger's lastMarker flag, read under the read lock.
// Callers treat a true value as "the last packet ended a frame".
func (m *Munger) IsOnFrameBoundary() bool {
	m.lock.RLock()
	onBoundary := m.lastMarker
	m.lock.RUnlock()

	return onBoundary
}
//
// VP8 munger
//
@@ -1414,10 +1508,14 @@ type VP8MungerParams struct {
pictureIdWrapHandler VP8PictureIdWrapHandler
extLastPictureId int32
pictureIdOffset int32
pictureIdUsed int
lastTl0PicIdx uint8
tl0PicIdxOffset uint8
tl0PicIdxUsed int
tidUsed int
lastKeyIdx uint8
keyIdxOffset uint8
keyIdxUsed int
missingPictureIds map[int32]int32
}
@@ -1443,10 +1541,23 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) {
return
}
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit)
v.extLastPictureId = int32(vp8.PictureID)
v.lastTl0PicIdx = vp8.TL0PICIDX
v.lastKeyIdx = vp8.KEYIDX
v.pictureIdUsed = vp8.PictureIDPresent
if v.pictureIdUsed == 1 {
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit)
v.extLastPictureId = int32(vp8.PictureID)
}
v.tl0PicIdxUsed = vp8.TL0PICIDXPresent
if v.tl0PicIdxUsed == 1 {
v.lastTl0PicIdx = vp8.TL0PICIDX
}
v.tidUsed = vp8.TIDPresent
v.keyIdxUsed = vp8.KEYIDXPresent
if v.keyIdxUsed == 1 {
v.lastKeyIdx = vp8.KEYIDX
}
}
func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
@@ -1458,10 +1569,18 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
return
}
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit)
v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1
v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1
v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f
if v.pictureIdUsed == 1 {
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit)
v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1
}
if v.tl0PicIdxUsed == 1 {
v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1
}
if v.keyIdxUsed == 1 {
v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f
}
}
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int32) (*buffer.VP8, error) {
@@ -1556,6 +1675,70 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3
return vp8Packet, nil
}
// UpdateAndGetPadding builds the VP8 header for a padding/blank packet and advances
// the munger state to account for it.
//
// When newPicture is true, the picture id, TL0PICIDX and KEYIDX (each only if it was
// in use on the stream) advance by one, and the matching offsets are decremented by
// one. When newPicture is false the last values are reused unchanged.
//
// The returned header always has FirstByte 0x10, TID 0, Y 1 and IsKeyFrame true;
// HeaderSize is the number of bytes the header occupies. The error is always nil.
//
// NOTE(review): MBit is derived from the picture id value (> 127), not from whether
// the incoming stream used the M bit - confirm that receivers accept a change of
// picture id width mid-stream.
func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error) {
	v.lock.Lock()
	defer v.lock.Unlock()

	// 1 when this packet starts a new picture, 0 when it continues the last one
	step := 0
	if newPicture {
		step = 1
	}

	// first byte is always there
	size := 1
	// one more byte as soon as any optional field is in use
	if v.pictureIdUsed+v.tl0PicIdxUsed+v.tidUsed+v.keyIdxUsed != 0 {
		size++
	}

	extPicId := v.extLastPictureId
	if v.pictureIdUsed == 1 {
		extPicId += int32(step)
		v.extLastPictureId = extPicId
		v.pictureIdOffset -= int32(step)

		// picture id takes one byte, or two when it does not fit in 7 bits
		size++
		if extPicId&0x7fff > 127 {
			size++
		}
	}
	picId := uint16(extPicId & 0x7fff)

	var tl0 uint8
	if v.tl0PicIdxUsed == 1 {
		v.lastTl0PicIdx += uint8(step)
		tl0 = v.lastTl0PicIdx
		v.tl0PicIdxOffset -= uint8(step)
		size++
	}

	// TID and KEYIDX share a single byte
	if v.tidUsed+v.keyIdxUsed != 0 {
		size++
	}

	var keyIdx uint8
	if v.keyIdxUsed == 1 {
		// KEYIDX is kept to 5 bits
		keyIdx = (v.lastKeyIdx + uint8(step)) & 0x1f
		v.lastKeyIdx = keyIdx
		v.keyIdxOffset -= uint8(step)
	}

	return &buffer.VP8{
		FirstByte:        0x10, // partition 0, start of VP8 Partition, reference frame
		PictureIDPresent: v.pictureIdUsed,
		PictureID:        picId,
		MBit:             picId > 127,
		TL0PICIDXPresent: v.tl0PicIdxUsed,
		TL0PICIDX:        tl0,
		TIDPresent:       v.tidUsed,
		TID:              0,
		Y:                1,
		KEYIDXPresent:    v.keyIdxUsed,
		KEYIDX:           keyIdx,
		IsKeyFrame:       true,
		HeaderSize:       size,
	}, nil
}
// isWrapping7Bit reports whether val2 is below val1 by more than 64 (1<<6),
// i.e. by more than half of the 7-bit range.
func isWrapping7Bit(val1 int32, val2 int32) bool {
	if val2 >= val1 {
		return false
	}

	const halfRange = 1 << 6
	return val1-val2 > halfRange
}
+1
View File
@@ -295,6 +295,7 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) {
// LK-TODO-END
track.SetInitialLayers(0, 0)
track.trackType = SimpleDownTrack
track.payload = packetFactory.Get().(*[]byte)
}
w.storeDownTrack(track)