Files
livekit/pkg/sfu/buffer/factory.go
T
Raja Subramanian 6895eff496 Buffer size config for video and audio. (#2498)
* Buffer size config for video and audio.

There was only one buffer size in config.
In upstream, config value was used for video.
Audio used a hard coded value of 200 packets.

But, in the down stream sequencer, the config value was used for both
video and audio. So, if video was set up for high bit rate (deep
buffers), audio sequencer ended up using a lot of memory too in
sequencer.

Split config to be able to control that and also not hard code audio.

Another optimisation here would be to not instantiate sequencer unkess
NACK is negotiated.

* deprecate packet_buffer_size
2024-02-21 22:58:56 +05:30

144 lines
3.4 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"io"
"sync"
"github.com/pion/transport/v2/packetio"
"github.com/livekit/mediatransportutil/pkg/bucket"
)
type FactoryOfBufferFactory struct {
videoPool *sync.Pool
audioPool *sync.Pool
}
func NewFactoryOfBufferFactory(trackingPacketsVideo int, trackingPacketsAudio int) *FactoryOfBufferFactory {
return &FactoryOfBufferFactory{
videoPool: &sync.Pool{
New: func() interface{} {
b := make([]byte, trackingPacketsVideo*bucket.MaxPktSize)
return &b
},
},
audioPool: &sync.Pool{
New: func() interface{} {
b := make([]byte, trackingPacketsAudio*bucket.MaxPktSize)
return &b
},
},
}
}
func (f *FactoryOfBufferFactory) CreateBufferFactory() *Factory {
return &Factory{
videoPool: f.videoPool,
audioPool: f.audioPool,
rtpBuffers: make(map[uint32]*Buffer),
rtcpReaders: make(map[uint32]*RTCPReader),
rtxPair: make(map[uint32]uint32),
}
}
type Factory struct {
sync.RWMutex
videoPool *sync.Pool
audioPool *sync.Pool
rtpBuffers map[uint32]*Buffer
rtcpReaders map[uint32]*RTCPReader
rtxPair map[uint32]uint32 // repair -> base
}
func (f *Factory) GetOrNew(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
f.Lock()
defer f.Unlock()
switch packetType {
case packetio.RTCPBufferPacket:
if reader, ok := f.rtcpReaders[ssrc]; ok {
return reader
}
reader := NewRTCPReader(ssrc)
f.rtcpReaders[ssrc] = reader
reader.OnClose(func() {
f.Lock()
delete(f.rtcpReaders, ssrc)
f.Unlock()
})
return reader
case packetio.RTPBufferPacket:
if reader, ok := f.rtpBuffers[ssrc]; ok {
return reader
}
buffer := NewBuffer(ssrc, f.videoPool, f.audioPool)
f.rtpBuffers[ssrc] = buffer
for repair, base := range f.rtxPair {
if repair == ssrc {
baseBuffer, ok := f.rtpBuffers[base]
if ok {
buffer.SetPrimaryBufferForRTX(baseBuffer)
}
break
} else if base == ssrc {
repairBuffer, ok := f.rtpBuffers[repair]
if ok {
repairBuffer.SetPrimaryBufferForRTX(buffer)
}
break
}
}
buffer.OnClose(func() {
f.Lock()
delete(f.rtpBuffers, ssrc)
delete(f.rtxPair, ssrc)
f.Unlock()
})
return buffer
}
return nil
}
func (f *Factory) GetBufferPair(ssrc uint32) (*Buffer, *RTCPReader) {
f.RLock()
defer f.RUnlock()
return f.rtpBuffers[ssrc], f.rtcpReaders[ssrc]
}
func (f *Factory) GetBuffer(ssrc uint32) *Buffer {
f.RLock()
defer f.RUnlock()
return f.rtpBuffers[ssrc]
}
func (f *Factory) GetRTCPReader(ssrc uint32) *RTCPReader {
f.RLock()
defer f.RUnlock()
return f.rtcpReaders[ssrc]
}
func (f *Factory) SetRTXPair(repair, base uint32) {
f.Lock()
repairBuffer, baseBuffer := f.rtpBuffers[repair], f.rtpBuffers[base]
if repairBuffer == nil || baseBuffer == nil {
f.rtxPair[repair] = base
}
f.Unlock()
if repairBuffer != nil && baseBuffer != nil {
repairBuffer.SetPrimaryBufferForRTX(baseBuffer)
}
}