chunk room updates (#3880)

* chunk room updates

* move to config

* typo

* default
This commit is contained in:
Paul Wells
2025-08-26 09:16:00 -07:00
committed by GitHub
parent b4e146c5cb
commit 8d270e2a0f
4 changed files with 73 additions and 14 deletions
+8 -5
View File
@@ -174,6 +174,8 @@ type RoomConfig struct {
CreateRoomEnabled bool `yaml:"create_room_enabled,omitempty"`
CreateRoomTimeout time.Duration `yaml:"create_room_timeout,omitempty"`
CreateRoomAttempts int `yaml:"create_room_attempts,omitempty"`
// target room participant update batch chunk size in bytes
UpdateBatchTargetSize int `yaml:"update_batch_target_size,omitempty"`
// deprecated, moved to limits
MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"`
// deprecated, moved to limits
@@ -366,11 +368,12 @@ var DefaultConfig = Config{
{Mime: mime.MimeTypeRTX.String()},
{Mime: mime.MimeTypeH265.String()},
},
EmptyTimeout: 5 * 60,
DepartureTimeout: 20,
CreateRoomEnabled: true,
CreateRoomTimeout: 10 * time.Second,
CreateRoomAttempts: 3,
EmptyTimeout: 5 * 60,
DepartureTimeout: 20,
CreateRoomEnabled: true,
CreateRoomTimeout: 10 * time.Second,
CreateRoomAttempts: 3,
UpdateBatchTargetSize: 128 * 1024,
},
Limit: LimitConfig{
MaxMetadataSize: 64000,
+15 -9
View File
@@ -106,6 +106,7 @@ type Room struct {
logger logger.Logger
config WebRTCConfig
roomConfig config.RoomConfig
audioConfig *sfu.AudioConfig
serverInfo *livekit.ServerInfo
telemetry telemetry.TelemetryService
@@ -253,6 +254,7 @@ func NewRoom(
livekit.RoomID(room.Sid),
),
config: config,
roomConfig: roomConfig,
audioConfig: audioConfig,
telemetry: telemetry,
egressLauncher: egressLauncher,
@@ -1431,7 +1433,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
r.batchedUpdatesMu.Unlock()
if len(updates) != 0 {
selfSent = true
SendParticipantUpdates(updates, r.GetParticipants())
SendParticipantUpdates(updates, r.GetParticipants(), r.roomConfig.UpdateBatchTargetSize)
}
}
@@ -1488,7 +1490,7 @@ func (r *Room) changeUpdateWorker() {
r.batchedUpdates = make(map[livekit.ParticipantIdentity]*ParticipantUpdate)
r.batchedUpdatesMu.Unlock()
SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants())
SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants(), r.roomConfig.UpdateBatchTargetSize)
case <-cleanDataMessageTicker.C:
r.dataMessageCache.Prune()
@@ -1968,7 +1970,7 @@ func PushAndDequeueUpdates(
return updates
}
func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.LocalParticipant) {
func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.LocalParticipant, batchTargetSize int) {
if len(updates) == 0 {
return
}
@@ -1992,15 +1994,19 @@ func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.L
fullUpdates = append(fullUpdates, update.ParticipantInfo)
}
filteredUpdateChunks := ChunkProtoBatch(filteredUpdates, batchTargetSize)
fullUpdateChunks := ChunkProtoBatch(fullUpdates, batchTargetSize)
for _, op := range participants {
var err error
updateChunks := fullUpdateChunks
if op.ProtocolVersion().SupportsIdentityBasedReconnection() {
err = op.SendParticipantUpdate(filteredUpdates)
} else {
err = op.SendParticipantUpdate(fullUpdates)
updateChunks = filteredUpdateChunks
}
if err != nil {
op.GetLogger().Errorw("could not send update to participant", err)
for _, chunk := range updateChunks {
if err := op.SendParticipantUpdate(chunk); err != nil {
op.GetLogger().Errorw("could not send update to participant", err)
break
}
}
}
}
+21
View File
@@ -21,6 +21,7 @@ import (
"strings"
"github.com/pion/webrtc/v4"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/protocol/livekit"
@@ -171,3 +172,23 @@ func MaybeTruncateIP(addr string) string {
return addr[:len(addr)-3] + "..."
}
func ChunkProtoBatch[T proto.Message](batch []T, target int) [][]T {
var chunks [][]T
var start, size int
for i, m := range batch {
if s := proto.Size(m); size+s > target {
if start < i {
chunks = append(chunks, batch[start:i])
}
start = i
size = s
} else {
size += s
}
}
if start < len(batch) {
chunks = append(chunks, batch[start:])
}
return chunks
}
+29
View File
@@ -15,11 +15,17 @@
package rtc
import (
"fmt"
"math/rand/v2"
"strings"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils/guid"
)
func TestPackStreamId(t *testing.T) {
@@ -43,3 +49,26 @@ func TestPackDataTrackLabel(t *testing.T) {
require.Equal(t, trackID, tr)
require.Equal(t, label, l)
}
func TestChunkProtoBatch(t *testing.T) {
rng := rand.New(rand.NewPCG(1, 2))
var updates []*livekit.ParticipantInfo
for range 32 {
updates = append(updates, &livekit.ParticipantInfo{
Sid: guid.New(guid.ParticipantPrefix),
Identity: uuid.NewString(),
Metadata: strings.Repeat("x", rng.IntN(128*1024)),
})
}
target := 64 * 1024
batches := ChunkProtoBatch(updates, target)
for _, b := range batches {
var sum int
for _, m := range b {
sum += proto.Size(m)
}
require.True(t, sum < target || len(b) == 1, "batch size exceeds target")
}
fmt.Println(len(batches))
}