Files
livekit/pkg/rtc/participant_data_blob_handler.go
Raja Subramanian 1faab0c48e Add support for data blob (a. k. a. async participant attributes) (#4619)
* Async attributes on participant.

How it is different from existing participant attributes?
1. Async attribute can be added one at a time.
2. These are not included in `ParticipantInfo`.
3. Get an attribute by participant identity and async attribute ID as
   and when needed.

* clean up

* get full definitions, not just ids

* listener OnDataTrackSchema

* name length config

* data blob

* deps

* static check

* Add missing request ID

* Update protocol commit

* Wire up StoreDataBlobResponse

* Pass request ID through in GetDataBlobResponse

* deps

* atomic

* sctp at 1.9.5

* remove proto clone

---------

Co-authored-by: Jacob Gelman <3182119+ladvoc@users.noreply.github.com>
2026-06-24 14:42:37 +05:30

114 lines
3.4 KiB
Go

// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
// HandleStoreDataBlobRequest validates a client request to store a data blob
// on this participant. When every check passes it stores the blob, notifies
// the participant listener and acknowledges the request with the blob key.
//
// The request is rejected with a RequestResponse carrying the request ID when:
//   - data blobs are not enabled for this participant (NOT_ALLOWED),
//   - the blob or its key is missing, or the key string is empty or fails the
//     configured key length check (INVALID_REQUEST),
//   - the blob has no contents (INVALID_REQUEST),
//   - the limit configuration does not allow adding the blob (LIMIT_EXCEEDED).
//
// Every rejection is logged as a warning on the publisher logger.
func (p *ParticipantImpl) HandleStoreDataBlobRequest(req *livekit.StoreDataBlobRequest) {
	// reject reports a failed request back to the client.
	reject := func(reason livekit.RequestResponse_Reason, message string) {
		p.sendRequestResponse(&livekit.RequestResponse{
			RequestId: req.RequestId,
			Reason:    reason,
			Message:   message,
		})
	}

	if !p.params.EnableParticipantDataBlob {
		p.pubLogger.Warnw("data blob not enabled", nil, "req", logger.Proto(req))
		reject(livekit.RequestResponse_NOT_ALLOWED, "data blob not enabled")
		return
	}

	// Resolve the key string once; a missing blob or key leaves it empty and
	// is rejected by the same check as an empty key.
	// NOTE(review): the length check runs on Key.String(); if DataBlobKey is a
	// protobuf message this is its text form — confirm that is the intended input.
	blob := req.Blob
	var key string
	if blob != nil && blob.Key != nil {
		key = blob.Key.String()
	}
	if len(key) == 0 || !p.params.LimitConfig.CheckDataBlobKeyLength(key) {
		p.pubLogger.Warnw("data blob is invalid", nil, "req", logger.Proto(req))
		reject(livekit.RequestResponse_INVALID_REQUEST, "data blob is invalid")
		return
	}

	if len(blob.Contents) == 0 {
		p.pubLogger.Warnw("data blob is empty", nil, "key", key)
		reject(livekit.RequestResponse_INVALID_REQUEST, "data blob is empty")
		return
	}

	if !p.params.LimitConfig.CanAddDataBlob(p.dataBlob.GetAll(), blob) {
		// Log only the key and size: the contents may be large.
		p.pubLogger.Warnw("data blob exceeds limit", nil, "key", key, "size", len(blob.Contents))
		reject(livekit.RequestResponse_LIMIT_EXCEEDED, "data blob exceeds limit")
		return
	}

	p.AddDataBlob(blob)
	p.listener().OnStoreDataBlob(p, blob)
	p.sendStoreDataBlobResponse(req.RequestId, blob.Key)
}
// HandleGetDataBlobRequest handles a client request to fetch a data blob.
// A request that carries a key is handed to the participant listener via
// OnGetDataBlob; a request without a key is answered with INVALID_REQUEST.
//
// NOTE(review): unlike HandleStoreDataBlobRequest, this handler does not
// consult EnableParticipantDataBlob — confirm that is intentional.
func (p *ParticipantImpl) HandleGetDataBlobRequest(req *livekit.GetDataBlobRequest) {
	if req.Key != nil {
		p.listener().OnGetDataBlob(p, req)
		return
	}

	// No key supplied: nothing can be looked up, so report the error.
	missingKey := &livekit.RequestResponse{
		RequestId: req.RequestId,
		Reason:    livekit.RequestResponse_INVALID_REQUEST,
		Message:   "data blob key is required",
	}
	p.sendRequestResponse(missingKey)
}
// AddDataBlob adds dataBlob to this participant's data blob store by
// delegating to p.dataBlob.Add. No validation is performed here; in this file
// HandleStoreDataBlobRequest validates the blob before calling it.
func (p *ParticipantImpl) AddDataBlob(dataBlob *livekit.DataBlob) {
p.dataBlob.Add(dataBlob)
}
// GetDataBlob returns the result of looking up key in this participant's data
// blob store (p.dataBlob.Get). ProcessGetDataBlobRequest in this file treats a
// nil result as "data blob not found".
func (p *ParticipantImpl) GetDataBlob(key *livekit.DataBlobKey) *livekit.DataBlob {
return p.dataBlob.Get(key)
}
// ProcessGetDataBlobRequest answers req, received from this participant, using
// the data blob held by publisher. The blob is sent back with the request ID
// when found; otherwise a NOT_FOUND response is sent, with a message saying
// whether the publisher or the blob itself is missing.
func (p *ParticipantImpl) ProcessGetDataBlobRequest(req *livekit.GetDataBlobRequest, publisher types.Participant) {
	var (
		found      *livekit.DataBlob
		failureMsg string
	)

	// Work out which lookup step, if any, came up empty.
	if publisher == nil {
		failureMsg = "participant not found"
	} else if found = publisher.GetDataBlob(req.Key); found == nil {
		failureMsg = "data blob not found"
	}

	if failureMsg == "" {
		p.sendGetDataBlobResponse(req.RequestId, found)
		return
	}

	p.sendRequestResponse(&livekit.RequestResponse{
		RequestId: req.RequestId,
		Reason:    livekit.RequestResponse_NOT_FOUND,
		Message:   failureMsg,
	})
}
// GetAllDataBlob returns the data blobs reported by this participant's data
// blob store (p.dataBlob.GetAll).
func (p *ParticipantImpl) GetAllDataBlob() []*livekit.DataBlob {
return p.dataBlob.GetAll()
}