Store buffer after creating it. (#4186)

* Refactor receiver and buffer into Base and higher layer.

To be able to share code/functionality with relay.

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* clean up

* deps

* fix test

* fix test

* Store buffer after creating it.

Also changing signature of creator function as it could call TrackInfo()
and get into a deadlock.

* fix double unlock

* add some more debug logging
This commit is contained in:
Raja Subramanian
2025-12-24 02:55:51 +05:30
committed by GitHub
parent 7c8ea11505
commit e71184dea0
3 changed files with 18 additions and 7 deletions

View File

@@ -311,6 +311,11 @@ func (b *Buffer) getOnClose() func() {
func (b *Buffer) sendPLI() {
ssrc := b.BufferBase.SSRC()
if ssrc == 0 {
return
}
b.logger.Debugw("send pli", "mediaSSRC", ssrc)
pli := []rtcp.Packet{
&rtcp.PictureLossIndication{
SenderSSRC: ssrc,

View File

@@ -1411,7 +1411,7 @@ func (b *BufferBase) seedKeyFrame(keyFrameSeederGeneration int32) {
//
// send gratuitous PLIs for some time or until a key frame is seen to
// get the engine rolling
b.logger.Debugw("starting key frame seeder")
b.logger.Debugw("starting key frame seeder", "generattion", keyFrameSeederGeneration)
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()
@@ -1423,20 +1423,24 @@ func (b *BufferBase) seedKeyFrame(keyFrameSeederGeneration int32) {
rtpStats := b.rtpStats
b.RUnlock()
if rtpStats == nil {
b.logger.Debugw("cannot do key frame seeding without stats")
b.logger.Debugw("cannot do key frame seeding without stats", "generation", keyFrameSeederGeneration)
return
}
initialCount, _ = rtpStats.KeyFrame()
for {
if b.isClosed.Load() || b.keyFrameSeederGeneration.Load() != keyFrameSeederGeneration {
b.logger.Debugw("stopping key frame seeder: stopped")
b.logger.Debugw(
"stopping key frame seeder: stopped",
"generation", keyFrameSeederGeneration,
"currentGeneration", b.keyFrameSeederGeneration.Load(),
)
return
}
select {
case <-timer.C:
b.logger.Debugw("stopping key frame seeder: timeout")
b.logger.Debugw("stopping key frame seeder: timeout", "generation", keyFrameSeederGeneration)
return
case <-ticker.C:
@@ -1444,6 +1448,7 @@ func (b *BufferBase) seedKeyFrame(keyFrameSeederGeneration int32) {
if cnt > initialCount {
b.logger.Debugw(
"stopping key frame seeder: received key frame",
"generation", keyFrameSeederGeneration,
"keyFrameCountInitial", initialCount,
"keyFrameCount", cnt,
"lastKeyFrame", last,

View File

@@ -603,7 +603,7 @@ func (r *ReceiverBase) getBufferLocked(layer int32) buffer.BufferProvider {
func (r *ReceiverBase) GetOrCreateBuffer(
layer int32,
creatorFn func() (buffer.BufferProvider, error),
creatorFn func(*livekit.TrackInfo) (buffer.BufferProvider, error),
) (buffer.BufferProvider, bool) {
r.bufferMu.Lock()
@@ -617,13 +617,14 @@ func (r *ReceiverBase) GetOrCreateBuffer(
return buff, false
}
buff, err := creatorFn()
buff, err := creatorFn(r.trackInfo)
if err != nil {
r.bufferMu.Unlock()
r.params.Logger.Errorw("could not create buffer", err)
return nil, false
}
r.buffers[layer] = buff
rtt := r.rtt
r.bufferMu.Unlock()
@@ -1017,7 +1018,7 @@ func (r *ReceiverBase) SetCodecState(state ReceiverCodecState) {
func (r *ReceiverBase) SetCodecWithState(codec webrtc.RTPCodecParameters, headerExtensions []webrtc.RTPHeaderExtensionParameter, codecState ReceiverCodecState) {
r.checkCodecChanged(codec, headerExtensions)
r.codecStateLock.Unlock()
r.codecStateLock.Lock()
if codecState == r.codecState {
r.codecStateLock.Unlock()
return