datastream support

Signed-off-by: shishir gowda <shishir@livekit.io>
This commit is contained in:
shishir gowda
2022-11-02 13:13:52 +01:00
parent ac5bbe8d38
commit a6c6bd5399
13 changed files with 214 additions and 22 deletions
+2 -1
View File
@@ -18,7 +18,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc
github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b
github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c
github.com/livekit/protocol v1.1.3-0.20221102105925-3017cf1fb558
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3
github.com/magefile/mage v1.14.0
@@ -66,6 +66,7 @@ require (
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/subcommands v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/jellydator/ttlcache/v2 v2.11.1 // indirect
github.com/josharian/native v1.0.0 // indirect
github.com/jxskiss/base62 v1.1.0 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
+8 -2
View File
@@ -204,6 +204,8 @@ github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHItqWZl6U64=
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/josharian/native v1.0.0 h1:Ts/E8zCSEsG17dUqv7joXJFybuMLjQfWE04tsBODTxk=
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
@@ -246,8 +248,10 @@ github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc h1:e3GIA9AL6h4a38
github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b h1:RBNV8TckETSkIkKxcD12d8nZKVkB9GSY/sQlMoaruP4=
github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c h1:5BciCRrrcYE8HyKACliG2RTwNhkT8dYtPu4rp2O8Sq4=
github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI=
github.com/livekit/protocol v1.1.3-0.20221101231808-eaca87b5819b h1:/4pVDA6ibizN8xz4nq5snudJ5fAAgZH7XFbmj68GOHY=
github.com/livekit/protocol v1.1.3-0.20221101231808-eaca87b5819b/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI=
github.com/livekit/protocol v1.1.3-0.20221102105925-3017cf1fb558 h1:NQGeFjdkBund/JX1rIIQF0sSEd4fxA8yl4rlYDF6M2g=
github.com/livekit/protocol v1.1.3-0.20221102105925-3017cf1fb558/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw=
@@ -477,6 +481,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
@@ -696,6 +701,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
+14
View File
@@ -142,6 +142,7 @@ type ParticipantImpl struct {
onParticipantUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, *livekit.DataPacket)
onSubscribedTo func(types.LocalParticipant, livekit.ParticipantID)
onDataStreamRequest func(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error)
migrateState atomic.Value // types.MigrateState
@@ -387,6 +388,12 @@ func (p *ParticipantImpl) OnSubscribedTo(callback func(types.LocalParticipant, l
p.lock.Unlock()
}
func (p *ParticipantImpl) OnDataStreamRequest(callback func(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error)) {
p.lock.Lock()
p.onDataStreamRequest = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) {
p.lock.Lock()
p.onClose = callback
@@ -1198,6 +1205,13 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
payload.User.ParticipantSid = string(p.params.SID)
onDataPacket(p, &dp)
}
case *livekit.DataPacket_Stream:
p.lock.RLock()
onDataPacket := p.onDataPacket
p.lock.RUnlock()
if onDataPacket != nil {
onDataPacket(p, &dp)
}
default:
p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload)
}
+18
View File
@@ -29,6 +29,24 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) {
}
}
func (p *ParticipantImpl) HandleDataStreamRequest(request *livekit.GetDataStreamRequest) error {
handler := p.onDataStreamRequest
if handler == nil {
rsp, err := handler(request)
if err != nil {
return err
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_DataStreamResponse{
DataStreamResponse: &livekit.GetDataStreamResponse{
Packets: rsp.GetPackets(),
},
},
})
}
return errNotFound
}
func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error {
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
+28
View File
@@ -67,6 +67,7 @@ type Room struct {
onParticipantChanged func(p types.LocalParticipant)
onMetadataUpdate func(metadata string)
onClose func()
onGetDataStream func(bucket string) ([]*livekit.DataPacket_Stream, error)
}
type ParticipantOptions struct {
@@ -111,6 +112,12 @@ func NewRoom(
return r
}
func (r *Room) OnGetDataStream(callback func(bucket string) ([]*livekit.DataPacket_Stream, error)) {
r.lock.Lock()
defer r.lock.Unlock()
r.onGetDataStream = callback
}
func (r *Room) ToProto() *livekit.Room {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -214,6 +221,26 @@ func (r *Room) Release() {
r.holds.Dec()
}
func (r *Room) OnDataStreamRequest(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error) {
handler := r.onGetDataStream
if handler == nil {
return nil, errNotFound
}
data, err := handler(request.GetName())
if err != nil {
return nil, err
}
var packets []*livekit.StreamPacket
for _, d := range data {
packets = append(packets, &livekit.StreamPacket{
Name: d.Stream.GetName(),
Key: d.Stream.GetKey(),
Value: d.Stream.GetValue(),
})
}
return &livekit.GetDataStreamResponse{Packets: packets}, nil
}
func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error {
r.lock.Lock()
defer r.lock.Unlock()
@@ -273,6 +300,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
participant.OnTrackUpdated(r.onTrackUpdated)
participant.OnParticipantUpdate(r.onParticipantUpdate)
participant.OnDataPacket(r.onDataPacket)
participant.OnDataStreamRequest(r.OnDataStreamRequest)
participant.OnSubscribedTo(func(p types.LocalParticipant, publisherID livekit.ParticipantID) {
go func() {
// when a participant subscribes to another participant,
+3
View File
@@ -88,6 +88,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
pLogger.Warnw("could not simulate scenario", err,
"simulate", msg.Simulate)
}
case *livekit.SignalRequest_DataStreamRequest:
pLogger.Infow("DataStream request")
participant.HandleDataStreamRequest(msg.DataStreamRequest)
}
return nil
}
+3
View File
@@ -305,6 +305,7 @@ type LocalParticipant interface {
OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))
OnClaimsChanged(callback func(LocalParticipant))
OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
OnDataStreamRequest(callback func(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error))
// session migration
MaybeStartMigration(force bool, onStart func()) bool
@@ -328,6 +329,8 @@ type LocalParticipant interface {
UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error
UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error
HandleDataStreamRequest(request *livekit.GetDataStreamRequest) error
}
// Room is a container of participants, and can provide room-level actions
+84
View File
@@ -0,0 +1,84 @@
package service
import (
"strings"
"sync"
"time"
"github.com/jellydator/ttlcache/v2"
"github.com/livekit/protocol/livekit"
)
const (
KeySeparator = "!_!"
)
type LocalDataStreamStore struct {
sync.RWMutex
ttl time.Duration
Cache *ttlcache.Cache
}
func NewLocalDataStreamStore(ttl time.Duration) DataStreamStore {
return &LocalDataStreamStore{
Cache: ttlcache.NewCache(),
ttl: ttl,
}
}
func generateKeyName(bucket string, key string) string {
return bucket + KeySeparator + key
}
func getBucketNameFromKey(key string) string {
return strings.Split(key, KeySeparator)[0]
}
func (ds *LocalDataStreamStore) CreateBucket(bucket string, ttl time.Duration) error {
// no op
return nil
}
func (ds *LocalDataStreamStore) DeleteBucket(bucket string) error {
ds.Lock()
defer ds.Unlock()
for _, key := range ds.Cache.GetKeys() {
if bucket == getBucketNameFromKey(key) {
ds.Cache.Remove(key)
}
}
return nil
}
func (ds *LocalDataStreamStore) Get(bucket string, key string) (*livekit.DataPacket_Stream, error) {
ds.RLock()
defer ds.RUnlock()
v, err := ds.Cache.Get(generateKeyName(bucket, key))
if err != nil {
return nil, err
}
return v.(*livekit.DataPacket_Stream), nil
}
func (ds *LocalDataStreamStore) GetAll(bucket string) ([]*livekit.DataPacket_Stream, error) {
var rsp []*livekit.DataPacket_Stream
ds.RLock()
defer ds.RUnlock()
for _, key := range ds.Cache.GetKeys() {
if bucket == getBucketNameFromKey(key) {
d, err := ds.Cache.Get(key)
if err != nil {
continue
}
rsp = append(rsp, d.(*livekit.DataPacket_Stream))
}
}
return rsp, nil
}
func (ds *LocalDataStreamStore) Put(bucket string, key string, value *livekit.DataPacket_Stream) error {
ds.Lock()
defer ds.Unlock()
return ds.Cache.SetWithTTL(generateKeyName(bucket, key), *value, ds.ttl)
}
+8
View File
@@ -61,3 +61,11 @@ type IngressStore interface {
type RoomAllocator interface {
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error)
}
type DataStreamStore interface {
CreateBucket(bucket string, ttl time.Duration) error
DeleteBucket(bucket string) error
Get(bucket string, key string) (*livekit.DataPacket_Stream, error)
GetAll(bucket string) ([]*livekit.DataPacket_Stream, error)
Put(bucket string, key string, value *livekit.DataPacket_Stream) error
}
+13 -1
View File
@@ -49,6 +49,7 @@ type RoomManager struct {
telemetry telemetry.TelemetryService
clientConfManager clientconfiguration.ClientConfigurationManager
egressLauncher rtc.EgressLauncher
dataStreamManager DataStreamStore
rooms map[livekit.RoomName]*rtc.Room
@@ -63,6 +64,7 @@ func NewLocalRoomManager(
telemetry telemetry.TelemetryService,
clientConfManager clientconfiguration.ClientConfigurationManager,
egressLauncher rtc.EgressLauncher,
dataStreamStore DataStreamStore,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
@@ -79,6 +81,7 @@ func NewLocalRoomManager(
telemetry: telemetry,
clientConfManager: clientConfManager,
egressLauncher: egressLauncher,
dataStreamManager: dataStreamStore,
rooms: make(map[livekit.RoomName]*rtc.Room),
@@ -114,7 +117,7 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName)
var err, err2 error
wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(3)
// clear routing information
go func() {
defer wg.Done()
@@ -126,6 +129,12 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName)
err2 = r.roomStore.DeleteRoom(ctx, roomName)
}()
// clear dataStore
go func() {
defer wg.Done()
r.dataStreamManager.DeleteBucket(string(roomName))
}()
wg.Wait()
if err2 != nil {
err = err2
@@ -394,6 +403,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
// construct ice servers
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher)
newRoom.OnGetDataStream(r.dataStreamManager.GetAll)
newRoom.OnClose(func() {
roomInfo := newRoom.ToProto()
r.telemetry.RoomEnded(ctx, roomInfo)
@@ -425,6 +435,8 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
newRoom.Hold()
// create room datastore
r.dataStreamManager.CreateBucket(string(roomName), time.Hour)
r.telemetry.RoomStarted(ctx, newRoom.ToProto())
prometheus.RoomStarted()
+19 -16
View File
@@ -27,19 +27,20 @@ import (
)
type LivekitServer struct {
config *config.Config
egressService *EgressService
ingressService *IngressService
rtcService *RTCService
httpServer *http.Server
promServer *http.Server
router routing.Router
roomManager *RoomManager
turnServer *turn.Server
currentNode routing.LocalNode
running atomic.Bool
doneChan chan struct{}
closedChan chan struct{}
config *config.Config
egressService *EgressService
ingressService *IngressService
rtcService *RTCService
httpServer *http.Server
promServer *http.Server
router routing.Router
roomManager *RoomManager
turnServer *turn.Server
currentNode routing.LocalNode
running atomic.Bool
doneChan chan struct{}
closedChan chan struct{}
dataStreamManager DataStreamStore
}
func NewLivekitServer(conf *config.Config,
@@ -52,6 +53,7 @@ func NewLivekitServer(conf *config.Config,
roomManager *RoomManager,
turnServer *turn.Server,
currentNode routing.LocalNode,
dataStreamStore DataStreamStore,
) (s *LivekitServer, err error) {
s = &LivekitServer{
config: conf,
@@ -61,9 +63,10 @@ func NewLivekitServer(conf *config.Config,
router: router,
roomManager: roomManager,
// turn server starts automatically
turnServer: turnServer,
currentNode: currentNode,
closedChan: make(chan struct{}),
turnServer: turnServer,
currentNode: currentNode,
closedChan: make(chan struct{}),
dataStreamManager: dataStreamStore,
}
middlewares := []negroni.Handler{
+6
View File
@@ -8,6 +8,7 @@ import (
"crypto/tls"
"fmt"
"os"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/wire"
@@ -33,6 +34,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
getNodeID,
createRedisClient,
createStore,
createDataStreamStore,
wire.Bind(new(ServiceStore), new(ObjectStore)),
createKeyProvider,
createWebhookNotifier,
@@ -211,3 +213,7 @@ func getRoomConf(config *config.Config) config.RoomConfig {
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
func createDataStreamStore() DataStreamStore {
return NewLocalDataStreamStore(60 * time.Minute)
}
+8 -2
View File
@@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
"os"
"time"
)
import (
@@ -71,7 +72,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode)
clientConfigurationManager := createClientConfiguration()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher)
dataStreamStore := createDataStreamStore()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, dataStreamStore)
if err != nil {
return nil, err
}
@@ -80,7 +82,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, rtcService, keyProvider, router, roomManager, server, currentNode)
livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, rtcService, keyProvider, router, roomManager, server, currentNode, dataStreamStore)
if err != nil {
return nil, err
}
@@ -237,3 +239,7 @@ func getRoomConf(config2 *config.Config) config.RoomConfig {
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
func createDataStreamStore() DataStreamStore {
return NewLocalDataStreamStore(60 * time.Minute)
}