diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f7463f946..48b5be00d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -263,6 +263,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { "track", t.ID(), "pIDs", []string{t.params.ParticipantID, sub.ID()}, "participant", sub.Identity(), + "kind", t.Kind(), ) if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { if err == webrtc.ErrConnectionClosed { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 40bb2e84a..19142314a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -806,7 +806,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { // RemoveSubscribedTrack removes a track to the participant's subscribed list func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) { p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(), - "participant", p.Identity(), "track", subTrack.ID()) + "participant", p.Identity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind()) p.lock.Lock() delete(p.subscribedTracks, subTrack.ID()) // remove from subscribed map diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c411a9e32..0e71ad1ca 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -27,6 +27,7 @@ const ( const ( RTPPaddingMaxPayloadSize = 255 RTPPaddingEstimatedHeaderSize = 20 + RTPBlankFramesMax = 6 ) var ( @@ -39,6 +40,10 @@ var ( ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer") ) +var ( + VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00} +) + type simulcastTrackHelpers struct { switchDelay time.Time temporalSupported bool @@ -122,7 +127,7 @@ type DownTrack struct { // NewDownTrack returns a DownTrack. func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, peerID string, mt int) (*DownTrack, error) { - return &DownTrack{ + d := &DownTrack{ id: r.TrackID(), peerID: peerID, maxTrack: mt, @@ -131,7 +136,12 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p receiver: r, codec: c, munger: NewMunger(), - }, nil + } + if strings.ToLower(c.MimeType) == "video/vp8" { + d.vp8Munger = NewVP8Munger() + } + + return d, nil } // Bind is called by the PeerConnection after negotiation is complete @@ -323,7 +333,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { size = RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize } - sn, ts, err := d.munger.UpdateAndGetPaddingSnTs() + sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(false) if err != nil { return bytesSent } @@ -410,8 +420,15 @@ func (d *DownTrack) Mute(val bool) { // Close track func (d *DownTrack) Close() { d.enabled.set(false) + + // write blank frames after disabling so that other frames do not interfere. + // Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end. + // Otherwise, with transceiver re-use last frame from previous stream is held in the + // display buffer and there could be a brief moment where the previous stream is displayed. + d.writeBlankFrameRTP() + d.closeOnce.Do(func() { - Logger.V(1).Info("Closing sender", "peer_id", d.peerID) + Logger.V(1).Info("Closing sender", "peer_id", d.peerID, "kind", d.Kind()) if d.payload != nil { packetFactory.Put(d.payload) } @@ -791,13 +808,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { d.munger.UpdateSnTsOffsets(extPkt, 1, 1) } else { d.munger.SetLastSnTs(extPkt) - if d.mime == "video/vp8" { - if vp8, ok := extPkt.Payload.(buffer.VP8); ok { - if vp8.TIDPresent == 1 { - d.vp8Munger = NewVP8Munger() - } - } - } if d.vp8Munger != nil { d.vp8Munger.SetLast(extPkt) } @@ -859,7 +869,7 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, 0, extPkt.Head) - if meta != nil && d.vp8Munger != nil { + if meta != nil && translatedVP8 != nil { meta.packVP8(translatedVP8) } } @@ -953,12 +963,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } } else if lTSCalc == 0 { d.munger.SetLastSnTs(extPkt) - if d.mime == "video/vp8" { - if _, ok := extPkt.Payload.(buffer.VP8); ok { - // need a munger for simulcast with or without temporal filtering - d.vp8Munger = NewVP8Munger() - } - } if d.vp8Munger != nil { d.vp8Munger.SetLast(extPkt) } @@ -1017,7 +1021,7 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err } if d.sequencer != nil { meta := d.sequencer.push(extPkt.Packet.SequenceNumber, newSN, newTS, uint8(csl), extPkt.Head) - if meta != nil && d.vp8Munger != nil { + if meta != nil && translatedVP8 != nil { meta.packVP8(translatedVP8) } } @@ -1048,6 +1052,87 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket, layer int32) err return err } +func (d *DownTrack) writeBlankFrameRTP() error { + // don't send if nothing has been sent + if d.packetCount.get() == 0 { + return nil + } + + // LK-TODO: Support other video codecs + if d.Kind() == webrtc.RTPCodecTypeAudio || d.mime != "video/vp8" { + return nil + } + + // send a number of blank frames just in case there is loss. + // Intentionally ignoring check for mute or bandwidth constrained mute + // as this is used to clear client side buffer. + i := 0 + for { + frameEndNeeded := false + if !d.munger.IsOnFrameBoundary() { + frameEndNeeded = true + } + + sn, ts, err := d.munger.UpdateAndGetPaddingSnTs(frameEndNeeded) + if err != nil { + return err + } + + adjustedTs := ts + uint32(i+1)*(d.codec.ClockRate/30) // assume 30 fps + if frameEndNeeded { + adjustedTs = ts + } + hdr := rtp.Header{ + Version: 2, + Padding: false, + Marker: true, + PayloadType: d.payloadType, + SequenceNumber: sn, + Timestamp: adjustedTs, + SSRC: d.ssrc, + CSRC: []uint32{}, + } + + err = d.WriteRTPHeaderExtensions(&hdr) + if err != nil { + return err + } + + blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) + if err != nil { + return err + } + + // 1x1 key frame + // Used even when closing out a previous frame. Looks like receivers + // do not care about content (it will probably end up being an undecodable + // frame, but that should be okay as there are key frames following) + payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1)) + vp8Header := payload[:blankVP8.HeaderSize] + err = blankVP8.MarshalTo(vp8Header) + if err != nil { + return err + } + + copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) + + _, err = d.writeStream.WriteRTP(&hdr, payload) + if err != nil { + return err + } + if !frameEndNeeded { + i++ + } + + if i >= RTPBlankFramesMax { + break + } + } + + return nil +} + + func (d *DownTrack) handleRTCP(bytes []byte) { // LK-TODO - should probably handle RTCP even if muted if !d.enabled.get() { @@ -1344,17 +1429,15 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err // if there are gaps, record it in missing sequence number cache diff := extPkt.Packet.SequenceNumber - m.highestIncomingSN if diff > 1 { + var lossStartSN, lossEndSN int + lossStartSN = int(m.highestIncomingSN) + 1 if extPkt.Packet.SequenceNumber > m.highestIncomingSN { - for lostSN := m.highestIncomingSN + 1; lostSN < extPkt.Packet.SequenceNumber; lostSN++ { - m.missingSNs[lostSN] = m.snOffset - } + lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 } else { - for lostSN := m.highestIncomingSN + 1; lostSN <= uint16(buffer.MaxSN-1); lostSN++ { - m.missingSNs[lostSN] = m.snOffset - } - for lostSN := uint16(0); lostSN < extPkt.Packet.SequenceNumber; lostSN++ { - m.missingSNs[lostSN] = m.snOffset - } + lossEndSN = int(extPkt.Packet.SequenceNumber) - 1 + buffer.MaxSN + } + for lostSN := lossStartSN; lostSN <= lossEndSN; lostSN++ { + m.missingSNs[uint16(lostSN&0xffff)] = m.snOffset } } else { // can get duplicate packet due to FEC @@ -1390,11 +1473,11 @@ func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, err return mungedSN, mungedTS, nil } -func (m *Munger) UpdateAndGetPaddingSnTs() (uint16, uint32, error) { +func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error) { m.lock.Lock() defer m.lock.Unlock() - if !m.lastMarker { + if !m.lastMarker && !forceMarker { return 0, 0, ErrPaddingNotOnFrameBoundary } @@ -1404,9 +1487,20 @@ func (m *Munger) UpdateAndGetPaddingSnTs() (uint16, uint32, error) { m.lastSN = sn m.snOffset -= 1 + if forceMarker { + m.lastMarker = true + } + return sn, ts, nil } +func (m *Munger) IsOnFrameBoundary() bool { + m.lock.RLock() + defer m.lock.RUnlock() + + return m.lastMarker +} + // // VP8 munger // @@ -1414,10 +1508,14 @@ type VP8MungerParams struct { pictureIdWrapHandler VP8PictureIdWrapHandler extLastPictureId int32 pictureIdOffset int32 + pictureIdUsed int lastTl0PicIdx uint8 tl0PicIdxOffset uint8 + tl0PicIdxUsed int + tidUsed int lastKeyIdx uint8 keyIdxOffset uint8 + keyIdxUsed int missingPictureIds map[int32]int32 } @@ -1443,10 +1541,23 @@ func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) { return } - v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) - v.extLastPictureId = int32(vp8.PictureID) - v.lastTl0PicIdx = vp8.TL0PICIDX - v.lastKeyIdx = vp8.KEYIDX + v.pictureIdUsed = vp8.PictureIDPresent + if v.pictureIdUsed == 1 { + v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) + v.extLastPictureId = int32(vp8.PictureID) + } + + v.tl0PicIdxUsed = vp8.TL0PICIDXPresent + if v.tl0PicIdxUsed == 1 { + v.lastTl0PicIdx = vp8.TL0PICIDX + } + + v.tidUsed = vp8.TIDPresent + + v.keyIdxUsed = vp8.KEYIDXPresent + if v.keyIdxUsed == 1 { + v.lastKeyIdx = vp8.KEYIDX + } } func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { @@ -1458,10 +1569,18 @@ func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) { return } - v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) - v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1 - v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1 - v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f + if v.pictureIdUsed == 1 { + v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit) + v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1 + } + + if v.tl0PicIdxUsed == 1 { + v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1 + } + + if v.keyIdxUsed == 1 { + v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f + } } func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int32) (*buffer.VP8, error) { @@ -1556,6 +1675,70 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, maxTemporalLayer int3 return vp8Packet, nil } +func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error) { + v.lock.Lock() + defer v.lock.Unlock() + + offset := 0 + if newPicture { + offset = 1 + } + + headerSize := 1 + if (v.pictureIdUsed + v.tl0PicIdxUsed + v.tidUsed + v.keyIdxUsed) != 0 { + headerSize += 1 + } + + extPictureId := v.extLastPictureId + if v.pictureIdUsed == 1 { + extPictureId = v.extLastPictureId + int32(offset) + v.extLastPictureId = extPictureId + v.pictureIdOffset -= int32(offset) + if (extPictureId & 0x7fff) > 127 { + headerSize += 2 + } else { + headerSize += 1 + } + } + pictureId := uint16(extPictureId & 0x7fff) + + tl0PicIdx := uint8(0) + if v.tl0PicIdxUsed == 1 { + tl0PicIdx = v.lastTl0PicIdx + uint8(offset) + v.lastTl0PicIdx = tl0PicIdx + v.tl0PicIdxOffset -= uint8(offset) + headerSize += 1 + } + + if (v.tidUsed + v.keyIdxUsed) != 0 { + headerSize += 1 + } + + keyIdx := uint8(0) + if v.keyIdxUsed == 1 { + keyIdx = (v.lastKeyIdx + uint8(offset)) & 0x1f + v.lastKeyIdx = keyIdx + v.keyIdxOffset -= uint8(offset) + } + + vp8Packet := &buffer.VP8{ + FirstByte: 0x10, // partition 0, start of VP8 Partition, reference frame + PictureIDPresent: v.pictureIdUsed, + PictureID: pictureId, + MBit: pictureId > 127, + TL0PICIDXPresent: v.tl0PicIdxUsed, + TL0PICIDX: tl0PicIdx, + TIDPresent: v.tidUsed, + TID: 0, + Y: 1, + KEYIDXPresent: v.keyIdxUsed, + KEYIDX: keyIdx, + IsKeyFrame: true, + HeaderSize: headerSize, + } + return vp8Packet, nil +} + func isWrapping7Bit(val1 int32, val2 int32) bool { return val2 < val1 && (val1-val2) > (1<<6) } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index ab9b0c0be..d9a33b314 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -295,6 +295,7 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { // LK-TODO-END track.SetInitialLayers(0, 0) track.trackType = SimpleDownTrack + track.payload = packetFactory.Get().(*[]byte) } w.storeDownTrack(track)