diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 86b0f82c5..7438c9c72 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -474,7 +474,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) } samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second)) - now := time.Now() + timeSinceFirst := time.Since(r.firstTime) + now := r.firstTime.Add(timeSinceFirst) firstTime := now.Add(-samplesDuration) if firstTime.Before(r.firstTime) { r.logger.Debugw( diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index df4f7381d..22392ec8a 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -281,10 +281,24 @@ func (r *RTPStatsSender) Update( } } + r.logger.Infow( + "adjusting start sequence number", + "snBefore", r.extStartSN, + "snAfter", extSequenceNumber, + "tsBefore", r.extStartTS, + "tsAfter", extTimestamp, + ) r.extStartSN = extSequenceNumber } if extTimestamp < r.extStartTS { + r.logger.Infow( + "adjusting start timestamp", + "snBefore", r.extStartSN, + "snAfter", extSequenceNumber, + "tsBefore", r.extStartTS, + "tsAfter", extTimestamp, + ) r.extStartTS = extTimestamp } diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index eb0965627..e7d33099f 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -69,8 +69,14 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 return } - for _, sendPkt := range pkts { + for i, sendPkt := range pkts { pPkt := *pkt + if i != len(pkts)-1 { + // patch extended sequence number and time stmap for all but the last packet, + // last packet is the primary payload + pPkt.ExtSequenceNumber -= uint64(pkts[len(pkts)-1].SequenceNumber - pkts[i].SequenceNumber) + pPkt.ExtTimestamp -= uint64(pkts[len(pkts)-1].Timestamp - pkts[i].Timestamp) + } pPkt.Packet = sendPkt // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, @@ -143,6 +149,7 @@ func (r *RedPrimaryReceiver) getSendPktsFromRed(rtp *rtp.Packet) ([]*rtp.Packet, switch { case diff == 0: // duplicate break + case diff > 0x8000: // unorder // in history if 65535-diff < 8 { @@ -191,25 +198,36 @@ func extractPktsFromRed(redPkt *rtp.Packet, recoverBits byte) ([]*rtp.Packet, er var blocks []block var blockLength int for { + if len(payload) < 1 { + // illegal data, need at least one byte for primary encoding + return nil, ErrIncompleteRedHeader + } + if payload[0]&0x80 == 0 { // last block is primary encoding data + pt := uint8(payload[0] & 0x7F) + + blocks = append(blocks, block{pt: pt, primary: true}) + payload = payload[1:] - blocks = append(blocks, block{primary: true}) break } else { if len(payload) < 4 { // illegal data return nil, ErrIncompleteRedHeader } + blockHead := binary.BigEndian.Uint32(payload[0:]) length := int(blockHead & 0x03FF) blockHead >>= 10 tsOffset := blockHead & 0x3FFF blockHead >>= 14 pt := uint8(blockHead & 0x7F) - payload = payload[4:] - blockLength += length + blocks = append(blocks, block{pt: pt, length: length, tsOffset: tsOffset}) + + blockLength += length + payload = payload[4:] } } @@ -220,22 +238,26 @@ func extractPktsFromRed(redPkt *rtp.Packet, recoverBits byte) ([]*rtp.Packet, er pkts := make([]*rtp.Packet, 0, len(blocks)) for i, b := range blocks { if b.primary { + header := redPkt.Header + header.PayloadType = b.pt pkts = append(pkts, &rtp.Packet{Header: redPkt.Header, Payload: payload}) break } - // last block is primary encoding recoverIndex := len(blocks) - i - 1 if recoverIndex < 1 || recoverBits&(1<<(recoverIndex-1)) == 0 { + // skip past packet/block that does not need recovery payload = payload[b.length:] continue } + // recover missing packet header := redPkt.Header header.SequenceNumber -= uint16(recoverIndex) header.Timestamp -= b.tsOffset header.PayloadType = b.pt pkts = append(pkts, &rtp.Packet{Header: header, Payload: payload[:b.length]}) + payload = payload[b.length:] } @@ -257,6 +279,11 @@ func extractPrimaryEncodingForRED(payload []byte) ([]byte, error) { var blockLength int for { + if len(payload) < 1 { + // illegal data, need at least one byte for primary encoding + return nil, ErrIncompleteRedHeader + } + if payload[0]&0x80 == 0 { // last block is primary encoding data payload = payload[1:] @@ -266,6 +293,7 @@ func extractPrimaryEncodingForRED(payload []byte) ([]byte, error) { // illegal data return nil, ErrIncompleteRedHeader } + blockLength += int(binary.BigEndian.Uint16(payload[2:]) & 0x03FF) payload = payload[4:] }