use departure timeout (#2549)

* use departure timeout

* deps

* default

* remove constants

* deps

* protoproxy cache

* add sample
This commit is contained in:
Paul Wells
2024-03-05 09:05:42 -08:00
committed by GitHub
parent 0618cb39df
commit d87f8aa299
9 changed files with 35 additions and 22 deletions
+3 -1
View File
@@ -170,8 +170,10 @@ keys:
# room:
# # allow rooms to be automatically created when participants join, defaults to true
# # auto_create: false
# # number of seconds to leave a room open when it's empty
# # number of seconds to keep the room open if no one joins
# empty_timeout: 300
# # number of seconds to keep the room open after everyone leaves
# departure_timeout: 20
# # limit number of participants that can be in a room, 0 for no limit
# max_participants: 0
# # only accept specific codecs for clients publishing to this room
+3 -3
View File
@@ -19,8 +19,8 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8
github.com/livekit/protocol v1.10.1
github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3
github.com/livekit/protocol v1.10.2-0.20240305164205-dde5199e5268
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1
@@ -97,7 +97,7 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
+6 -6
View File
@@ -130,10 +130,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI=
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.10.1 h1:upe6pKRqH8wpsMuR2OLtgizEm94iia3pDYm3O4/2PRY=
github.com/livekit/protocol v1.10.1/go.mod h1:eWPz45pnxwpCwB84qqhHxG0bCRgasa2itN6GAHCDddc=
github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3 h1:bvjzDR+Rvdf3JgzQMtLiGVHBQ8KoOWM7x7sHj79jevQ=
github.com/livekit/psrpc v0.5.3-0.20240227154351-b7f99eaaf7b3/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/protocol v1.10.2-0.20240305164205-dde5199e5268 h1:cEox8Kzv0Ib+je6OtaKsx3DBlpZF45xZnkev2L4sQv4=
github.com/livekit/protocol v1.10.2-0.20240305164205-dde5199e5268/go.mod h1:XpJ2t2wFnnQghPpkxXAzMZhYMDnm8wWxdxYJK4fP9gM=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
@@ -312,8 +312,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic=
golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+3 -1
View File
@@ -232,6 +232,7 @@ type RoomConfig struct {
EnabledCodecs []CodecSpec `yaml:"enabled_codecs,omitempty"`
MaxParticipants uint32 `yaml:"max_participants,omitempty"`
EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"`
DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"`
EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"`
MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"`
PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"`
@@ -482,7 +483,8 @@ var DefaultConfig = Config{
{Mime: webrtc.MimeTypeVP9},
{Mime: webrtc.MimeTypeAV1},
},
EmptyTimeout: 5 * 60,
EmptyTimeout: 5 * 60,
DepartureTimeout: 20,
},
Logging: LoggingConfig{
PionLevel: "error",
+9 -7
View File
@@ -47,8 +47,7 @@ import (
)
const (
DefaultEmptyTimeout = 5 * 60 // 5m
AudioLevelQuantization = 8 // ideally power of 2 to minimize float decimal
AudioLevelQuantization = 8 // ideally power of 2 to minimize float decimal
invAudioLevelQuantization = 1.0 / AudioLevelQuantization
subscriberUpdateInterval = 3 * time.Second
@@ -59,8 +58,7 @@ const (
var (
// var to allow unit test override
RoomDepartureGrace uint32 = 20
roomUpdateInterval = 5 * time.Second // frequency to update room participant counts
roomUpdateInterval = 5 * time.Second // frequency to update room participant counts
)
type broadcastOptions struct {
@@ -139,6 +137,7 @@ func NewRoom(
room *livekit.Room,
internal *livekit.RoomInternal,
config WebRTCConfig,
roomConfig config.RoomConfig,
audioConfig *config.AudioConfig,
serverInfo *livekit.ServerInfo,
telemetry telemetry.TelemetryService,
@@ -172,13 +171,16 @@ func NewRoom(
disconnectSignalOnResumeNoMessagesParticipants: make(map[livekit.ParticipantIdentity]*disconnectSignalOnResumeNoMessages),
}
r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto)
if r.protoRoom.EmptyTimeout == 0 {
r.protoRoom.EmptyTimeout = DefaultEmptyTimeout
r.protoRoom.EmptyTimeout = roomConfig.EmptyTimeout
}
if r.protoRoom.DepartureTimeout == 0 {
r.protoRoom.DepartureTimeout = roomConfig.DepartureTimeout
}
if r.protoRoom.CreationTime == 0 {
r.protoRoom.CreationTime = time.Now().Unix()
}
r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto)
if agentClient != nil {
go func() {
@@ -774,7 +776,7 @@ func (r *Room) CloseIfEmpty() {
if r.FirstJoinedAt() > 0 && r.LastLeftAt() > 0 {
elapsed = time.Now().Unix() - r.LastLeftAt()
// need to give time in case participant is reconnecting
timeout = RoomDepartureGrace
timeout = r.protoRoom.DepartureTimeout
} else {
elapsed = time.Now().Unix() - r.protoRoom.CreationTime
timeout = r.protoRoom.EmptyTimeout
+5 -3
View File
@@ -48,8 +48,6 @@ const (
func init() {
config.InitLoggerFromConfig(&config.DefaultConfig.Logging)
// allow immediate closure in testing
RoomDepartureGrace = 1
roomUpdateInterval = defaultDelay
}
@@ -377,7 +375,7 @@ func TestRoomClosure(t *testing.T) {
rm.lock.Unlock()
rm.RemoveParticipant(p.Identity(), p.ID(), types.ParticipantCloseReasonClientRequestLeave)
time.Sleep(time.Duration(RoomDepartureGrace)*time.Second + defaultDelay)
time.Sleep(time.Duration(rm.ToProto().DepartureTimeout)*time.Second + defaultDelay)
rm.CloseIfEmpty()
require.Len(t, rm.GetParticipants(), 0)
@@ -737,6 +735,10 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
&livekit.Room{Name: "room"},
nil,
WebRTCConfig{},
config.RoomConfig{
EmptyTimeout: 5 * 60,
DepartureTimeout: 1,
},
&config.AudioConfig{
UpdateInterval: audioUpdateInterval,
SmoothIntervals: opts.audioSmoothIntervals,
+4
View File
@@ -79,6 +79,9 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
if req.EmptyTimeout > 0 {
rm.EmptyTimeout = req.EmptyTimeout
}
if req.DepartureTimeout > 0 {
rm.DepartureTimeout = req.DepartureTimeout
}
if req.MaxParticipants > 0 {
rm.MaxParticipants = req.MaxParticipants
}
@@ -157,6 +160,7 @@ func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName
func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, conf *config.RoomConfig) {
room.EmptyTimeout = conf.EmptyTimeout
room.DepartureTimeout = conf.DepartureTimeout
room.MaxParticipants = conf.MaxParticipants
for _, codec := range conf.EnabledCodecs {
room.EnabledCodecs = append(room.EnabledCodecs, &livekit.Codec{
+1
View File
@@ -42,6 +42,7 @@ func TestCreateRoom(t *testing.T) {
room, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
require.NoError(t, err)
require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout)
require.Equal(t, conf.Room.DepartureTimeout, room.DepartureTimeout)
require.NotEmpty(t, room.EnabledCodecs)
})
+1 -1
View File
@@ -545,7 +545,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
// construct ice servers
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.egressLauncher)
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.egressLauncher)
roomTopic := rpc.FormatRoomTopic(roomName)
roomServer := must.Get(rpc.NewTypedRoomServer(r, r.bus))