diff --git a/go.mod b/go.mod index 61d9e51fb..df9ead462 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b - github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c + github.com/livekit/protocol v1.1.3-0.20221102105925-3017cf1fb558 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.14.0 @@ -66,6 +66,7 @@ require ( github.com/google/go-cmp v0.5.8 // indirect github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/jellydator/ttlcache/v2 v2.11.1 // indirect github.com/josharian/native v1.0.0 // indirect github.com/jxskiss/base62 v1.1.0 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect diff --git a/go.sum b/go.sum index 98ba015b3..bb18a5591 100644 --- a/go.sum +++ b/go.sum @@ -204,6 +204,8 @@ github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHItqWZl6U64= +github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.0.0 h1:Ts/E8zCSEsG17dUqv7joXJFybuMLjQfWE04tsBODTxk= github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= @@ -246,8 +248,10 @@ github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc h1:e3GIA9AL6h4a38 github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b h1:RBNV8TckETSkIkKxcD12d8nZKVkB9GSY/sQlMoaruP4= github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c h1:5BciCRrrcYE8HyKACliG2RTwNhkT8dYtPu4rp2O8Sq4= -github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI= +github.com/livekit/protocol v1.1.3-0.20221101231808-eaca87b5819b h1:/4pVDA6ibizN8xz4nq5snudJ5fAAgZH7XFbmj68GOHY= +github.com/livekit/protocol v1.1.3-0.20221101231808-eaca87b5819b/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI= +github.com/livekit/protocol v1.1.3-0.20221102105925-3017cf1fb558 h1:NQGeFjdkBund/JX1rIIQF0sSEd4fxA8yl4rlYDF6M2g= +github.com/livekit/protocol v1.1.3-0.20221102105925-3017cf1fb558/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= @@ -477,6 +481,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= @@ -696,6 +701,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 376ddc0c9..5598251e8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -142,6 +142,7 @@ type ParticipantImpl struct { onParticipantUpdate func(types.LocalParticipant) onDataPacket func(types.LocalParticipant, *livekit.DataPacket) onSubscribedTo func(types.LocalParticipant, livekit.ParticipantID) + onDataStreamRequest func(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error) migrateState atomic.Value // types.MigrateState @@ -387,6 +388,12 @@ func (p *ParticipantImpl) OnSubscribedTo(callback func(types.LocalParticipant, l p.lock.Unlock() } +func (p *ParticipantImpl) OnDataStreamRequest(callback func(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error)) { + p.lock.Lock() + p.onDataStreamRequest = callback + p.lock.Unlock() +} + func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) { p.lock.Lock() p.onClose = callback @@ -1198,6 +1205,13 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt payload.User.ParticipantSid = string(p.params.SID) onDataPacket(p, &dp) } + case *livekit.DataPacket_Stream: + p.lock.RLock() + onDataPacket := p.onDataPacket + p.lock.RUnlock() + if onDataPacket != nil { + onDataPacket(p, &dp) + } default: p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload) } diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index fa61ed5e8..c14d694d3 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -29,6 +29,24 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { } } +func (p *ParticipantImpl) HandleDataStreamRequest(request *livekit.GetDataStreamRequest) error { + handler := p.onDataStreamRequest + if handler == nil { + rsp, err := handler(request) + if err != nil { + return err + } + return p.writeMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_DataStreamResponse{ + DataStreamResponse: &livekit.GetDataStreamResponse{ + Packets: rsp.GetPackets(), + }, + }, + }) + } + return errNotFound +} + func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error { if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 165b515cf..00b94e696 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -67,6 +67,7 @@ type Room struct { onParticipantChanged func(p types.LocalParticipant) onMetadataUpdate func(metadata string) onClose func() + onGetDataStream func(bucket string) ([]*livekit.DataPacket_Stream, error) } type ParticipantOptions struct { @@ -111,6 +112,12 @@ func NewRoom( return r } +func (r *Room) OnGetDataStream(callback func(bucket string) ([]*livekit.DataPacket_Stream, error)) { + r.lock.Lock() + defer r.lock.Unlock() + r.onGetDataStream = callback +} + func (r *Room) ToProto() *livekit.Room { r.lock.RLock() defer r.lock.RUnlock() @@ -214,6 +221,26 @@ func (r *Room) Release() { r.holds.Dec() } +func (r *Room) OnDataStreamRequest(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error) { + handler := r.onGetDataStream + if handler == nil { + return nil, errNotFound + } + data, err := handler(request.GetName()) + if err != nil { + return nil, err + } + var packets []*livekit.StreamPacket + for _, d := range data { + packets = append(packets, &livekit.StreamPacket{ + Name: d.Stream.GetName(), + Key: d.Stream.GetKey(), + Value: d.Stream.GetValue(), + }) + } + return &livekit.GetDataStreamResponse{Packets: packets}, nil +} + func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error { r.lock.Lock() defer r.lock.Unlock() @@ -273,6 +300,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions participant.OnTrackUpdated(r.onTrackUpdated) participant.OnParticipantUpdate(r.onParticipantUpdate) participant.OnDataPacket(r.onDataPacket) + participant.OnDataStreamRequest(r.OnDataStreamRequest) participant.OnSubscribedTo(func(p types.LocalParticipant, publisherID livekit.ParticipantID) { go func() { // when a participant subscribes to another participant, diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 9a59ae857..1f6e90ac5 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -88,6 +88,9 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant pLogger.Warnw("could not simulate scenario", err, "simulate", msg.Simulate) } + case *livekit.SignalRequest_DataStreamRequest: + pLogger.Infow("DataStream request") + participant.HandleDataStreamRequest(msg.DataStreamRequest) } return nil } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 79e9649b1..b741c74dd 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -305,6 +305,7 @@ type LocalParticipant interface { OnClose(callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) OnClaimsChanged(callback func(LocalParticipant)) OnReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) + OnDataStreamRequest(callback func(request *livekit.GetDataStreamRequest) (*livekit.GetDataStreamResponse, error)) // session migration MaybeStartMigration(force bool, onStart func()) bool @@ -328,6 +329,8 @@ type LocalParticipant interface { UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error + + HandleDataStreamRequest(request *livekit.GetDataStreamRequest) error } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/service/datastreamstore.go b/pkg/service/datastreamstore.go new file mode 100644 index 000000000..af3980517 --- /dev/null +++ b/pkg/service/datastreamstore.go @@ -0,0 +1,84 @@ +package service + +import ( + "strings" + "sync" + "time" + + "github.com/jellydator/ttlcache/v2" + + "github.com/livekit/protocol/livekit" +) + +const ( + KeySeparator = "!_!" +) + +type LocalDataStreamStore struct { + sync.RWMutex + ttl time.Duration + Cache *ttlcache.Cache +} + +func NewLocalDataStreamStore(ttl time.Duration) DataStreamStore { + return &LocalDataStreamStore{ + Cache: ttlcache.NewCache(), + ttl: ttl, + } +} + +func generateKeyName(bucket string, key string) string { + return bucket + KeySeparator + key +} + +func getBucketNameFromKey(key string) string { + return strings.Split(key, KeySeparator)[0] +} + +func (ds *LocalDataStreamStore) CreateBucket(bucket string, ttl time.Duration) error { + // no op + return nil +} + +func (ds *LocalDataStreamStore) DeleteBucket(bucket string) error { + ds.Lock() + defer ds.Unlock() + for _, key := range ds.Cache.GetKeys() { + if bucket == getBucketNameFromKey(key) { + ds.Cache.Remove(key) + } + } + return nil +} + +func (ds *LocalDataStreamStore) Get(bucket string, key string) (*livekit.DataPacket_Stream, error) { + ds.RLock() + defer ds.RUnlock() + v, err := ds.Cache.Get(generateKeyName(bucket, key)) + if err != nil { + return nil, err + } + return v.(*livekit.DataPacket_Stream), nil +} + +func (ds *LocalDataStreamStore) GetAll(bucket string) ([]*livekit.DataPacket_Stream, error) { + var rsp []*livekit.DataPacket_Stream + ds.RLock() + defer ds.RUnlock() + for _, key := range ds.Cache.GetKeys() { + if bucket == getBucketNameFromKey(key) { + d, err := ds.Cache.Get(key) + if err != nil { + continue + } + rsp = append(rsp, d.(*livekit.DataPacket_Stream)) + } + } + return rsp, nil +} + +func (ds *LocalDataStreamStore) Put(bucket string, key string, value *livekit.DataPacket_Stream) error { + ds.Lock() + defer ds.Unlock() + return ds.Cache.SetWithTTL(generateKeyName(bucket, key), *value, ds.ttl) +} diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index c2b9876e6..5c8e56b97 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -61,3 +61,11 @@ type IngressStore interface { type RoomAllocator interface { CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) } + +type DataStreamStore interface { + CreateBucket(bucket string, ttl time.Duration) error + DeleteBucket(bucket string) error + Get(bucket string, key string) (*livekit.DataPacket_Stream, error) + GetAll(bucket string) ([]*livekit.DataPacket_Stream, error) + Put(bucket string, key string, value *livekit.DataPacket_Stream) error +} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 2a0f90406..a1da2249b 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -49,6 +49,7 @@ type RoomManager struct { telemetry telemetry.TelemetryService clientConfManager clientconfiguration.ClientConfigurationManager egressLauncher rtc.EgressLauncher + dataStreamManager DataStreamStore rooms map[livekit.RoomName]*rtc.Room @@ -63,6 +64,7 @@ func NewLocalRoomManager( telemetry telemetry.TelemetryService, clientConfManager clientconfiguration.ClientConfigurationManager, egressLauncher rtc.EgressLauncher, + dataStreamStore DataStreamStore, ) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) @@ -79,6 +81,7 @@ func NewLocalRoomManager( telemetry: telemetry, clientConfManager: clientConfManager, egressLauncher: egressLauncher, + dataStreamManager: dataStreamStore, rooms: make(map[livekit.RoomName]*rtc.Room), @@ -114,7 +117,7 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName) var err, err2 error wg := sync.WaitGroup{} - wg.Add(2) + wg.Add(3) // clear routing information go func() { defer wg.Done() @@ -126,6 +129,12 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName) err2 = r.roomStore.DeleteRoom(ctx, roomName) }() + // clear dataStore + go func() { + defer wg.Done() + r.dataStreamManager.DeleteBucket(string(roomName)) + }() + wg.Wait() if err2 != nil { err = err2 @@ -394,6 +403,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room // construct ice servers newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) + newRoom.OnGetDataStream(r.dataStreamManager.GetAll) newRoom.OnClose(func() { roomInfo := newRoom.ToProto() r.telemetry.RoomEnded(ctx, roomInfo) @@ -425,6 +435,8 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom.Hold() + // create room datastore + r.dataStreamManager.CreateBucket(string(roomName), time.Hour) r.telemetry.RoomStarted(ctx, newRoom.ToProto()) prometheus.RoomStarted() diff --git a/pkg/service/server.go b/pkg/service/server.go index 60a94b9e9..5cc23e30f 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -27,19 +27,20 @@ import ( ) type LivekitServer struct { - config *config.Config - egressService *EgressService - ingressService *IngressService - rtcService *RTCService - httpServer *http.Server - promServer *http.Server - router routing.Router - roomManager *RoomManager - turnServer *turn.Server - currentNode routing.LocalNode - running atomic.Bool - doneChan chan struct{} - closedChan chan struct{} + config *config.Config + egressService *EgressService + ingressService *IngressService + rtcService *RTCService + httpServer *http.Server + promServer *http.Server + router routing.Router + roomManager *RoomManager + turnServer *turn.Server + currentNode routing.LocalNode + running atomic.Bool + doneChan chan struct{} + closedChan chan struct{} + dataStreamManager DataStreamStore } func NewLivekitServer(conf *config.Config, @@ -52,6 +53,7 @@ func NewLivekitServer(conf *config.Config, roomManager *RoomManager, turnServer *turn.Server, currentNode routing.LocalNode, + dataStreamStore DataStreamStore, ) (s *LivekitServer, err error) { s = &LivekitServer{ config: conf, @@ -61,9 +63,10 @@ func NewLivekitServer(conf *config.Config, router: router, roomManager: roomManager, // turn server starts automatically - turnServer: turnServer, - currentNode: currentNode, - closedChan: make(chan struct{}), + turnServer: turnServer, + currentNode: currentNode, + closedChan: make(chan struct{}), + dataStreamManager: dataStreamStore, } middlewares := []negroni.Handler{ diff --git a/pkg/service/wire.go b/pkg/service/wire.go index be6f8cf04..de5e51a3b 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -8,6 +8,7 @@ import ( "crypto/tls" "fmt" "os" + "time" "github.com/go-redis/redis/v8" "github.com/google/wire" @@ -33,6 +34,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getNodeID, createRedisClient, createStore, + createDataStreamStore, wire.Bind(new(ServiceStore), new(ObjectStore)), createKeyProvider, createWebhookNotifier, @@ -211,3 +213,7 @@ func getRoomConf(config *config.Config) config.RoomConfig { func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } + +func createDataStreamStore() DataStreamStore { + return NewLocalDataStreamStore(60 * time.Minute) +} diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index b6da2453c..dad20fbf5 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" "gopkg.in/yaml.v3" "os" + "time" ) import ( @@ -71,7 +72,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode) clientConfigurationManager := createClientConfiguration() - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher) + dataStreamStore := createDataStreamStore() + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, dataStreamStore) if err != nil { return nil, err } @@ -80,7 +82,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, rtcService, keyProvider, router, roomManager, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, rtcService, keyProvider, router, roomManager, server, currentNode, dataStreamStore) if err != nil { return nil, err } @@ -237,3 +239,7 @@ func getRoomConf(config2 *config.Config) config.RoomConfig { func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) { return NewTurnServer(conf, authHandler, false) } + +func createDataStreamStore() DataStreamStore { + return NewLocalDataStreamStore(60 * time.Minute) +}