From 2959eebca80296f6afcbc7245538b2e401ed9efc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 25 Feb 2022 11:56:26 +0530 Subject: [PATCH] Introducing OpsQueue (#463) * Introducing OpsQueue Creating a PR to get feedback on standardizing on this concept. Can be used for callbacks. Already a couple of places use this construct. Wondering if we should standardize on this across the board. Just changing one place to use the new struct. Another place that I know of which uses this pattern is the telemetry package. * atomic flag -> bool --- pkg/service/wire_gen.go | 3 ++- pkg/sfu/buffer/buffer.go | 46 ++++++++++++++++------------------ pkg/utils/opsqueue.go | 54 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 26 deletions(-) create mode 100644 pkg/utils/opsqueue.go diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 0d53cc8f6..0b85c353e 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,7 +1,8 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//+build !wireinject +//go:build !wireinject +// +build !wireinject package service diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 45c6f1c95..665c21363 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -10,6 +10,7 @@ import ( "time" "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/logger" "github.com/pion/rtcp" "github.com/pion/rtp" @@ -89,7 +90,8 @@ type Buffer struct { onAudioLevel func(level uint8, durationMs uint32) feedbackCB func([]rtcp.Packet) feedbackTWCC func(sn uint16, timeNS int64, marker bool) - callbackOps chan func() + + callbacksQueue *utils.OpsQueue // logger logger logger.Logger @@ -109,12 +111,12 @@ type Options struct { // NewBuffer constructs a new Buffer func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { b := &Buffer{ - mediaSSRC: ssrc, - videoPool: vp, - audioPool: ap, - pliThrottle: int64(500 * time.Millisecond), - logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger - callbackOps: make(chan func(), 50), + mediaSSRC: ssrc, + videoPool: vp, + audioPool: ap, + pliThrottle: int64(500 * time.Millisecond), + logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger + callbacksQueue: utils.NewOpsQueue(), } b.bitrate.Store(make([]int64, len(b.bitrateHelper))) b.extPackets.SetMinCapacity(7) @@ -132,7 +134,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili return } - go b.doCallbacks() + b.callbacksQueue.Start() b.clockRate = codec.ClockRate b.maxBitrate = int64(o.MaxBitRate) @@ -190,12 +192,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.logger.Debugw("NewBuffer", "MaxBitRate", o.MaxBitRate) } -func (b *Buffer) doCallbacks() { - for op := range b.callbackOps { - op() - } -} - // Write adds an RTP Packet, out of order, new packet may be arrived later func (b *Buffer) Write(pkt []byte) (n int, err error) { b.Lock() @@ -273,7 +269,7 @@ func (b *Buffer) Close() error { } b.closed.set(true) b.onClose() - close(b.callbackOps) + b.callbacksQueue.Stop() }) return nil } @@ -307,9 +303,9 @@ func (b *Buffer) SendPLI() { &rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: b.mediaSSRC}, } - b.callbackOps <- func() { + b.callbacksQueue.Enqueue(func() { b.feedbackCB(pli) - } + }) } func (b *Buffer) SetRTT(rtt uint32) { @@ -457,9 +453,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { if ext := p.GetExtension(b.twccExt); len(ext) > 1 { sn := binary.BigEndian.Uint16(ext[0:2]) marker := p.Marker - b.callbackOps <- func() { + b.callbacksQueue.Enqueue(func() { b.feedbackTWCC(sn, arrivalTime, marker) - } + }) } } @@ -474,9 +470,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) { duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate) if duration > 0 { - b.callbackOps <- func() { + b.callbacksQueue.Enqueue(func() { b.onAudioLevel(ext.Level, uint32(duration)) - } + }) } b.latestTSForAudioLevel = p.Timestamp @@ -523,9 +519,9 @@ func (b *Buffer) doNACKs() { } if r, numSeqNumsNacked := b.buildNACKPacket(); r != nil { - b.callbackOps <- func() { + b.callbacksQueue.Enqueue(func() { b.feedbackCB(r) - } + }) b.stats.TotalNACKs += uint32(numSeqNumsNacked) } } @@ -557,9 +553,9 @@ func (b *Buffer) doReports(arrivalTime int64) { // RTCP reports pkts := b.getRTCP() - b.callbackOps <- func() { + b.callbacksQueue.Enqueue(func() { b.feedbackCB(pkts) - } + }) } func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) { diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go new file mode 100644 index 000000000..a09c91528 --- /dev/null +++ b/pkg/utils/opsqueue.go @@ -0,0 +1,54 @@ +package utils + +import ( + "sync" +) + +const ( + MaxOps = 50 +) + +type OpsQueue struct { + lock sync.RWMutex + ops chan func() + isStopped bool +} + +func NewOpsQueue() *OpsQueue { + return &OpsQueue{ + ops: make(chan func(), MaxOps), + } +} + +func (oq *OpsQueue) Start() { + go oq.process() +} + +func (oq *OpsQueue) Stop() { + oq.lock.Lock() + if oq.isStopped { + oq.lock.Unlock() + return + } + + oq.isStopped = true + close(oq.ops) + oq.lock.Unlock() +} + +func (oq *OpsQueue) Enqueue(op func()) { + oq.lock.RLock() + if oq.isStopped { + oq.lock.RUnlock() + return + } + + oq.ops <- op + oq.lock.RUnlock() +} + +func (oq *OpsQueue) process() { + for op := range oq.ops { + op() + } +}