From 8d270e2a0f6b4381b80561beccebbe2a7f5c5c22 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 26 Aug 2025 09:16:00 -0700 Subject: [PATCH] chunk room updates (#3880) * chunk room updates * move to config * typo * default --- pkg/config/config.go | 13 ++++++++----- pkg/rtc/room.go | 24 +++++++++++++++--------- pkg/rtc/utils.go | 21 +++++++++++++++++++++ pkg/rtc/utils_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 14 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 17683dbf1..9c0576072 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -174,6 +174,8 @@ type RoomConfig struct { CreateRoomEnabled bool `yaml:"create_room_enabled,omitempty"` CreateRoomTimeout time.Duration `yaml:"create_room_timeout,omitempty"` CreateRoomAttempts int `yaml:"create_room_attempts,omitempty"` + // target room participant update batch chunk size in bytes + UpdateBatchTargetSize int `yaml:"update_batch_target_size,omitempty"` // deprecated, moved to limits MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"` // deprecated, moved to limits @@ -366,11 +368,12 @@ var DefaultConfig = Config{ {Mime: mime.MimeTypeRTX.String()}, {Mime: mime.MimeTypeH265.String()}, }, - EmptyTimeout: 5 * 60, - DepartureTimeout: 20, - CreateRoomEnabled: true, - CreateRoomTimeout: 10 * time.Second, - CreateRoomAttempts: 3, + EmptyTimeout: 5 * 60, + DepartureTimeout: 20, + CreateRoomEnabled: true, + CreateRoomTimeout: 10 * time.Second, + CreateRoomAttempts: 3, + UpdateBatchTargetSize: 128 * 1024, }, Limit: LimitConfig{ MaxMetadataSize: 64000, diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index b41d91e6c..ae7250953 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -106,6 +106,7 @@ type Room struct { logger logger.Logger config WebRTCConfig + roomConfig config.RoomConfig audioConfig *sfu.AudioConfig serverInfo *livekit.ServerInfo telemetry telemetry.TelemetryService @@ -253,6 +254,7 @@ func NewRoom( livekit.RoomID(room.Sid), ), config: config, + roomConfig: roomConfig, audioConfig: audioConfig, telemetry: telemetry, egressLauncher: egressLauncher, @@ -1431,7 +1433,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas r.batchedUpdatesMu.Unlock() if len(updates) != 0 { selfSent = true - SendParticipantUpdates(updates, r.GetParticipants()) + SendParticipantUpdates(updates, r.GetParticipants(), r.roomConfig.UpdateBatchTargetSize) } } @@ -1488,7 +1490,7 @@ func (r *Room) changeUpdateWorker() { r.batchedUpdates = make(map[livekit.ParticipantIdentity]*ParticipantUpdate) r.batchedUpdatesMu.Unlock() - SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants()) + SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants(), r.roomConfig.UpdateBatchTargetSize) case <-cleanDataMessageTicker.C: r.dataMessageCache.Prune() @@ -1968,7 +1970,7 @@ func PushAndDequeueUpdates( return updates } -func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.LocalParticipant) { +func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.LocalParticipant, batchTargetSize int) { if len(updates) == 0 { return } @@ -1992,15 +1994,19 @@ func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.L fullUpdates = append(fullUpdates, update.ParticipantInfo) } + filteredUpdateChunks := ChunkProtoBatch(filteredUpdates, batchTargetSize) + fullUpdateChunks := ChunkProtoBatch(fullUpdates, batchTargetSize) + for _, op := range participants { - var err error + updateChunks := fullUpdateChunks if op.ProtocolVersion().SupportsIdentityBasedReconnection() { - err = op.SendParticipantUpdate(filteredUpdates) - } else { - err = op.SendParticipantUpdate(fullUpdates) + updateChunks = filteredUpdateChunks } - if err != nil { - op.GetLogger().Errorw("could not send update to participant", err) + for _, chunk := range updateChunks { + if err := op.SendParticipantUpdate(chunk); err != nil { + op.GetLogger().Errorw("could not send update to participant", err) + break + } } } } diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index 9d77153f2..1056748e7 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -21,6 +21,7 @@ import ( "strings" "github.com/pion/webrtc/v4" + "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/sfu/mime" "github.com/livekit/protocol/livekit" @@ -171,3 +172,23 @@ func MaybeTruncateIP(addr string) string { return addr[:len(addr)-3] + "..." } + +func ChunkProtoBatch[T proto.Message](batch []T, target int) [][]T { + var chunks [][]T + var start, size int + for i, m := range batch { + if s := proto.Size(m); size+s > target { + if start < i { + chunks = append(chunks, batch[start:i]) + } + start = i + size = s + } else { + size += s + } + } + if start < len(batch) { + chunks = append(chunks, batch[start:]) + } + return chunks +} diff --git a/pkg/rtc/utils_test.go b/pkg/rtc/utils_test.go index 813f910d8..55c1d217d 100644 --- a/pkg/rtc/utils_test.go +++ b/pkg/rtc/utils_test.go @@ -15,11 +15,17 @@ package rtc import ( + "fmt" + "math/rand/v2" + "strings" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils/guid" ) func TestPackStreamId(t *testing.T) { @@ -43,3 +49,26 @@ func TestPackDataTrackLabel(t *testing.T) { require.Equal(t, trackID, tr) require.Equal(t, label, l) } + +func TestChunkProtoBatch(t *testing.T) { + rng := rand.New(rand.NewPCG(1, 2)) + var updates []*livekit.ParticipantInfo + for range 32 { + updates = append(updates, &livekit.ParticipantInfo{ + Sid: guid.New(guid.ParticipantPrefix), + Identity: uuid.NewString(), + Metadata: strings.Repeat("x", rng.IntN(128*1024)), + }) + } + + target := 64 * 1024 + batches := ChunkProtoBatch(updates, target) + for _, b := range batches { + var sum int + for _, m := range b { + sum += proto.Size(m) + } + require.True(t, sum < target || len(b) == 1, "batch size exceeds target") + } + fmt.Println(len(batches)) +}