Introducing OpsQueue (#463)

* Introducing OpsQueue

Creating a PR to get feedback on standardizing on this concept.

Can be used for callbacks.
Already a couple of places use this construct. Wondering if we
should standardize on this across the board.
Just changing one place to use the new struct. Another place
that I know of which uses this pattern is the telemetry package.

* atomic flag -> bool
This commit is contained in:
Raja Subramanian
2022-02-25 11:56:26 +05:30
committed by GitHub
parent 6d88154402
commit 2959eebca8
3 changed files with 77 additions and 26 deletions

View File

@@ -1,7 +1,8 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//+build !wireinject
//go:build !wireinject
// +build !wireinject
package service

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
"github.com/pion/rtp"
@@ -89,7 +90,8 @@ type Buffer struct {
onAudioLevel func(level uint8, durationMs uint32)
feedbackCB func([]rtcp.Packet)
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
callbackOps chan func()
callbacksQueue *utils.OpsQueue
// logger
logger logger.Logger
@@ -109,12 +111,12 @@ type Options struct {
// NewBuffer constructs a new Buffer
func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
b := &Buffer{
mediaSSRC: ssrc,
videoPool: vp,
audioPool: ap,
pliThrottle: int64(500 * time.Millisecond),
logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger
callbackOps: make(chan func(), 50),
mediaSSRC: ssrc,
videoPool: vp,
audioPool: ap,
pliThrottle: int64(500 * time.Millisecond),
logger: logger.Logger(logger.GetLogger()), // will be reset with correct context via SetLogger
callbacksQueue: utils.NewOpsQueue(),
}
b.bitrate.Store(make([]int64, len(b.bitrateHelper)))
b.extPackets.SetMinCapacity(7)
@@ -132,7 +134,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
return
}
go b.doCallbacks()
b.callbacksQueue.Start()
b.clockRate = codec.ClockRate
b.maxBitrate = int64(o.MaxBitRate)
@@ -190,12 +192,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
b.logger.Debugw("NewBuffer", "MaxBitRate", o.MaxBitRate)
}
func (b *Buffer) doCallbacks() {
for op := range b.callbackOps {
op()
}
}
// Write adds an RTP Packet, out of order, new packet may be arrived later
func (b *Buffer) Write(pkt []byte) (n int, err error) {
b.Lock()
@@ -273,7 +269,7 @@ func (b *Buffer) Close() error {
}
b.closed.set(true)
b.onClose()
close(b.callbackOps)
b.callbacksQueue.Stop()
})
return nil
}
@@ -307,9 +303,9 @@ func (b *Buffer) SendPLI() {
&rtcp.PictureLossIndication{SenderSSRC: rand.Uint32(), MediaSSRC: b.mediaSSRC},
}
b.callbackOps <- func() {
b.callbacksQueue.Enqueue(func() {
b.feedbackCB(pli)
}
})
}
func (b *Buffer) SetRTT(rtt uint32) {
@@ -457,9 +453,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
if ext := p.GetExtension(b.twccExt); len(ext) > 1 {
sn := binary.BigEndian.Uint16(ext[0:2])
marker := p.Marker
b.callbackOps <- func() {
b.callbacksQueue.Enqueue(func() {
b.feedbackTWCC(sn, arrivalTime, marker)
}
})
}
}
@@ -474,9 +470,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
if (p.Timestamp - b.latestTSForAudioLevel) < (1 << 31) {
duration := (int64(p.Timestamp) - int64(b.latestTSForAudioLevel)) * 1e3 / int64(b.clockRate)
if duration > 0 {
b.callbackOps <- func() {
b.callbacksQueue.Enqueue(func() {
b.onAudioLevel(ext.Level, uint32(duration))
}
})
}
b.latestTSForAudioLevel = p.Timestamp
@@ -523,9 +519,9 @@ func (b *Buffer) doNACKs() {
}
if r, numSeqNumsNacked := b.buildNACKPacket(); r != nil {
b.callbackOps <- func() {
b.callbacksQueue.Enqueue(func() {
b.feedbackCB(r)
}
})
b.stats.TotalNACKs += uint32(numSeqNumsNacked)
}
}
@@ -557,9 +553,9 @@ func (b *Buffer) doReports(arrivalTime int64) {
// RTCP reports
pkts := b.getRTCP()
b.callbackOps <- func() {
b.callbacksQueue.Enqueue(func() {
b.feedbackCB(pkts)
}
})
}
func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) {

54
pkg/utils/opsqueue.go Normal file
View File

@@ -0,0 +1,54 @@
package utils
import (
"sync"
)
const (
MaxOps = 50
)
type OpsQueue struct {
lock sync.RWMutex
ops chan func()
isStopped bool
}
func NewOpsQueue() *OpsQueue {
return &OpsQueue{
ops: make(chan func(), MaxOps),
}
}
func (oq *OpsQueue) Start() {
go oq.process()
}
func (oq *OpsQueue) Stop() {
oq.lock.Lock()
if oq.isStopped {
oq.lock.Unlock()
return
}
oq.isStopped = true
close(oq.ops)
oq.lock.Unlock()
}
func (oq *OpsQueue) Enqueue(op func()) {
oq.lock.RLock()
if oq.isStopped {
oq.lock.RUnlock()
return
}
oq.ops <- op
oq.lock.RUnlock()
}
func (oq *OpsQueue) process() {
for op := range oq.ops {
op()
}
}