diff --git a/cmd/cli/commands/room.go b/cmd/cli/commands/room.go index cff5d8b8f..b8b5f9f7d 100644 --- a/cmd/cli/commands/room.go +++ b/cmd/cli/commands/room.go @@ -230,7 +230,7 @@ func muteTrack(c *cli.Context) error { func contextWithAccessToken(c *cli.Context, grant *auth.VideoGrant) context.Context { ctx := context.Background() - token, err := accessToken(c, grant, "") + token, err := accessToken(c, grant, "").ToJWT() if err != nil { logger.Errorw("Could not get access token", "err", err) } diff --git a/cmd/cli/commands/rtc.go b/cmd/cli/commands/rtc.go index 3d26637b9..4a4c97c56 100644 --- a/cmd/cli/commands/rtc.go +++ b/cmd/cli/commands/rtc.go @@ -63,7 +63,7 @@ func joinRoom(c *cli.Context) error { token, err = accessToken(c, &auth.VideoGrant{ RoomJoin: true, Room: roomId, - }, identity) + }, identity).ToJWT() if err != nil { return err } diff --git a/cmd/cli/commands/token.go b/cmd/cli/commands/token.go index b0f9c3cc5..0846ec348 100644 --- a/cmd/cli/commands/token.go +++ b/cmd/cli/commands/token.go @@ -1,6 +1,7 @@ package commands import ( + "encoding/json" "fmt" "github.com/urfave/cli/v2" @@ -25,6 +26,10 @@ var ( Name: "create", Usage: "enable token to be used to create rooms", }, + &cli.BoolFlag{ + Name: "admin", + Usage: "enable token to be used to manage a room", + }, &cli.StringFlag{ Name: "participant", Aliases: []string{"p"}, @@ -35,6 +40,10 @@ var ( Aliases: []string{"r"}, Usage: "name of the room to join, empty to allow joining all rooms", }, + &cli.StringFlag{ + Name: "metadata", + Usage: "JSON metadata to encode in the token, will be passed to participant", + }, devFlag, }, }, @@ -46,6 +55,8 @@ func createToken(c *cli.Context) error { return fmt.Errorf("api-key and api-secret are required") } p := c.String("participant") // required only for join + room := c.String("room") + metadata := c.String("metadata") grant := &auth.VideoGrant{} if c.Bool("create") { @@ -53,21 +64,31 @@ func createToken(c *cli.Context) error { } if c.Bool("join") { grant.RoomJoin = true - - if room := c.String("room"); room != "" { - grant.Room = room - } - + grant.Room = room if p == "" { return fmt.Errorf("participant name is required") } } - - if !grant.RoomJoin && !grant.RoomCreate { - return fmt.Errorf("one of --join or --create is required") + if c.Bool("admin") { + grant.RoomAdmin = true + grant.Room = room } - token, err := accessToken(c, grant, p) + if !grant.RoomJoin && !grant.RoomCreate && !grant.RoomAdmin { + return fmt.Errorf("one of --join, --create, or --admin is required") + } + + at := accessToken(c, grant, p) + + if metadata != "" { + var md map[string]interface{} + if err := json.Unmarshal([]byte(metadata), &md); err != nil { + return err + } + at.SetMetadata(md) + } + + token, err := at.ToJWT() if err != nil { return err } diff --git a/cmd/cli/commands/utils.go b/cmd/cli/commands/utils.go index 3e6b04d95..e0ab185bf 100644 --- a/cmd/cli/commands/utils.go +++ b/cmd/cli/commands/utils.go @@ -62,12 +62,12 @@ func ExpandUser(p string) string { return p } -func accessToken(c *cli.Context, grant *auth.VideoGrant, identity string) (value string, err error) { +func accessToken(c *cli.Context, grant *auth.VideoGrant, identity string) *auth.AccessToken { apiKey := c.String("api-key") apiSecret := c.String("api-secret") if apiKey == "" && apiSecret == "" { // not provided, don't sign request - return + return nil } isDev := c.Bool("dev") @@ -79,5 +79,5 @@ func accessToken(c *cli.Context, grant *auth.VideoGrant, identity string) (value fmt.Println("creating dev token") at.SetValidFor(time.Hour * 24 * 30) } - return at.ToJWT() + return at } diff --git a/pkg/auth/accesstoken.go b/pkg/auth/accesstoken.go index 22d99a7ef..15c8f7b70 100644 --- a/pkg/auth/accesstoken.go +++ b/pkg/auth/accesstoken.go @@ -13,11 +13,12 @@ const ( // Signer that produces token signed with API key and secret type AccessToken struct { - apiKey string - secret string - identity string - grant *VideoGrant - validFor time.Duration + apiKey string + secret string + identity string + videoGrant *VideoGrant + metadata map[string]interface{} + validFor time.Duration } func NewAccessToken(key string, secret string) *AccessToken { @@ -38,7 +39,12 @@ func (t *AccessToken) SetValidFor(duration time.Duration) *AccessToken { } func (t *AccessToken) AddGrant(grant *VideoGrant) *AccessToken { - t.grant = grant + t.videoGrant = grant + return t +} + +func (t *AccessToken) SetMetadata(md map[string]interface{}) *AccessToken { + t.metadata = md return t } @@ -65,8 +71,11 @@ func (t *AccessToken) ToJWT() (string, error) { ID: t.identity, } grants := &ClaimGrants{} - if t.grant != nil { - grants.Video = t.grant + if t.videoGrant != nil { + grants.Video = t.videoGrant + } + if t.metadata != nil { + grants.Metadata = t.metadata } return jwt.Signed(sig).Claims(cl).Claims(grants).CompactSerialize() } diff --git a/pkg/auth/accesstoken_test.go b/pkg/auth/accesstoken_test.go index f242efac1..8c9867141 100644 --- a/pkg/auth/accesstoken_test.go +++ b/pkg/auth/accesstoken_test.go @@ -12,7 +12,9 @@ import ( "github.com/livekit/livekit-server/pkg/utils" ) -func TestAPIIssuer(t *testing.T) { +func TestAccessToken(t *testing.T) { + t.Parallel() + t.Run("keys must be set", func(t *testing.T) { token := auth.NewAccessToken("", "") _, err := token.ToJWT() diff --git a/pkg/auth/grants.go b/pkg/auth/grants.go index 22b1c5a7a..f70acc368 100644 --- a/pkg/auth/grants.go +++ b/pkg/auth/grants.go @@ -12,6 +12,7 @@ type VideoGrant struct { } type ClaimGrants struct { - Identity string `json:"-"` - Video *VideoGrant `json:"video,omitempty"` + Identity string `json:"-"` + Video *VideoGrant `json:"video,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` } diff --git a/pkg/auth/verifier_test.go b/pkg/auth/verifier_test.go index ca0d3caca..8d8d74f0c 100644 --- a/pkg/auth/verifier_test.go +++ b/pkg/auth/verifier_test.go @@ -9,7 +9,7 @@ import ( "github.com/livekit/livekit-server/pkg/auth" ) -func TestAPIVerifier(t *testing.T) { +func TestVerifier(t *testing.T) { apiKey := "APID3B67uxk4Nj2GKiRPibAZ9" secret := "YHC-CUhbQhGeVCaYgn1BNA++" accessToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2MDg5MzAzMDgsImlzcyI6IkFQSUQzQjY3dXhrNE5qMkdLaVJQaWJBWjkiLCJuYmYiOjE2MDg5MjY3MDgsInJvb21fam9pbiI6dHJ1ZSwicm9vbV9zaWQiOiJteWlkIiwic3ViIjoiQVBJRDNCNjd1eGs0TmoyR0tpUlBpYkFaOSJ9.cmHEBq0MLyRqphmVLM2cLXg5ao5Sro7am8yXhcYKcwE" @@ -50,4 +50,28 @@ func TestAPIVerifier(t *testing.T) { assert.NoError(t, err) assert.Equal(t, &claim, decoded.Video) }) + + t.Run("ensure metadata can be passed through", func(t *testing.T) { + metadata := map[string]interface{}{ + "user": "value", + "number": float64(3), + } + at := auth.NewAccessToken(apiKey, secret). + AddGrant(&auth.VideoGrant{ + RoomAdmin: true, + Room: "myroom", + }). + SetMetadata(metadata) + + authToken, err := at.ToJWT() + assert.NoError(t, err) + + v, err := auth.ParseAPIToken(authToken) + assert.NoError(t, err) + + decoded, err := v.Verify(secret) + assert.NoError(t, err) + + assert.EqualValues(t, metadata, decoded.Metadata) + }) } diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 5c78ad541..e709724f4 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -23,7 +23,7 @@ type MessageSource interface { ReadChan() <-chan proto.Message } -type NewParticipantCallback func(roomName, identity string, reconnect bool, requestSource MessageSource, responseSink MessageSink) +type NewParticipantCallback func(roomName, identity, metadata string, reconnect bool, requestSource MessageSource, responseSink MessageSink) type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMessage) // Router allows multiple nodes to coordinate the participant session @@ -39,7 +39,7 @@ type Router interface { ListNodes() ([]*livekit.Node, error) // participant signal connection is ready to start - StartParticipantSignal(roomName, identity string, reconnect bool) (reqSink MessageSink, resSource MessageSource, err error) + StartParticipantSignal(roomName, identity, metadata string, reconnect bool) (reqSink MessageSink, resSource MessageSource, err error) // sends a message to RTC node SendRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) error diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index fc3fcf3cf..a4bc05782 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -65,7 +65,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) { }, nil } -func (r *LocalRouter) StartParticipantSignal(roomName, identity string, reconnect bool) (reqSink MessageSink, resSource MessageSource, err error) { +func (r *LocalRouter) StartParticipantSignal(roomName, identity, metadata string, reconnect bool) (reqSink MessageSink, resSource MessageSource, err error) { // treat it as a new participant connecting if r.onNewParticipant == nil { return nil, nil, ErrHandlerNotDefined @@ -79,6 +79,7 @@ func (r *LocalRouter) StartParticipantSignal(roomName, identity string, reconnec r.onNewParticipant( roomName, identity, + metadata, reconnect, // request source reqChan, diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index ee87c2adb..252407380 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -127,7 +127,7 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) { } // signal connection sets up paths to the RTC node, and starts to route messages to that message queue -func (r *RedisRouter) StartParticipantSignal(roomName, identity string, reconnect bool) (reqSink MessageSink, resSource MessageSource, err error) { +func (r *RedisRouter) StartParticipantSignal(roomName, identity, metadata string, reconnect bool) (reqSink MessageSink, resSource MessageSource, err error) { // find the node where the room is hosted at rtcNode, err := r.GetNodeForRoom(roomName) if err != nil { @@ -149,6 +149,7 @@ func (r *RedisRouter) StartParticipantSignal(roomName, identity string, reconnec err = sink.WriteMessage(&livekit.StartSession{ RoomName: roomName, Identity: identity, + Metadata: metadata, // connection id is to allow the RTC node to identify where to route the message back to ConnectionId: connectionId, Reconnect: reconnect, @@ -219,6 +220,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK r.onNewParticipant( ss.RoomName, ss.Identity, + ss.Metadata, ss.Reconnect, reqChan, resSink, diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index 61aa0451c..f74ef99ce 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -123,12 +123,13 @@ type FakeRouter struct { startReturnsOnCall map[int]struct { result1 error } - StartParticipantSignalStub func(string, string, bool) (routing.MessageSink, routing.MessageSource, error) + StartParticipantSignalStub func(string, string, string, bool) (routing.MessageSink, routing.MessageSource, error) startParticipantSignalMutex sync.RWMutex startParticipantSignalArgsForCall []struct { arg1 string arg2 string - arg3 bool + arg3 string + arg4 bool } startParticipantSignalReturns struct { result1 routing.MessageSink @@ -751,20 +752,21 @@ func (fake *FakeRouter) StartReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) StartParticipantSignal(arg1 string, arg2 string, arg3 bool) (routing.MessageSink, routing.MessageSource, error) { +func (fake *FakeRouter) StartParticipantSignal(arg1 string, arg2 string, arg3 string, arg4 bool) (routing.MessageSink, routing.MessageSource, error) { fake.startParticipantSignalMutex.Lock() ret, specificReturn := fake.startParticipantSignalReturnsOnCall[len(fake.startParticipantSignalArgsForCall)] fake.startParticipantSignalArgsForCall = append(fake.startParticipantSignalArgsForCall, struct { arg1 string arg2 string - arg3 bool - }{arg1, arg2, arg3}) + arg3 string + arg4 bool + }{arg1, arg2, arg3, arg4}) stub := fake.StartParticipantSignalStub fakeReturns := fake.startParticipantSignalReturns - fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2, arg3}) + fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2, arg3, arg4}) fake.startParticipantSignalMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3) + return stub(arg1, arg2, arg3, arg4) } if specificReturn { return ret.result1, ret.result2, ret.result3 @@ -778,17 +780,17 @@ func (fake *FakeRouter) StartParticipantSignalCallCount() int { return len(fake.startParticipantSignalArgsForCall) } -func (fake *FakeRouter) StartParticipantSignalCalls(stub func(string, string, bool) (routing.MessageSink, routing.MessageSource, error)) { +func (fake *FakeRouter) StartParticipantSignalCalls(stub func(string, string, string, bool) (routing.MessageSink, routing.MessageSource, error)) { fake.startParticipantSignalMutex.Lock() defer fake.startParticipantSignalMutex.Unlock() fake.StartParticipantSignalStub = stub } -func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (string, string, bool) { +func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (string, string, string, bool) { fake.startParticipantSignalMutex.RLock() defer fake.startParticipantSignalMutex.RUnlock() argsForCall := fake.startParticipantSignalArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } func (fake *FakeRouter) StartParticipantSignalReturns(result1 routing.MessageSink, result2 routing.MessageSource, result3 error) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 0afc5b6a0..9592c05b8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1,6 +1,7 @@ package rtc import ( + "encoding/json" "io" "sync" "sync/atomic" @@ -39,8 +40,10 @@ type ParticipantImpl struct { isClosed utils.AtomicFlag mediaEngine *webrtc.MediaEngine identity string - state atomic.Value // livekit.ParticipantInfo_State - rtcpCh chan []rtcp.Packet + // JSON encoded metadata to pass to clients + metadata string + state atomic.Value // livekit.ParticipantInfo_State + rtcpCh chan []rtcp.Packet // tracks the current participant is subscribed to, map of otherParticipantId => []DownTrack subscribedTracks map[string][]types.SubscribedTrack // publishedTracks that participant is publishing @@ -162,6 +165,22 @@ func (p *ParticipantImpl) IsReady() bool { return state == livekit.ParticipantInfo_JOINED || state == livekit.ParticipantInfo_ACTIVE } +// attach metadata to the participant +func (p *ParticipantImpl) SetMetadata(metadata map[string]interface{}) error { + if metadata == nil { + p.metadata = "" + return nil + } + + if data, err := json.Marshal(metadata); err != nil { + return err + } else { + p.metadata = string(data) + } + + return nil +} + func (p *ParticipantImpl) RTCPChan() chan []rtcp.Packet { return p.rtcpCh } @@ -170,6 +189,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { info := &livekit.ParticipantInfo{ Sid: p.id, Identity: p.identity, + Metadata: p.metadata, State: p.State(), } @@ -433,6 +453,7 @@ func (p *ParticipantImpl) SendParticipantUpdate(participants []*livekit.Particip if !p.IsReady() { return nil } + return p.responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Update{ Update: &livekit.ParticipantUpdate{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 4f5ae63f6..35d07e213 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -181,7 +181,7 @@ func (r *Room) RemoveParticipant(identity string) { if r.onParticipantChanged != nil { r.onParticipantChanged(p) } - go r.broadcastParticipantState(p) + r.broadcastParticipantState(p) } } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 382a06d70..c662e9b4a 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -9,6 +9,7 @@ import ( "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" "github.com/livekit/livekit-server/proto/livekit" ) @@ -93,15 +94,20 @@ func TestRoomJoin(t *testing.T) { t.Run("participant state change is broadcasted to others", func(t *testing.T) { rm := newRoomWithParticipants(t, numParticipants) + var changedParticipant types.Participant + rm.OnParticipantChanged(func(participant types.Participant) { + changedParticipant = participant + }) participants := rm.GetParticipants() p := participants[0].(*typesfakes.FakeParticipant) disconnectedParticipant := participants[1].(*typesfakes.FakeParticipant) disconnectedParticipant.StateReturns(livekit.ParticipantInfo_DISCONNECTED) rm.RemoveParticipant(p.Identity()) - p.OnStateChangeArgsForCall(0)(p, livekit.ParticipantInfo_ACTIVE) time.Sleep(defaultDelay) + assert.Equal(t, p, changedParticipant) + numUpdates := 0 for _, op := range participants { if op == p || op == disconnectedParticipant { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 78ac021b3..37dc5aa5f 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -52,6 +52,7 @@ type Participant interface { IsReady() bool ToProto() *livekit.ParticipantInfo RTCPChan() chan []rtcp.Packet + SetMetadata(metadata map[string]interface{}) error GetResponseSink() routing.MessageSink SetResponseSink(sink routing.MessageSink) diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index acedc6451..7579f7548 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -204,6 +204,17 @@ type FakeParticipant struct { sendParticipantUpdateReturnsOnCall map[int]struct { result1 error } + SetMetadataStub func(map[string]interface{}) error + setMetadataMutex sync.RWMutex + setMetadataArgsForCall []struct { + arg1 map[string]interface{} + } + setMetadataReturns struct { + result1 error + } + setMetadataReturnsOnCall map[int]struct { + result1 error + } SetResponseSinkStub func(routing.MessageSink) setResponseSinkMutex sync.RWMutex setResponseSinkArgsForCall []struct { @@ -1310,6 +1321,67 @@ func (fake *FakeParticipant) SendParticipantUpdateReturnsOnCall(i int, result1 e }{result1} } +func (fake *FakeParticipant) SetMetadata(arg1 map[string]interface{}) error { + fake.setMetadataMutex.Lock() + ret, specificReturn := fake.setMetadataReturnsOnCall[len(fake.setMetadataArgsForCall)] + fake.setMetadataArgsForCall = append(fake.setMetadataArgsForCall, struct { + arg1 map[string]interface{} + }{arg1}) + stub := fake.SetMetadataStub + fakeReturns := fake.setMetadataReturns + fake.recordInvocation("SetMetadata", []interface{}{arg1}) + fake.setMetadataMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) SetMetadataCallCount() int { + fake.setMetadataMutex.RLock() + defer fake.setMetadataMutex.RUnlock() + return len(fake.setMetadataArgsForCall) +} + +func (fake *FakeParticipant) SetMetadataCalls(stub func(map[string]interface{}) error) { + fake.setMetadataMutex.Lock() + defer fake.setMetadataMutex.Unlock() + fake.SetMetadataStub = stub +} + +func (fake *FakeParticipant) SetMetadataArgsForCall(i int) map[string]interface{} { + fake.setMetadataMutex.RLock() + defer fake.setMetadataMutex.RUnlock() + argsForCall := fake.setMetadataArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeParticipant) SetMetadataReturns(result1 error) { + fake.setMetadataMutex.Lock() + defer fake.setMetadataMutex.Unlock() + fake.SetMetadataStub = nil + fake.setMetadataReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeParticipant) SetMetadataReturnsOnCall(i int, result1 error) { + fake.setMetadataMutex.Lock() + defer fake.setMetadataMutex.Unlock() + fake.SetMetadataStub = nil + if fake.setMetadataReturnsOnCall == nil { + fake.setMetadataReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.setMetadataReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeParticipant) SetResponseSink(arg1 routing.MessageSink) { fake.setResponseSinkMutex.Lock() fake.setResponseSinkArgsForCall = append(fake.setResponseSinkArgsForCall, struct { @@ -1554,6 +1626,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.sendJoinResponseMutex.RUnlock() fake.sendParticipantUpdateMutex.RLock() defer fake.sendParticipantUpdateMutex.RUnlock() + fake.setMetadataMutex.RLock() + defer fake.setMetadataMutex.RUnlock() fake.setResponseSinkMutex.RLock() defer fake.setResponseSinkMutex.RUnlock() fake.setTrackMutedMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 9910eb4c3..d32348e59 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -1,6 +1,7 @@ package service import ( + "encoding/json" "sync" "time" @@ -138,7 +139,7 @@ func (r *RoomManager) CloseIdleRooms() { } // starts WebRTC session when a new participant is connected, takes place on RTC node -func (r *RoomManager) StartSession(roomName, identity string, reconnect bool, requestSource routing.MessageSource, responseSink routing.MessageSink) { +func (r *RoomManager) StartSession(roomName, identity, metadata string, reconnect bool, requestSource routing.MessageSource, responseSink routing.MessageSink) { room, err := r.getOrCreateRoom(roomName) if err != nil { logger.Errorw("could not create room", "error", err) @@ -161,6 +162,7 @@ func (r *RoomManager) StartSession(roomName, identity string, reconnect bool, re prevSink.Close() } participant.SetResponseSink(responseSink) + return } else { // we need to clean up the existing participant, so a new one can join @@ -186,6 +188,12 @@ func (r *RoomManager) StartSession(roomName, identity string, reconnect bool, re logger.Errorw("could not create participant", "error", err) return } + if metadata != "" { + var md map[string]interface{} + if err := json.Unmarshal([]byte(metadata), &md); err == nil { + participant.SetMetadata(md) + } + } // join room if err := room.Join(participant); err != nil { diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 898635397..1e0f927fd 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -1,6 +1,7 @@ package service import ( + "encoding/json" "fmt" "io" "net/http" @@ -73,8 +74,17 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + var metadata string + if claims.Metadata != nil { + if data, err := json.Marshal(claims.Metadata); err != nil { + logger.Warnw("unable to encode metadata", "error", err) + } else { + metadata = string(data) + } + } + // this needs to be started first *before* using router functions on this node - reqSink, resSource, err := s.router.StartParticipantSignal(roomName, identity, isReconnect) + reqSink, resSource, err := s.router.StartParticipantSignal(roomName, identity, metadata, isReconnect) if err != nil { handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error()) return diff --git a/proto/internal.proto b/proto/internal.proto index 2c6385d7e..0be6d0dae 100644 --- a/proto/internal.proto +++ b/proto/internal.proto @@ -52,6 +52,8 @@ message StartSession { string connection_id = 3; // if a client is reconnecting (i.e. resume instead of restart) bool reconnect = 4; + // metadata to pass to participant + string metadata = 5; } message EndSession { diff --git a/proto/livekit/internal.pb.go b/proto/livekit/internal.pb.go index 61c956365..b799b941d 100644 --- a/proto/livekit/internal.pb.go +++ b/proto/livekit/internal.pb.go @@ -402,6 +402,8 @@ type StartSession struct { ConnectionId string `protobuf:"bytes,3,opt,name=connection_id,json=connectionId,proto3" json:"connection_id,omitempty"` // if a client is reconnecting (i.e. resume instead of restart) Reconnect bool `protobuf:"varint,4,opt,name=reconnect,proto3" json:"reconnect,omitempty"` + // metadata to pass to participant + Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` } func (x *StartSession) Reset() { @@ -464,6 +466,13 @@ func (x *StartSession) GetReconnect() bool { return false } +func (x *StartSession) GetMetadata() string { + if x != nil { + return x.Metadata + } + return "" +} + type EndSession struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -607,7 +616,7 @@ var file_internal_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0a, 0x65, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x22, 0x8a, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, + 0x67, 0x65, 0x22, 0xa6, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x6d, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x6d, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, @@ -615,16 +624,17 @@ var file_internal_proto_rawDesc = []byte{ 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x22, - 0x0c, 0x0a, 0x0a, 0x45, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x0a, - 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, - 0x6e, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, - 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, - 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, + 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x0c, 0x0a, 0x0a, 0x45, + 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x0a, 0x11, 0x52, 0x65, 0x6d, + 0x6f, 0x76, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x12, 0x25, + 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, + 0x61, 0x6e, 0x74, 0x49, 0x64, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, + 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/livekit/model.pb.go b/proto/livekit/model.pb.go index 1c457d397..2916bbe98 100644 --- a/proto/livekit/model.pb.go +++ b/proto/livekit/model.pb.go @@ -218,6 +218,7 @@ type ParticipantInfo struct { Identity string `protobuf:"bytes,2,opt,name=identity,proto3" json:"identity,omitempty"` State ParticipantInfo_State `protobuf:"varint,3,opt,name=state,proto3,enum=livekit.ParticipantInfo_State" json:"state,omitempty"` Tracks []*TrackInfo `protobuf:"bytes,4,rep,name=tracks,proto3" json:"tracks,omitempty"` + Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` } func (x *ParticipantInfo) Reset() { @@ -280,6 +281,13 @@ func (x *ParticipantInfo) GetTracks() []*TrackInfo { return nil } +func (x *ParticipantInfo) GetMetadata() string { + if x != nil { + return x.Metadata + } + return "" +} + type TrackInfo struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -445,7 +453,7 @@ var file_model_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x6d, 0x61, 0x78, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x72, - 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xe1, 0x01, 0x0a, 0x0f, 0x50, + 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xfd, 0x01, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, @@ -455,29 +463,31 @@ var file_model_proto_rawDesc = []byte{ 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x72, 0x61, - 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x73, 0x22, 0x3e, - 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x49, 0x4e, 0x49, - 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4a, 0x4f, 0x49, 0x4e, 0x45, 0x44, 0x10, 0x01, - 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, - 0x44, 0x49, 0x53, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x03, 0x22, 0x6f, - 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x26, 0x0a, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x6c, 0x69, - 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x52, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x75, 0x74, - 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x6d, 0x75, 0x74, 0x65, 0x64, 0x22, - 0x46, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, - 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, - 0x74, 0x65, 0x78, 0x74, 0x12, 0x18, 0x0a, 0x06, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x42, 0x07, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x2a, 0x2b, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x55, 0x44, 0x49, 0x4f, 0x10, 0x00, 0x12, - 0x09, 0x0a, 0x05, 0x56, 0x49, 0x44, 0x45, 0x4f, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x41, - 0x54, 0x41, 0x10, 0x02, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, - 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x73, 0x12, 0x1a, + 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x3e, 0x0a, 0x05, 0x53, 0x74, + 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x49, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x00, + 0x12, 0x0a, 0x0a, 0x06, 0x4a, 0x4f, 0x49, 0x4e, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, + 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x44, 0x49, 0x53, 0x43, + 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x03, 0x22, 0x6f, 0x0a, 0x09, 0x54, 0x72, + 0x61, 0x63, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x04, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, + 0x74, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, + 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x75, 0x74, 0x65, 0x64, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x6d, 0x75, 0x74, 0x65, 0x64, 0x22, 0x46, 0x0a, 0x0b, 0x44, + 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x04, 0x74, 0x65, + 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, + 0x12, 0x18, 0x0a, 0x06, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x48, 0x00, 0x52, 0x06, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x42, 0x07, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x2a, 0x2b, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x09, 0x0a, 0x05, 0x41, 0x55, 0x44, 0x49, 0x4f, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, + 0x49, 0x44, 0x45, 0x4f, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x41, 0x54, 0x41, 0x10, 0x02, + 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, + 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/model.proto b/proto/model.proto index 922983d86..e21a5e8bb 100644 --- a/proto/model.proto +++ b/proto/model.proto @@ -26,6 +26,7 @@ message ParticipantInfo { string identity = 2; State state = 3; repeated TrackInfo tracks = 4; + string metadata = 5; } enum TrackType {