Merge remote-tracking branch 'origin/master' into raja_fr

This commit is contained in:
boks1971
2024-03-21 15:37:13 +05:30
72 changed files with 2042 additions and 1412 deletions
+53 -29
View File
@@ -44,6 +44,9 @@ import (
const (
ReportDelta = time.Second
InitPacketBufferSizeVideo = 300
InitPacketBufferSizeAudio = 70
)
type pendingPacket struct {
@@ -68,8 +71,8 @@ type Buffer struct {
sync.RWMutex
bucket *bucket.Bucket
nacker *nack.NackQueue
videoPool *sync.Pool
audioPool *sync.Pool
maxVideoPkts int
maxAudioPkts int
codecType webrtc.RTPCodecType
payloadType uint8
extPackets deque.Deque[*ExtPacket]
@@ -100,6 +103,7 @@ type Buffer struct {
rtpStats *RTPStatsReceiver
rrSnapshotId uint32
deltaStatsSnapshotId uint32
ppsSnapshotId uint32
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
@@ -126,18 +130,19 @@ type Buffer struct {
extPacketTooMuchCount atomic.Uint32
primaryBufferForRTX *Buffer
rtxPktBuf []byte
}
// NewBuffer constructs a new Buffer
func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
func NewBuffer(ssrc uint32, maxVideoPkts, maxAudioPkts int) *Buffer {
l := logger.GetLogger() // will be reset with correct context via SetLogger
b := &Buffer{
mediaSSRC: ssrc,
videoPool: vp,
audioPool: ap,
snRangeMap: utils.NewRangeMap[uint64, uint64](100),
pliThrottle: int64(500 * time.Millisecond),
logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU),
mediaSSRC: ssrc,
maxVideoPkts: maxVideoPkts,
maxAudioPkts: maxAudioPkts,
snRangeMap: utils.NewRangeMap[uint64, uint64](100),
pliThrottle: int64(500 * time.Millisecond),
logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU),
}
b.extPackets.SetMinCapacity(7)
return b
@@ -188,6 +193,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
})
b.rrSnapshotId = b.rtpStats.NewSnapshotId()
b.deltaStatsSnapshotId = b.rtpStats.NewSnapshotId()
b.ppsSnapshotId = b.rtpStats.NewSnapshotId()
b.clockRate = codec.ClockRate
b.lastReport = time.Now()
@@ -225,10 +231,10 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
switch {
case strings.HasPrefix(b.mime, "audio/"):
b.codecType = webrtc.RTPCodecTypeAudio
b.bucket = bucket.NewBucket(b.audioPool.Get().(*[]byte))
b.bucket = bucket.NewBucket(InitPacketBufferSizeAudio)
case strings.HasPrefix(b.mime, "video/"):
b.codecType = webrtc.RTPCodecTypeVideo
b.bucket = bucket.NewBucket(b.videoPool.Get().(*[]byte))
b.bucket = bucket.NewBucket(InitPacketBufferSizeVideo)
if b.frameRateCalculator[0] == nil {
if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) {
b.frameRateCalculator[0] = NewFrameRateCalculatorVP8(b.clockRate, b.logger)
@@ -347,22 +353,23 @@ func (b *Buffer) writeRTX(rtxPkt *rtp.Packet) (n int, err error) {
return
}
videoPktPtr := b.videoPool.Get().(*[]byte)
defer b.videoPool.Put(videoPktPtr)
if b.rtxPktBuf == nil {
b.rtxPktBuf = make([]byte, bucket.MaxPktSize)
}
videoPkt := *rtxPkt
videoPkt.PayloadType = b.payloadType
videoPkt.SequenceNumber = binary.BigEndian.Uint16(rtxPkt.Payload[:2])
videoPkt.SSRC = b.mediaSSRC
videoPkt.Payload = rtxPkt.Payload[2:]
n, err = videoPkt.MarshalTo((*videoPktPtr))
n, err = videoPkt.MarshalTo(b.rtxPktBuf)
if err != nil {
b.logger.Errorw("could not marshal repaired packet", err, "ssrc", b.mediaSSRC, "sn", videoPkt.SequenceNumber)
return
}
b.calc((*videoPktPtr)[:n], &videoPkt, time.Now(), true)
b.calc(b.rtxPktBuf[:n], &videoPkt, time.Now(), true)
return
}
@@ -417,13 +424,6 @@ func (b *Buffer) Close() error {
defer b.Unlock()
b.closeOnce.Do(func() {
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeVideo {
b.videoPool.Put(b.bucket.Src())
}
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeAudio {
b.audioPool.Put(b.bucket.Src())
}
b.closed.Store(true)
if b.rtpStats != nil {
@@ -457,14 +457,14 @@ func (b *Buffer) SetPLIThrottle(duration int64) {
func (b *Buffer) SendPLI(force bool) {
b.RLock()
if (b.rtpStats == nil || b.rtpStats.TimeSinceLastPli() < b.pliThrottle) && !force {
b.RUnlock()
rtpStats := b.rtpStats
pliThrottle := b.pliThrottle
b.RUnlock()
if (rtpStats == nil && !force) || !rtpStats.CheckAndUpdatePli(pliThrottle, force) {
return
}
b.rtpStats.UpdatePliAndTime(1)
b.RUnlock()
b.logger.Debugw("send pli", "ssrc", b.mediaSSRC, "force", force)
pli := []rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC: b.mediaSSRC, MediaSSRC: b.mediaSSRC},
@@ -785,6 +785,30 @@ func (b *Buffer) doReports(arrivalTime time.Time) {
if pkts != nil && b.onRtcpFeedback != nil {
b.onRtcpFeedback(pkts)
}
b.mayGrowBucket()
}
func (b *Buffer) mayGrowBucket() {
cap := b.bucket.Capacity()
maxPkts := b.maxVideoPkts
if b.codecType == webrtc.RTPCodecTypeAudio {
maxPkts = b.maxAudioPkts
}
if cap >= maxPkts {
return
}
oldCap := cap
deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId)
if deltaInfo != nil && deltaInfo.Duration > 500*time.Millisecond {
pps := int(time.Duration(deltaInfo.Packets) * time.Second / deltaInfo.Duration)
for pps > cap && cap < maxPkts {
cap = b.bucket.Grow()
}
if cap > oldCap {
b.logger.Debugw("grow bucket", "from", oldCap, "to", cap, "pps", pps)
}
}
}
func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) {
@@ -825,7 +849,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
}
}
func (b *Buffer) GetSenderReportData() (*RTCPSenderReportData, *RTCPSenderReportData) {
func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
b.RLock()
defer b.RUnlock()
@@ -833,7 +857,7 @@ func (b *Buffer) GetSenderReportData() (*RTCPSenderReportData, *RTCPSenderReport
return b.rtpStats.GetRtcpSenderReportData()
}
return nil, nil
return nil
}
func (b *Buffer) SetLastFractionLostReport(lost uint8) {