Add participant option for data track auto-subscribe. (#4240)

* Add participant option for data track auto-subscribe.

Default disabled.

* protocol update to use data track auto subscribe setting

* deps
This commit is contained in:
Raja Subramanian
2026-01-14 13:22:43 +05:30
committed by GitHub
parent 07572511ba
commit a35a6ae751
9 changed files with 83 additions and 45 deletions
+6 -6
View File
@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20260113174415-2e8ba344fca3
github.com/livekit/protocol v1.43.5-0.20260106150219-0dd6c6d4d408
github.com/livekit/protocol v1.43.5-0.20260114074149-a8bb8204ce69
github.com/livekit/psrpc v0.7.1
github.com/mackerelio/go-osstat v0.2.6
github.com/magefile/mage v1.15.0
@@ -55,8 +55,8 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.1
golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93
golang.org/x/mod v0.31.0
golang.org/x/exp v0.0.0-20260112195511-716be5621a96
golang.org/x/mod v0.32.0
golang.org/x/sync v0.19.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
@@ -151,9 +151,9 @@ require (
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/tools v0.40.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect
golang.org/x/tools v0.41.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260112192933-99fd39fd28a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260112192933-99fd39fd28a9 // indirect
google.golang.org/grpc v1.78.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+12 -12
View File
@@ -177,8 +177,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20260113174415-2e8ba344fca3 h1:v1Xc/q/547TjLX7Nw5y2vXNnmV0XYFAbhTJrtErQeDA=
github.com/livekit/mediatransportutil v0.0.0-20260113174415-2e8ba344fca3/go.mod h1:QBx/KHV6Vv00ggibg/WrOlqrkTciEA2Hc9DGWYr3Q9U=
github.com/livekit/protocol v1.43.5-0.20260106150219-0dd6c6d4d408 h1:R1ATkXJ6STAaj1oz5CLtfnryNQ5X8mnKWP/MnO/KwGg=
github.com/livekit/protocol v1.43.5-0.20260106150219-0dd6c6d4d408/go.mod h1:BLJHYHErQTu3+fnmfGrzN6CbHxNYiooFIIYGYxXxotw=
github.com/livekit/protocol v1.43.5-0.20260114074149-a8bb8204ce69 h1:cD82r488SxGYL5MX1lLuLLjmdnNoC+u5TIepxQmSB40=
github.com/livekit/protocol v1.43.5-0.20260114074149-a8bb8204ce69/go.mod h1:BLJHYHErQTu3+fnmfGrzN6CbHxNYiooFIIYGYxXxotw=
github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw=
github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk=
github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0=
@@ -383,14 +383,14 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93 h1:fQsdNF2N+/YewlRZiricy4P1iimyPKZ/xwniHj8Q2a0=
golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93/go.mod h1:EPRbTFwzwjXj9NpYyyrvenVh9Y+GFeEvMNh7Xuz7xgU=
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU=
golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI=
golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg=
golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c=
golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -476,18 +476,18 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA=
golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc=
golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc=
golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b h1:uA40e2M6fYRBf0+8uN5mLlqUtV192iiksiICIBkYJ1E=
google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:Xa7le7qx2vmqB/SzWUBa7KdMjpdpAHlh5QCSnjessQk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/genproto/googleapis/api v0.0.0-20260112192933-99fd39fd28a9 h1:4DKBrmaqeptdEzp21EfrOEh8LE7PJ5ywH6wydSbOfGY=
google.golang.org/genproto/googleapis/api v0.0.0-20260112192933-99fd39fd28a9/go.mod h1:dd646eSK+Dk9kxVBl1nChEOhJPtMXriCcVb4x3o6J+E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260112192933-99fd39fd28a9 h1:IY6/YYRrFUk0JPp0xOVctvFIVuRnjccihY5kxf5g0TE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260112192933-99fd39fd28a9/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
+10
View File
@@ -189,6 +189,7 @@ type ParticipantInit struct {
Reconnect bool
ReconnectReason livekit.ReconnectReason
AutoSubscribe bool
AutoSubscribeDataTrack *bool
Client *livekit.ClientInfo
Grants *auth.ClaimGrants
Region string
@@ -220,6 +221,7 @@ func (pi *ParticipantInit) MarshalLogObject(e zapcore.ObjectEncoder) error {
logBoolPtr("Reconnect", &pi.Reconnect)
e.AddString("ReconnectReason", pi.ReconnectReason.String())
logBoolPtr("AutoSubscribe", &pi.AutoSubscribe)
logBoolPtr("AutoSubscribeDataTrack", pi.AutoSubscribeDataTrack)
e.AddObject("Client", logger.Proto(utils.ClientInfoWithoutAddress(pi.Client)))
e.AddObject("Grants", pi.Grants)
e.AddString("Region", pi.Region)
@@ -260,6 +262,10 @@ func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionI
SyncState: pi.SyncState,
UseSinglePeerConnection: pi.UseSinglePeerConnection,
}
if pi.AutoSubscribeDataTrack != nil {
autoSubscribeDataTrack := *pi.AutoSubscribeDataTrack
ss.AutoSubscribeDataTrack = &autoSubscribeDataTrack
}
if pi.SubscriberAllowPause != nil {
subscriberAllowPause := *pi.SubscriberAllowPause
ss.SubscriberAllowPause = &subscriberAllowPause
@@ -292,6 +298,10 @@ func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*
SyncState: ss.SyncState,
UseSinglePeerConnection: ss.UseSinglePeerConnection,
}
if ss.AutoSubscribeDataTrack != nil {
autoSubscribeDataTrack := *ss.AutoSubscribeDataTrack
pi.AutoSubscribeDataTrack = &autoSubscribeDataTrack
}
if ss.SubscriberAllowPause != nil {
subscriberAllowPause := *ss.SubscriberAllowPause
pi.SubscriberAllowPause = &subscriberAllowPause
+26 -13
View File
@@ -152,7 +152,8 @@ type Room struct {
}
type ParticipantOptions struct {
AutoSubscribe bool
AutoSubscribe bool
AutoSubscribeDataTrack bool
}
type agentDispatch struct {
@@ -1004,7 +1005,7 @@ func (r *Room) onSimulateScenario(participant types.LocalParticipant, simulateSc
return nil
}
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
// checks if participant should be auto subscribed to new tracks, assumes lock is already acquired
func (r *Room) autoSubscribe(participant types.LocalParticipant) bool {
opts := r.participantOpts[participant.Identity()]
// default to true if no options are set
@@ -1014,6 +1015,16 @@ func (r *Room) autoSubscribe(participant types.LocalParticipant) bool {
return true
}
// checks if participant should be auto subscribed to new data tracks, assumes lock is already acquired
func (r *Room) autoSubscribeDataTrack(participant types.LocalParticipant) bool {
opts := r.participantOpts[participant.Identity()]
// default to true if no options are set
if opts != nil && !opts.AutoSubscribeDataTrack {
return false
}
return true
}
func (r *Room) createJoinResponseLocked(
participant types.LocalParticipant,
iceServers []*livekit.ICEServer,
@@ -1160,7 +1171,7 @@ func (r *Room) onDataTrackPublished(participant types.Participant, dt types.Data
// not fully joined. don't subscribe yet
continue
}
if !r.autoSubscribe(existingParticipant) {
if !r.autoSubscribeDataTrack(existingParticipant) {
continue
}
@@ -1461,11 +1472,9 @@ func (r *Room) RemoveParticipant(
func (r *Room) subscribeToExistingTracks(p types.LocalParticipant, isSync bool) {
r.lock.RLock()
shouldSubscribe := r.autoSubscribe(p)
autoSubscribe := r.autoSubscribe(p)
autoSubscribeDataTrack := r.autoSubscribeDataTrack(p)
r.lock.RUnlock()
if !shouldSubscribe {
return
}
var trackIDs []livekit.TrackID
for _, op := range r.GetParticipants() {
@@ -1475,14 +1484,18 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant, isSync bool)
}
// subscribe to all
for _, track := range op.GetPublishedTracks() {
trackIDs = append(trackIDs, track.ID())
p.SubscribeToTrack(track.ID(), isSync)
if autoSubscribe {
for _, track := range op.GetPublishedTracks() {
trackIDs = append(trackIDs, track.ID())
p.SubscribeToTrack(track.ID(), isSync)
}
}
for _, track := range op.GetPublishedDataTracks() {
trackIDs = append(trackIDs, track.ID())
p.SubscribeToDataTrack(track.ID())
if autoSubscribeDataTrack {
for _, track := range op.GetPublishedDataTracks() {
trackIDs = append(trackIDs, track.ID())
p.SubscribeToDataTrack(track.ID())
}
}
}
if len(trackIDs) > 0 {
+3
View File
@@ -518,6 +518,9 @@ func (r *RoomManager) StartSession(
opts := rtc.ParticipantOptions{
AutoSubscribe: pi.AutoSubscribe,
}
if pi.AutoSubscribeDataTrack != nil {
opts.AutoSubscribeDataTrack = *pi.AutoSubscribeDataTrack
}
iceServers := r.iceServersForParticipant(apiKey, participant, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)
if err = room.Join(participant, requestSource, &opts, iceServers); err != nil {
pLogger.Errorw("could not join room", err)
+15 -6
View File
@@ -221,7 +221,17 @@ func (s *RTCService) validateInternal(
if wrappedJoinRequestBase64 == "" {
pi.Reconnect = boolValue(r.FormValue("reconnect"))
pi.Client = ParseClientInfo(r)
pi.AutoSubscribe = true
if autoSubscribeParam := r.FormValue("auto_subscribe"); autoSubscribeParam != "" {
pi.AutoSubscribe = boolValue(autoSubscribeParam)
}
if autoSubscribeDataTrackParam := r.FormValue("auto_subscribe_data_track"); autoSubscribeDataTrackParam != "" {
autoSubscribeDataTrack := boolValue(autoSubscribeDataTrackParam)
pi.AutoSubscribeDataTrack = &autoSubscribeDataTrack
}
pi.AdaptiveStream = boolValue(r.FormValue("adaptive_stream"))
pi.DisableICELite = boolValue(r.FormValue("disable_ice_lite"))
@@ -232,12 +242,7 @@ func (s *RTCService) validateInternal(
pi.ID = livekit.ParticipantID(r.FormValue("sid"))
}
if autoSubscribe := r.FormValue("auto_subscribe"); autoSubscribe != "" {
pi.AutoSubscribe = boolValue(autoSubscribe)
}
subscriberAllowPauseParam := r.FormValue("subscriber_allow_pause")
if subscriberAllowPauseParam != "" {
if subscriberAllowPauseParam := r.FormValue("subscriber_allow_pause"); subscriberAllowPauseParam != "" {
subscriberAllowPause := boolValue(subscriberAllowPauseParam)
pi.SubscriberAllowPause = &subscriberAllowPause
}
@@ -248,6 +253,10 @@ func (s *RTCService) validateInternal(
pi.Client = joinRequest.ClientInfo
pi.AutoSubscribe = joinRequest.GetConnectionSettings().GetAutoSubscribe()
autoSubscribeDataTrack := joinRequest.GetConnectionSettings().GetAutoSubscribeDataTrack()
pi.AutoSubscribeDataTrack = &autoSubscribeDataTrack
pi.AdaptiveStream = joinRequest.GetConnectionSettings().GetAdaptiveStream()
pi.DisableICELite = joinRequest.GetConnectionSettings().GetDisableIceLite()
+4 -1
View File
@@ -127,6 +127,7 @@ var (
type Options struct {
AutoSubscribe bool
AutoSubscribeDataTrack bool
Publish string
Attributes map[string]string
ClientInfo *livekit.ClientInfo
@@ -162,7 +163,8 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error
}
connectionSettings := &livekit.ConnectionSettings{
AutoSubscribe: opts.AutoSubscribe,
AutoSubscribe: opts.AutoSubscribe,
AutoSubscribeDataTrack: &opts.AutoSubscribeDataTrack,
}
joinRequest := &livekit.JoinRequest{
@@ -185,6 +187,7 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error
sdk := "go"
if opts != nil {
connectUrl += fmt.Sprintf("&auto_subscribe=%t", opts.AutoSubscribe)
connectUrl += fmt.Sprintf("&auto_subscribe_data_track=%t", opts.AutoSubscribeDataTrack)
if opts.Publish != "" {
connectUrl += encodeQueryParam("publish", opts.Publish)
}
+4 -4
View File
@@ -209,9 +209,9 @@ func scenarioDataUnlabeledPublish(t *testing.T) {
func scenarioDataTracksPublishingUponJoining(t *testing.T) {
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("scenarioDataTracksPublishingUponJoining/testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("dtpuj_1", defaultServerPort, testRTCServicePath, nil)
c2 := createRTCClient("dtpuj_2", secondServerPort, testRTCServicePath, &testclient.Options{AutoSubscribe: true})
c3 := createRTCClient("dtpuj_3", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribe: true})
c1 := createRTCClient("dtpuj_1", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
c2 := createRTCClient("dtpuj_2", secondServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
c3 := createRTCClient("dtpuj_3", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
defer stopClients(c1, c2, c3)
waitUntilConnected(t, c1, c2, c3)
@@ -261,7 +261,7 @@ func scenarioDataTracksPublishingUponJoining(t *testing.T) {
logger.Infow("c2 reconnecting")
// connect to a diff port
c2 = createRTCClient("dtpuj_2", defaultServerPort, testRTCServicePath, nil)
c2 = createRTCClient("dtpuj_2", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
defer c2.Stop()
waitUntilConnected(t, c2)
writers = publishDataTracksForClients(t, c2)
+3 -3
View File
@@ -1036,8 +1036,8 @@ func TestSinglePublisherDataTrack(t *testing.T) {
for _, testRTCServicePath := range testRTCServicePaths {
t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
waitUntilConnected(t, c1, c2)
// publish a couple of data tracks and ensure clients receive it ok
@@ -1061,7 +1061,7 @@ func TestSinglePublisherDataTrack(t *testing.T) {
})
// a new client joins and should get the initial stream
c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, nil)
c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
// ensure that new client that has joined also received data tracks
waitUntilConnected(t, c3)