mirror of
https://github.com/livekit/livekit.git
synced 2026-05-25 20:45:13 +00:00
fix deadlock cause underlying buffer full (#413)
* fix deadlock cause underlying buffer full
This commit is contained in:
@@ -86,6 +86,7 @@ type Buffer struct {
|
||||
onAudioLevel func(level uint8, durationMs uint32)
|
||||
feedbackCB func([]rtcp.Packet)
|
||||
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
|
||||
callbackOps chan func()
|
||||
|
||||
// logger
|
||||
logger logger.Logger
|
||||
@@ -105,10 +106,11 @@ type Options struct {
|
||||
// NewBuffer constructs a new Buffer
|
||||
func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
|
||||
b := &Buffer{
|
||||
mediaSSRC: ssrc,
|
||||
videoPool: vp,
|
||||
audioPool: ap,
|
||||
logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger
|
||||
mediaSSRC: ssrc,
|
||||
videoPool: vp,
|
||||
audioPool: ap,
|
||||
logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger
|
||||
callbackOps: make(chan func(), 50),
|
||||
}
|
||||
b.bitrate.Store(make([]int64, len(b.bitrateHelper)))
|
||||
b.extPackets.SetMinCapacity(7)
|
||||
@@ -122,6 +124,11 @@ func (b *Buffer) SetLogger(logger logger.Logger) {
|
||||
func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, o Options) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
if b.bound {
|
||||
return
|
||||
}
|
||||
|
||||
go b.doCallbacks()
|
||||
|
||||
b.clockRate = codec.ClockRate
|
||||
b.maxBitrate = int64(o.MaxBitRate)
|
||||
@@ -179,6 +186,12 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
|
||||
b.logger.Debugw("NewBuffer", "MaxBitRate", o.MaxBitRate)
|
||||
}
|
||||
|
||||
func (b *Buffer) doCallbacks() {
|
||||
for op := range b.callbackOps {
|
||||
op()
|
||||
}
|
||||
}
|
||||
|
||||
// Write adds an RTP Packet, out of order, new packet may be arrived later
|
||||
func (b *Buffer) Write(pkt []byte) (n int, err error) {
|
||||
b.Lock()
|
||||
@@ -256,6 +269,7 @@ func (b *Buffer) Close() error {
|
||||
}
|
||||
b.closed.set(true)
|
||||
b.onClose()
|
||||
close(b.callbackOps)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@@ -417,7 +431,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
|
||||
// for bandwidth estimation
|
||||
if b.twcc {
|
||||
if ext := p.GetExtension(b.twccExt); len(ext) > 1 {
|
||||
b.feedbackTWCC(binary.BigEndian.Uint16(ext[0:2]), arrivalTime, p.Marker)
|
||||
sn := binary.BigEndian.Uint16(ext[0:2])
|
||||
marker := p.Marker
|
||||
b.callbackOps <- func() {
|
||||
b.feedbackTWCC(sn, arrivalTime, marker)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -432,7 +450,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
|
||||
if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) {
|
||||
duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate)
|
||||
if duration > 0 {
|
||||
b.onAudioLevel(ext.Level, uint32(duration))
|
||||
b.callbackOps <- func() {
|
||||
b.onAudioLevel(ext.Level, uint32(duration))
|
||||
}
|
||||
}
|
||||
|
||||
b.latestTSForAudioLevel = p.Timestamp
|
||||
@@ -510,7 +530,10 @@ func (b *Buffer) doReports(arrivalTime int64) {
|
||||
b.bitrate.Store(bitrates)
|
||||
|
||||
// RTCP reports
|
||||
go b.feedbackCB(b.getRTCP())
|
||||
pkts := b.getRTCP()
|
||||
b.callbackOps <- func() {
|
||||
b.feedbackCB(pkts)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) {
|
||||
|
||||
Reference in New Issue
Block a user