Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-10-23 12:00:54 +05:30
18 changed files with 402 additions and 144 deletions
+4 -4
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16
github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -70,14 +70,14 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mdlayher/netlink v1.7.1 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
github.com/nats-io/nats.go v1.30.2 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
@@ -101,7 +101,7 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+8 -12
View File
@@ -107,8 +107,8 @@ github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa79
github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs=
github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw=
github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 h1:0Ca5hQP9+DaRjrr5d9msNhfCL+CBvcxqLdh4xOXAZeU=
github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y=
github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca h1:M3gymSJFriayMERPin3BSGgYTUpLVfGaMgmqp+3JV6I=
github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -158,13 +158,10 @@ github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/
github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs=
github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw=
github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -403,7 +400,6 @@ golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
@@ -416,8 +412,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+15
View File
@@ -71,6 +71,7 @@ type Config struct {
Keys map[string]string `yaml:"keys,omitempty"`
Region string `yaml:"region,omitempty"`
SignalRelay SignalRelayConfig `yaml:"signal_relay,omitempty"`
PSRPC PSRPCConfig `yaml:"psrpc,omitempty"`
// LogLevel is deprecated
LogLevel string `yaml:"log_level,omitempty"`
Logging LoggingConfig `yaml:"logging,omitempty"`
@@ -269,6 +270,14 @@ type SignalRelayConfig struct {
StreamBufferSize int `yaml:"stream_buffer_size,omitempty"`
}
type PSRPCConfig struct {
Enable bool `yaml:"enable,omitempty"`
MaxAttempts int `yaml:"retry_attempts,omitempty"`
Timeout time.Duration `yaml:"retry_timeout,omitempty"`
Backoff time.Duration `yaml:"retry_backoff,omitempty"`
BufferSize int `yaml:"stream_buffer_size,omitempty"`
}
// RegionConfig lists available regions and their latitude/longitude, so the selector would prefer
// regions that are closer
type RegionConfig struct {
@@ -484,6 +493,12 @@ var DefaultConfig = Config{
MaxRetryInterval: 4 * time.Second,
StreamBufferSize: 1000,
},
PSRPC: PSRPCConfig{
MaxAttempts: 3,
Timeout: 500 * time.Millisecond,
Backoff: 500 * time.Millisecond,
BufferSize: 1000,
},
Keys: map[string]string{},
}
+27
View File
@@ -0,0 +1,27 @@
package routing
import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protopsrpc "github.com/livekit/protocol/psrpc"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/livekit/psrpc/pkg/middleware"
)
func NewRoomClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.PSRPCConfig) (rpc.TypedRoomClient, error) {
return rpc.NewTypedRoomClient(
nodeID,
bus,
protopsrpc.WithClientLogger(logger.GetLogger()),
middleware.WithClientMetrics(prometheus.PSRPCMetricsObserver{}),
psrpc.WithClientChannelSize(config.BufferSize),
middleware.WithRPCRetries(middleware.RetryOptions{
MaxAttempts: config.MaxAttempts,
Timeout: config.Timeout,
Backoff: config.Backoff,
}),
)
}
+22
View File
@@ -0,0 +1,22 @@
package routing
import (
"context"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)
type topicFormatter struct{}
func NewTopicFormatter() rpc.TopicFormatter {
return topicFormatter{}
}
func (f topicFormatter) ParticipantTopic(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) rpc.ParticipantTopic {
return rpc.FormatParticipantTopic(roomName, identity)
}
func (f topicFormatter) RoomTopic(ctx context.Context, roomName livekit.RoomName) rpc.RoomTopic {
return rpc.FormatRoomTopic(roomName)
}
+1
View File
@@ -27,6 +27,7 @@ var (
ErrEmptyIdentity = errors.New("participant identity cannot be empty")
ErrEmptyParticipantID = errors.New("participant ID cannot be empty")
ErrMissingGrants = errors.New("VideoGrant is missing")
ErrInternalError = errors.New("internal error")
// Track subscription related
ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track")
+2 -1
View File
@@ -64,8 +64,9 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
}
}
p.SetTrackMutedCalls(func(sid livekit.TrackID, muted bool, fromServer bool) {
p.SetTrackMutedCalls(func(sid livekit.TrackID, muted bool, fromServer bool) *livekit.TrackInfo {
updateTrack()
return nil
})
p.AddTrackCalls(func(req *livekit.AddTrackRequest) {
updateTrack()
+5 -3
View File
@@ -1670,16 +1670,16 @@ func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo)
})
}
func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) {
func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo {
// when request is coming from admin, send message to current participant
if fromAdmin {
p.sendTrackMuted(trackID, muted)
}
p.setTrackMuted(trackID, muted)
return p.setTrackMuted(trackID, muted)
}
func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) {
func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) *livekit.TrackInfo {
p.dirty.Store(true)
p.supervisor.SetPublicationMute(trackID, muted)
@@ -1713,6 +1713,8 @@ func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) {
if !isPending && track == nil {
p.pubLogger.Warnw("could not locate track", nil, "trackID", trackID)
}
return trackInfo
}
func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) (*MediaTrack, bool) {
+5 -2
View File
@@ -87,6 +87,7 @@ const (
ParticipantCloseReasonVerifyFailed
ParticipantCloseReasonJoinFailed
ParticipantCloseReasonJoinTimeout
ParticipantCloseReasonMessageBusFailed
ParticipantCloseReasonStateDisconnected
ParticipantCloseReasonPeerConnectionDisconnected
ParticipantCloseReasonDuplicateIdentity
@@ -119,6 +120,8 @@ func (p ParticipantCloseReason) String() string {
return "JOIN_FAILED"
case ParticipantCloseReasonJoinTimeout:
return "JOIN_TIMEOUT"
case ParticipantCloseReasonMessageBusFailed:
return "MESSAGE_BUS_FAILED"
case ParticipantCloseReasonStateDisconnected:
return "STATE_DISCONNECTED"
case ParticipantCloseReasonPeerConnectionDisconnected:
@@ -162,7 +165,7 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason {
return livekit.DisconnectReason_CLIENT_INITIATED
case ParticipantCloseReasonRoomManagerStop:
return livekit.DisconnectReason_SERVER_SHUTDOWN
case ParticipantCloseReasonVerifyFailed, ParticipantCloseReasonJoinFailed, ParticipantCloseReasonJoinTimeout:
case ParticipantCloseReasonVerifyFailed, ParticipantCloseReasonJoinFailed, ParticipantCloseReasonJoinTimeout, ParticipantCloseReasonMessageBusFailed:
// expected to be connected but is not
return livekit.DisconnectReason_JOIN_FAILURE
case ParticipantCloseReasonPeerConnectionDisconnected:
@@ -335,7 +338,7 @@ type LocalParticipant interface {
AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget)
HandleOffer(sdp webrtc.SessionDescription)
AddTrack(req *livekit.AddTrackRequest)
SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool)
SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo
HandleAnswer(sdp webrtc.SessionDescription)
Negotiate(force bool)
@@ -738,13 +738,19 @@ type FakeLocalParticipant struct {
setSubscriberChannelCapacityArgsForCall []struct {
arg1 int64
}
SetTrackMutedStub func(livekit.TrackID, bool, bool)
SetTrackMutedStub func(livekit.TrackID, bool, bool) *livekit.TrackInfo
setTrackMutedMutex sync.RWMutex
setTrackMutedArgsForCall []struct {
arg1 livekit.TrackID
arg2 bool
arg3 bool
}
setTrackMutedReturns struct {
result1 *livekit.TrackInfo
}
setTrackMutedReturnsOnCall map[int]struct {
result1 *livekit.TrackInfo
}
StartStub func()
startMutex sync.RWMutex
startArgsForCall []struct {
@@ -4877,19 +4883,25 @@ func (fake *FakeLocalParticipant) SetSubscriberChannelCapacityArgsForCall(i int)
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) {
func (fake *FakeLocalParticipant) SetTrackMuted(arg1 livekit.TrackID, arg2 bool, arg3 bool) *livekit.TrackInfo {
fake.setTrackMutedMutex.Lock()
ret, specificReturn := fake.setTrackMutedReturnsOnCall[len(fake.setTrackMutedArgsForCall)]
fake.setTrackMutedArgsForCall = append(fake.setTrackMutedArgsForCall, struct {
arg1 livekit.TrackID
arg2 bool
arg3 bool
}{arg1, arg2, arg3})
stub := fake.SetTrackMutedStub
fakeReturns := fake.setTrackMutedReturns
fake.recordInvocation("SetTrackMuted", []interface{}{arg1, arg2, arg3})
fake.setTrackMutedMutex.Unlock()
if stub != nil {
fake.SetTrackMutedStub(arg1, arg2, arg3)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) SetTrackMutedCallCount() int {
@@ -4898,7 +4910,7 @@ func (fake *FakeLocalParticipant) SetTrackMutedCallCount() int {
return len(fake.setTrackMutedArgsForCall)
}
func (fake *FakeLocalParticipant) SetTrackMutedCalls(stub func(livekit.TrackID, bool, bool)) {
func (fake *FakeLocalParticipant) SetTrackMutedCalls(stub func(livekit.TrackID, bool, bool) *livekit.TrackInfo) {
fake.setTrackMutedMutex.Lock()
defer fake.setTrackMutedMutex.Unlock()
fake.SetTrackMutedStub = stub
@@ -4911,6 +4923,29 @@ func (fake *FakeLocalParticipant) SetTrackMutedArgsForCall(i int) (livekit.Track
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) SetTrackMutedReturns(result1 *livekit.TrackInfo) {
fake.setTrackMutedMutex.Lock()
defer fake.setTrackMutedMutex.Unlock()
fake.SetTrackMutedStub = nil
fake.setTrackMutedReturns = struct {
result1 *livekit.TrackInfo
}{result1}
}
func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *livekit.TrackInfo) {
fake.setTrackMutedMutex.Lock()
defer fake.setTrackMutedMutex.Unlock()
fake.SetTrackMutedStub = nil
if fake.setTrackMutedReturnsOnCall == nil {
fake.setTrackMutedReturnsOnCall = make(map[int]struct {
result1 *livekit.TrackInfo
})
}
fake.setTrackMutedReturnsOnCall[i] = struct {
result1 *livekit.TrackInfo
}{result1}
}
func (fake *FakeLocalParticipant) Start() {
fake.startMutex.Lock()
fake.startArgsForCall = append(fake.startArgsForCall, struct {
+15 -14
View File
@@ -19,18 +19,19 @@ import (
)
var (
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress does not exist")
ErrEgressNotConnected = psrpc.NewErrorf(psrpc.Internal, "egress not connected (redis required)")
ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty")
ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)")
ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist")
ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified")
ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits")
ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed")
ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist")
ErrRoomNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested room does not exist")
ErrRoomLockFailed = psrpc.NewErrorf(psrpc.Internal, "could not lock room")
ErrRoomUnlockFailed = psrpc.NewErrorf(psrpc.Internal, "could not unlock room, lock token does not match")
ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found")
ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress does not exist")
ErrEgressNotConnected = psrpc.NewErrorf(psrpc.Internal, "egress not connected (redis required)")
ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty")
ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)")
ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist")
ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified")
ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits")
ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed")
ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist")
ErrRoomNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested room does not exist")
ErrRoomLockFailed = psrpc.NewErrorf(psrpc.Internal, "could not lock room")
ErrRoomUnlockFailed = psrpc.NewErrorf(psrpc.Internal, "could not unlock room, lock token does not match")
ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled")
ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found")
ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks")
)
+1 -1
View File
@@ -70,7 +70,7 @@ type RedisStore struct {
func NewRedisStore(rc redis.UniversalClient) *RedisStore {
unlockScript := `if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else return 0
else return 0
end`
return &RedisStore{
+166 -91
View File
@@ -22,6 +22,7 @@ import (
"time"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/version"
@@ -29,7 +30,9 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/clientconfiguration"
"github.com/livekit/livekit-server/pkg/config"
@@ -67,6 +70,7 @@ type RoomManager struct {
egressLauncher rtc.EgressLauncher
versionGenerator utils.TimedVersionGenerator
turnAuthHandler *TURNAuthHandler
roomServer rpc.TypedRoomServer
rooms map[livekit.RoomName]*rtc.Room
@@ -83,6 +87,7 @@ func NewLocalRoomManager(
egressLauncher rtc.EgressLauncher,
versionGenerator utils.TimedVersionGenerator,
turnAuthHandler *TURNAuthHandler,
bus psrpc.MessageBus,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf)
if err != nil {
@@ -114,6 +119,11 @@ func NewLocalRoomManager(
},
}
r.roomServer, err = rpc.NewTypedRoomServer(livekit.NodeID(r.currentNode.Id), r, bus)
if err != nil {
return nil, err
}
// hook up to router
router.OnNewParticipantRTC(r.StartSession)
router.OnRTCMessage(r.handleRTCMessage)
@@ -126,8 +136,8 @@ func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc
return r.rooms[roomName]
}
// DeleteRoom completely deletes all room information, including active sessions, room store, and routing info
func (r *RoomManager) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error {
// deleteRoom completely deletes all room information, including active sessions, room store, and routing info
func (r *RoomManager) deleteRoom(ctx context.Context, roomName livekit.RoomName) error {
logger.Infow("deleting room state", "room", roomName)
r.lock.Lock()
delete(r.rooms, roomName)
@@ -167,7 +177,7 @@ func (r *RoomManager) CleanupRooms() error {
now := time.Now().Unix()
for _, room := range rooms {
if (now - room.CreationTime) > roomPurgeSeconds {
if err := r.DeleteRoom(ctx, livekit.RoomName(room.Name)); err != nil {
if err := r.deleteRoom(ctx, livekit.RoomName(room.Name)); err != nil {
return err
}
}
@@ -177,10 +187,7 @@ func (r *RoomManager) CleanupRooms() error {
func (r *RoomManager) CloseIdleRooms() {
r.lock.RLock()
rooms := make([]*rtc.Room, 0, len(r.rooms))
for _, rm := range r.rooms {
rooms = append(rooms, rm)
}
rooms := maps.Values(r.rooms)
r.lock.RUnlock()
for _, room := range rooms {
@@ -203,10 +210,7 @@ func (r *RoomManager) HasParticipants() bool {
func (r *RoomManager) Stop() {
// disconnect all clients
r.lock.RLock()
rooms := make([]*rtc.Room, 0, len(r.rooms))
for _, rm := range r.rooms {
rooms = append(rooms, rm)
}
rooms := maps.Values(r.rooms)
r.lock.RUnlock()
for _, room := range rooms {
@@ -216,6 +220,8 @@ func (r *RoomManager) Stop() {
room.Close()
}
r.roomServer.Kill()
if r.rtcConfig != nil {
if r.rtcConfig.UDPMux != nil {
_ = r.rtcConfig.UDPMux.Close()
@@ -426,6 +432,11 @@ func (r *RoomManager) StartSession(
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
return err
}
if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil {
pLogger.Errorw("could not join register participant topic", err)
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
return err
}
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil {
pLogger.Errorw("could not store participant", err)
}
@@ -449,6 +460,8 @@ func (r *RoomManager) StartSession(
pLogger.Errorw("could not delete participant", err)
}
r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity()))
// update room store with new numParticipants
proto := room.ToProto()
persistRoomForParticipantCount(proto)
@@ -489,6 +502,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
return nil, err
}
if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil {
return nil, err
}
r.lock.Lock()
currentRoom := r.rooms[roomName]
@@ -507,10 +524,12 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher)
newRoom.OnClose(func() {
r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName))
roomInfo := newRoom.ToProto()
r.telemetry.RoomEnded(ctx, roomInfo)
prometheus.RoomEnded(time.Unix(roomInfo.CreationTime, 0))
if err := r.DeleteRoom(ctx, roomName); err != nil {
if err := r.deleteRoom(ctx, roomName); err != nil {
newRoom.Logger.Errorw("could not delete room", err)
}
@@ -602,96 +621,152 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
// handles RTC messages resulted from Room API calls
func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) {
r.lock.RLock()
room := r.rooms[roomName]
r.lock.RUnlock()
if room == nil {
if _, ok := msg.Message.(*livekit.RTCNodeMessage_DeleteRoom); ok {
// special case of a non-RTC room e.g. room created but no participants joined
logger.Debugw("Deleting non-rtc room, loading from roomstore")
err := r.roomStore.DeleteRoom(ctx, roomName)
if err != nil {
logger.Debugw("Error deleting non-rtc room", "err", err)
}
return
} else {
logger.Warnw("Could not find room", nil, "room", roomName)
return
}
}
participant := room.GetParticipant(identity)
var sid livekit.ParticipantID
if participant != nil {
sid = participant.ID()
}
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetLogger(), roomName, room.ID()),
identity,
sid,
false,
)
switch rm := msg.Message.(type) {
case *livekit.RTCNodeMessage_RemoveParticipant:
if participant == nil {
return
}
pLogger.Infow("removing participant")
// remove participant by identity, any SID
room.RemoveParticipant(identity, "", types.ParticipantCloseReasonServiceRequestRemoveParticipant)
r.RemoveParticipant(ctx, rm.RemoveParticipant)
case *livekit.RTCNodeMessage_MuteTrack:
if participant == nil {
return
}
pLogger.Debugw("setting track muted",
"trackID", rm.MuteTrack.TrackSid, "muted", rm.MuteTrack.Muted)
if !rm.MuteTrack.Muted && !r.config.Room.EnableRemoteUnmute {
pLogger.Errorw("cannot unmute track, remote unmute is disabled", nil)
return
}
participant.SetTrackMuted(livekit.TrackID(rm.MuteTrack.TrackSid), rm.MuteTrack.Muted, true)
r.MutePublishedTrack(ctx, rm.MuteTrack)
case *livekit.RTCNodeMessage_UpdateParticipant:
if participant == nil {
return
}
pLogger.Debugw("updating participant", "metadata", rm.UpdateParticipant.Metadata,
"permission", rm.UpdateParticipant.Permission)
room.UpdateParticipantMetadata(participant, rm.UpdateParticipant.Name, rm.UpdateParticipant.Metadata)
if rm.UpdateParticipant.Permission != nil {
participant.SetPermission(rm.UpdateParticipant.Permission)
}
r.UpdateParticipant(ctx, rm.UpdateParticipant)
case *livekit.RTCNodeMessage_DeleteRoom:
r.DeleteRoom(ctx, rm.DeleteRoom)
case *livekit.RTCNodeMessage_UpdateSubscriptions:
r.UpdateSubscriptions(ctx, rm.UpdateSubscriptions)
case *livekit.RTCNodeMessage_SendData:
r.SendData(ctx, rm.SendData)
case *livekit.RTCNodeMessage_UpdateRoomMetadata:
r.UpdateRoomMetadata(ctx, rm.UpdateRoomMetadata)
}
}
func (r *RoomManager) roomLogger(room *rtc.Room) logger.Logger {
return rtc.LoggerWithParticipant(rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), "", "", false)
}
func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
}
participant.GetLogger().Infow("removing participant")
room.RemoveParticipant(livekit.ParticipantIdentity(req.Identity), "", types.ParticipantCloseReasonServiceRequestRemoveParticipant)
return &livekit.RemoveParticipantResponse{}, nil
}
func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
}
participant.GetLogger().Debugw("setting track muted",
"trackID", req.TrackSid, "muted", req.Muted)
if !req.Muted && !r.config.Room.EnableRemoteUnmute {
participant.GetLogger().Errorw("cannot unmute track, remote unmute is disabled", nil)
return nil, ErrRemoteUnmuteNoteEnabled
}
track := participant.SetTrackMuted(livekit.TrackID(req.TrackSid), req.Muted, true)
return &livekit.MuteRoomTrackResponse{Track: track}, nil
}
func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
}
participant.GetLogger().Debugw("updating participant",
"metadata", req.Metadata, "permission", req.Permission)
room.UpdateParticipantMetadata(participant, req.Name, req.Metadata)
if req.Permission != nil {
participant.SetPermission(req.Permission)
}
return participant.ToProto(), nil
}
func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
// special case of a non-RTC room e.g. room created but no participants joined
logger.Debugw("Deleting non-rtc room, loading from roomstore")
err := r.roomStore.DeleteRoom(ctx, livekit.RoomName(req.Room))
if err != nil {
logger.Debugw("Error deleting non-rtc room", "err", err)
return nil, err
}
} else {
room.Logger.Infow("deleting room")
for _, p := range room.GetParticipants() {
_ = p.Close(true, types.ParticipantCloseReasonServiceRequestDeleteRoom, false)
}
room.Close()
case *livekit.RTCNodeMessage_UpdateSubscriptions:
if participant == nil {
return
}
pLogger.Debugw("updating participant subscriptions")
room.UpdateSubscriptions(
participant,
livekit.StringsAsIDs[livekit.TrackID](rm.UpdateSubscriptions.TrackSids),
rm.UpdateSubscriptions.ParticipantTracks,
rm.UpdateSubscriptions.Subscribe,
)
case *livekit.RTCNodeMessage_SendData:
pLogger.Debugw("api send data", "size", len(rm.SendData.Data))
up := &livekit.UserPacket{
Payload: rm.SendData.Data,
DestinationSids: rm.SendData.DestinationSids,
DestinationIdentities: rm.SendData.DestinationIdentities,
Topic: rm.SendData.Topic,
}
room.SendDataPacket(up, rm.SendData.Kind)
case *livekit.RTCNodeMessage_UpdateRoomMetadata:
pLogger.Debugw("updating room")
room.SetMetadata(rm.UpdateRoomMetadata.Metadata)
}
return &livekit.DeleteRoomResponse{}, nil
}
func (r *RoomManager) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
if participant == nil {
return nil, ErrParticipantNotFound
}
participant.GetLogger().Debugw("updating participant subscriptions")
room.UpdateSubscriptions(
participant,
livekit.StringsAsIDs[livekit.TrackID](req.TrackSids),
req.ParticipantTracks,
req.Subscribe,
)
return &livekit.UpdateSubscriptionsResponse{}, nil
}
func (r *RoomManager) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
r.roomLogger(room).Debugw("api send data", "size", len(req.Data))
up := &livekit.UserPacket{
Payload: req.Data,
DestinationSids: req.DestinationSids,
DestinationIdentities: req.DestinationIdentities,
Topic: req.Topic,
}
room.SendDataPacket(up, req.Kind)
return &livekit.SendDataResponse{}, nil
}
func (r *RoomManager) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
r.roomLogger(room).Debugw("updating room")
room.SetMetadata(req.Metadata)
return room.ToProto(), nil
}
func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer {
+60 -7
View File
@@ -37,27 +37,36 @@ import (
type RoomService struct {
roomConf config.RoomConfig
apiConf config.APIConfig
psrpcConf config.PSRPCConfig
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore ServiceStore
egressLauncher rtc.EgressLauncher
topicFormatter rpc.TopicFormatter
roomClient rpc.TypedRoomClient
}
func NewRoomService(
roomConf config.RoomConfig,
apiConf config.APIConfig,
psrpcConf config.PSRPCConfig,
router routing.MessageRouter,
roomAllocator RoomAllocator,
serviceStore ServiceStore,
egressLauncher rtc.EgressLauncher,
topicFormatter rpc.TopicFormatter,
roomClient rpc.TypedRoomClient,
) (svc *RoomService, err error) {
svc = &RoomService{
roomConf: roomConf,
apiConf: apiConf,
psrpcConf: psrpcConf,
router: router,
roomAllocator: roomAllocator,
roomStore: serviceStore,
egressLauncher: egressLauncher,
topicFormatter: topicFormatter,
roomClient: roomClient,
}
return
}
@@ -142,6 +151,10 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
}
if _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false); err == ErrRoomNotFound {
return nil, twirp.NotFoundError("room not found")
}
@@ -207,10 +220,18 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) {
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
}
if _, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)); err == ErrParticipantNotFound {
return nil, twirp.NotFoundError("participant not found")
}
if s.psrpcConf.Enable {
return s.roomClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_RemoveParticipant{
RemoveParticipant: req,
@@ -243,6 +264,10 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
return s.roomClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_MuteTrack{
MuteTrack: req,
@@ -289,6 +314,14 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
return nil, twirp.InvalidArgumentError(ErrMetadataExceedsLimits.Error(), strconv.Itoa(maxMetadataSize))
}
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
return s.roomClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateParticipant{
UpdateParticipant: req,
@@ -329,6 +362,15 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
trackSIDs = append(trackSIDs, pt.TrackSids...)
}
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity, "trackID", trackSIDs)
if err := EnsureAdminPermission(ctx, livekit.RoomName(req.Room)); err != nil {
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
return s.roomClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
}
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateSubscriptions{
UpdateSubscriptions: req,
@@ -348,6 +390,10 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest
return nil, twirpAuthError(err)
}
if s.psrpcConf.Enable {
return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
}
err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_SendData{
SendData: req,
@@ -386,13 +432,20 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return nil, err
}
err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
UpdateRoomMetadata: req,
},
})
if err != nil {
return nil, err
if s.psrpcConf.Enable {
_, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
if err != nil {
return nil, err
}
} else {
err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
UpdateRoomMetadata: req,
},
})
if err != nil {
return nil, err
}
}
err = s.confirmExecution(func() error {
+12 -2
View File
@@ -23,8 +23,10 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc/rpcfakes"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/routing/routingfakes"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/livekit-server/pkg/service/servicefakes"
@@ -126,9 +128,17 @@ func newTestRoomService(conf config.RoomConfig) *TestRoomService {
router := &routingfakes.FakeRouter{}
allocator := &servicefakes.FakeRoomAllocator{}
store := &servicefakes.FakeServiceStore{}
svc, err := service.NewRoomService(conf,
svc, err := service.NewRoomService(
conf,
config.APIConfig{ExecutionTimeout: 2},
router, allocator, store, nil)
config.PSRPCConfig{},
router,
allocator,
store,
nil,
routing.NewTopicFormatter(),
&rpcfakes.FakeTypedRoomClient{},
)
if err != nil {
panic(err)
}
+7
View File
@@ -73,6 +73,9 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
getSignalRelayConfig,
NewDefaultSignalServer,
routing.NewSignalClient,
getPSRPCConfig,
routing.NewTopicFormatter,
routing.NewRoomClient,
NewLocalRoomManager,
NewTURNAuthHandler,
getTURNAuthHandlerFunc,
@@ -197,6 +200,10 @@ func getSignalRelayConfig(config *config.Config) config.SignalRelayConfig {
return config.SignalRelay
}
func getPSRPCConfig(config *config.Config) config.PSRPCConfig {
return config.PSRPC
}
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
+12 -2
View File
@@ -35,6 +35,7 @@ import (
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
roomConfig := getRoomConf(conf)
apiConfig := config.DefaultAPIConfig()
psrpcConfig := getPSRPCConfig(conf)
universalClient, err := createRedisClient(conf)
if err != nil {
return nil, err
@@ -73,7 +74,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService)
roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher)
topicFormatter := routing.NewTopicFormatter()
roomClient, err := routing.NewRoomClient(nodeID, messageBus, psrpcConfig)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient)
if err != nil {
return nil, err
}
@@ -88,7 +94,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
clientConfigurationManager := createClientConfiguration()
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
turnAuthHandler := NewTURNAuthHandler(keyProvider)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus)
if err != nil {
return nil, err
}
@@ -227,6 +233,10 @@ func getSignalRelayConfig(config2 *config.Config) config.SignalRelayConfig {
return config2.SignalRelay
}
func getPSRPCConfig(config2 *config.Config) config.PSRPCConfig {
return config2.PSRPC
}
func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
+1 -1
View File
@@ -1691,7 +1691,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
tp.shouldDrop = true
if f.started && result.IsRelevant {
// call to update highest incoming sequence number and other internal structures
if _, err := f.rtpMunger.UpdateAndGetSnTs(extPkt); err == nil {
if tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt); err == nil && tpRTP.snOrdering == SequenceNumberOrderingContiguous {
f.rtpMunger.PacketDropped(extPkt)
}
}