From 00b2a216c78ee1c01e8c62c07a7e6b083fc41bde Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 8 Feb 2022 16:00:56 +0800 Subject: [PATCH] fix deadlock cause underlying buffer full (#413) * fix deadlock cause underlying buffer full --- pkg/sfu/buffer/buffer.go | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 1a116aa6f..63324fe87 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -86,6 +86,7 @@ type Buffer struct { onAudioLevel func(level uint8, durationMs uint32) feedbackCB func([]rtcp.Packet) feedbackTWCC func(sn uint16, timeNS int64, marker bool) + callbackOps chan func() // logger logger logger.Logger @@ -105,10 +106,11 @@ type Options struct { // NewBuffer constructs a new Buffer func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { b := &Buffer{ - mediaSSRC: ssrc, - videoPool: vp, - audioPool: ap, - logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger + mediaSSRC: ssrc, + videoPool: vp, + audioPool: ap, + logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger + callbackOps: make(chan func(), 50), } b.bitrate.Store(make([]int64, len(b.bitrateHelper))) b.extPackets.SetMinCapacity(7) @@ -122,6 +124,11 @@ func (b *Buffer) SetLogger(logger logger.Logger) { func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, o Options) { b.Lock() defer b.Unlock() + if b.bound { + return + } + + go b.doCallbacks() b.clockRate = codec.ClockRate b.maxBitrate = int64(o.MaxBitRate) @@ -179,6 +186,12 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.logger.Debugw("NewBuffer", "MaxBitRate", o.MaxBitRate) } +func (b *Buffer) doCallbacks() { + for op := range b.callbackOps { + op() + } +} + // Write adds an RTP Packet, out of order, new packet may be arrived later func (b *Buffer) Write(pkt []byte) (n int, err error) { b.Lock() @@ -256,6 +269,7 @@ func (b *Buffer) Close() error { } b.closed.set(true) b.onClose() + close(b.callbackOps) }) return nil } @@ -417,7 +431,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { // for bandwidth estimation if b.twcc { if ext := p.GetExtension(b.twccExt); len(ext) > 1 { - b.feedbackTWCC(binary.BigEndian.Uint16(ext[0:2]), arrivalTime, p.Marker) + sn := binary.BigEndian.Uint16(ext[0:2]) + marker := p.Marker + b.callbackOps <- func() { + b.feedbackTWCC(sn, arrivalTime, marker) + } } } @@ -432,7 +450,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) { duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate) if duration > 0 { - b.onAudioLevel(ext.Level, uint32(duration)) + b.callbackOps <- func() { + b.onAudioLevel(ext.Level, uint32(duration)) + } } b.latestTSForAudioLevel = p.Timestamp @@ -510,7 +530,10 @@ func (b *Buffer) doReports(arrivalTime int64) { b.bitrate.Store(bitrates) // RTCP reports - go b.feedbackCB(b.getRTCP()) + pkts := b.getRTCP() + b.callbackOps <- func() { + b.feedbackCB(pkts) + } } func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) {