Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-10-19 13:02:37 +05:30
11 changed files with 471 additions and 122 deletions
+1 -1
View File
@@ -272,7 +272,7 @@ Read our [deployment docs](https://docs.livekit.io/deploy/) for more information
Pre-requisites:
- Go 1.18+ is installed
- Go 1.20+ is installed
- GOPATH/bin is in your PATH
Then run
+6 -6
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.8.0
github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -27,7 +27,7 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/pion/dtls/v2 v2.2.7
github.com/pion/ice/v2 v2.3.11
github.com/pion/interceptor v0.1.23
github.com/pion/interceptor v0.1.24
github.com/pion/rtcp v1.2.10
github.com/pion/rtp v1.8.2
github.com/pion/sctp v1.8.9
@@ -71,7 +71,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@@ -93,8 +93,8 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
@@ -102,6 +102,6 @@ require (
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+14 -15
View File
@@ -1,4 +1,3 @@
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY=
@@ -110,8 +109,8 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw=
github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
@@ -126,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.8.0 h1:0z2eRmEXFFXiJ7WPAxRLMNCyUu55w41iikbbeT8dvlQ=
github.com/livekit/protocol v1.8.0/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 h1:0Ca5hQP9+DaRjrr5d9msNhfCL+CBvcxqLdh4xOXAZeU=
github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -162,7 +161,7 @@ github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46N
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
@@ -189,8 +188,8 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ
github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw=
github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E=
github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I=
github.com/pion/interceptor v0.1.23 h1:BZmayeasUYVDam891RtvE5rs6syqmSK3Wzy+xu+UNw0=
github.com/pion/interceptor v0.1.23/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y=
github.com/pion/interceptor v0.1.24 h1:lN4ua3yUAJCgNKQKcZIM52wFjBgjN0r7shLj91PkJ0c=
github.com/pion/interceptor v0.1.24/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI=
@@ -241,7 +240,7 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
@@ -281,10 +280,10 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -419,8 +418,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0=
google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ=
google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+22 -62
View File
@@ -23,7 +23,6 @@ import (
"github.com/twitchtv/twirp"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -33,48 +32,39 @@ import (
type EgressService struct {
client rpc.EgressClient
store ServiceStore
es EgressStore
io IOClient
roomService livekit.RoomService
telemetry telemetry.TelemetryService
store ServiceStore
launcher rtc.EgressLauncher
}
type egressLauncher struct {
client rpc.EgressClient
es EgressStore
telemetry telemetry.TelemetryService
client rpc.EgressClient
io IOClient
}
func NewEgressLauncher(
client rpc.EgressClient,
es EgressStore,
ts telemetry.TelemetryService) rtc.EgressLauncher {
func NewEgressLauncher(client rpc.EgressClient, io IOClient) rtc.EgressLauncher {
if client == nil {
return nil
}
return &egressLauncher{
client: client,
es: es,
telemetry: ts,
client: client,
io: io,
}
}
func NewEgressService(
client rpc.EgressClient,
store ServiceStore,
es EgressStore,
io IOClient,
rs livekit.RoomService,
ts telemetry.TelemetryService,
launcher rtc.EgressLauncher,
) *EgressService {
return &EgressService{
client: client,
store: store,
es: es,
io: io,
roomService: rs,
telemetry: ts,
launcher: launcher,
}
}
@@ -189,7 +179,6 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa
} else if s.launcher == nil {
return nil, ErrEgressNotConnected
}
if roomName != "" {
room, _, err := s.store.LoadRoom(ctx, roomName, false)
if err != nil {
@@ -197,14 +186,18 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa
}
req.RoomId = room.Sid
}
return s.launcher.StartEgress(ctx, req)
}
func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgressWithClusterId(ctx, "", req)
}
func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if s.client == nil {
return nil, ErrEgressNotConnected
}
// Ensure we have an Egress ID
if req.EgressId == "" {
req.EgressId = utils.NewGuid(utils.EgressPrefix)
@@ -215,12 +208,10 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId
return nil, err
}
s.telemetry.EgressStarted(ctx, info)
go func() {
if err := s.es.StoreEgress(ctx, info); err != nil {
logger.Errorw("could not write egress info", err)
}
}()
_, err = s.io.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
}
return info, nil
}
@@ -234,11 +225,8 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.client == nil {
return nil, ErrEgressNotConnected
}
info, err := s.es.LoadEgress(ctx, req.EgressId)
info, err := s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId})
if err != nil {
return nil, err
}
@@ -277,7 +265,7 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
info, err := s.client.UpdateStream(ctx, req.EgressId, req)
if err != nil {
var loadErr error
info, loadErr = s.es.LoadEgress(ctx, req.EgressId)
info, loadErr = s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId})
if loadErr != nil {
return nil, loadErr
}
@@ -306,29 +294,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.client == nil {
return nil, ErrEgressNotConnected
}
var items []*livekit.EgressInfo
if req.EgressId != "" {
info, err := s.es.LoadEgress(ctx, req.EgressId)
if err != nil {
return nil, err
}
if !req.Active || int32(info.Status) < int32(livekit.EgressStatus_EGRESS_COMPLETE) {
items = []*livekit.EgressInfo{info}
}
} else {
var err error
items, err = s.es.ListEgress(ctx, livekit.RoomName(req.RoomName), req.Active)
if err != nil {
return nil, err
}
}
return &livekit.ListEgressResponse{Items: items}, nil
return s.io.ListEgress(ctx, req)
}
func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressRequest) (*livekit.EgressInfo, error) {
@@ -344,7 +310,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
info, err := s.client.StopEgress(ctx, req.EgressId, req)
if err != nil {
var loadErr error
info, loadErr = s.es.LoadEgress(ctx, req.EgressId)
info, loadErr = s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId})
if loadErr != nil {
return nil, loadErr
}
@@ -359,11 +325,5 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
}
}
go func() {
if err := s.es.UpdateEgress(ctx, info); err != nil {
logger.Errorw("could not write egress info", err)
}
}()
return info, nil
}
+10
View File
@@ -18,7 +18,10 @@ import (
"context"
"time"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -71,6 +74,13 @@ type IngressStore interface {
DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error
}
//counterfeiter:generate . IOClient
type IOClient interface {
CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error)
GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error)
ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error)
}
//counterfeiter:generate . RoomAllocator
type RoomAllocator interface {
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error)
@@ -28,11 +28,13 @@ import (
)
type IOInfoService struct {
ioServer rpc.IOInfoServer
ioServer rpc.IOInfoServer
es EgressStore
is IngressStore
telemetry telemetry.TelemetryService
shutdown chan struct{}
shutdown chan struct{}
}
func NewIOInfoService(
@@ -73,27 +75,38 @@ func (s *IOInfoService) Start() error {
return nil
}
func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
func (s *IOInfoService) CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
// check if egress already exists to avoid duplicate EgressStarted event
if _, err := s.es.LoadEgress(ctx, info.EgressId); err == nil {
return &emptypb.Empty{}, nil
}
err := s.es.StoreEgress(ctx, info)
if err != nil {
logger.Errorw("could not update egress", err)
return nil, err
}
s.telemetry.EgressStarted(ctx, info)
return &emptypb.Empty{}, nil
}
func (s *IOInfoService) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
err := s.es.UpdateEgress(ctx, info)
switch info.Status {
case livekit.EgressStatus_EGRESS_ACTIVE:
case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
s.telemetry.EgressUpdated(ctx, info)
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
// log results
if info.Error != "" {
logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId)
} else {
logger.Infow("egress ended", "egressID", info.EgressId)
}
s.telemetry.EgressEnded(ctx, info)
}
if err != nil {
logger.Errorw("could not update egress", err)
return nil, err
@@ -102,6 +115,40 @@ func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.Egre
return &emptypb.Empty{}, nil
}
func (s *IOInfoService) GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) {
info, err := s.es.LoadEgress(ctx, req.EgressId)
if err != nil {
logger.Errorw("failed to load egress", err)
return nil, err
}
return info, nil
}
func (s *IOInfoService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) {
var items []*livekit.EgressInfo
if req.EgressId != "" {
info, err := s.es.LoadEgress(ctx, req.EgressId)
if err != nil {
logger.Errorw("failed to load egress", err)
return nil, err
}
if !req.Active || int32(info.Status) < int32(livekit.EgressStatus_EGRESS_COMPLETE) {
items = []*livekit.EgressInfo{info}
}
} else {
var err error
items, err = s.es.ListEgress(ctx, livekit.RoomName(req.RoomName), req.Active)
if err != nil {
logger.Errorw("failed to list egress", err)
return nil, err
}
}
return &livekit.ListEgressResponse{Items: items}, nil
}
func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) {
info, err := s.loadIngressFromInfoRequest(req)
if err != nil {
@@ -128,7 +175,7 @@ func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateI
return nil, err
}
if err := s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil {
if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil {
logger.Errorw("could not update ingress", err)
return nil, err
}
@@ -176,3 +223,33 @@ func (s *IOInfoService) Stop() {
s.ioServer.Shutdown()
}
}
// deprecated
func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
err := s.es.UpdateEgress(ctx, info)
switch info.Status {
case livekit.EgressStatus_EGRESS_ACTIVE:
s.telemetry.EgressUpdated(ctx, info)
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
// log results
if info.Error != "" {
logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId)
} else {
logger.Infow("egress ended", "egressID", info.EgressId)
}
s.telemetry.EgressEnded(ctx, info)
}
if err != nil {
logger.Errorw("could not update egress", err)
return nil, err
}
return &emptypb.Empty{}, nil
}
+284
View File
@@ -0,0 +1,284 @@
// Code generated by counterfeiter. DO NOT EDIT.
package servicefakes
import (
"context"
"sync"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"google.golang.org/protobuf/types/known/emptypb"
)
type FakeIOClient struct {
CreateEgressStub func(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error)
createEgressMutex sync.RWMutex
createEgressArgsForCall []struct {
arg1 context.Context
arg2 *livekit.EgressInfo
}
createEgressReturns struct {
result1 *emptypb.Empty
result2 error
}
createEgressReturnsOnCall map[int]struct {
result1 *emptypb.Empty
result2 error
}
GetEgressStub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error)
getEgressMutex sync.RWMutex
getEgressArgsForCall []struct {
arg1 context.Context
arg2 *rpc.GetEgressRequest
}
getEgressReturns struct {
result1 *livekit.EgressInfo
result2 error
}
getEgressReturnsOnCall map[int]struct {
result1 *livekit.EgressInfo
result2 error
}
ListEgressStub func(context.Context, *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error)
listEgressMutex sync.RWMutex
listEgressArgsForCall []struct {
arg1 context.Context
arg2 *livekit.ListEgressRequest
}
listEgressReturns struct {
result1 *livekit.ListEgressResponse
result2 error
}
listEgressReturnsOnCall map[int]struct {
result1 *livekit.ListEgressResponse
result2 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeIOClient) CreateEgress(arg1 context.Context, arg2 *livekit.EgressInfo) (*emptypb.Empty, error) {
fake.createEgressMutex.Lock()
ret, specificReturn := fake.createEgressReturnsOnCall[len(fake.createEgressArgsForCall)]
fake.createEgressArgsForCall = append(fake.createEgressArgsForCall, struct {
arg1 context.Context
arg2 *livekit.EgressInfo
}{arg1, arg2})
stub := fake.CreateEgressStub
fakeReturns := fake.createEgressReturns
fake.recordInvocation("CreateEgress", []interface{}{arg1, arg2})
fake.createEgressMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeIOClient) CreateEgressCallCount() int {
fake.createEgressMutex.RLock()
defer fake.createEgressMutex.RUnlock()
return len(fake.createEgressArgsForCall)
}
func (fake *FakeIOClient) CreateEgressCalls(stub func(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error)) {
fake.createEgressMutex.Lock()
defer fake.createEgressMutex.Unlock()
fake.CreateEgressStub = stub
}
func (fake *FakeIOClient) CreateEgressArgsForCall(i int) (context.Context, *livekit.EgressInfo) {
fake.createEgressMutex.RLock()
defer fake.createEgressMutex.RUnlock()
argsForCall := fake.createEgressArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeIOClient) CreateEgressReturns(result1 *emptypb.Empty, result2 error) {
fake.createEgressMutex.Lock()
defer fake.createEgressMutex.Unlock()
fake.CreateEgressStub = nil
fake.createEgressReturns = struct {
result1 *emptypb.Empty
result2 error
}{result1, result2}
}
func (fake *FakeIOClient) CreateEgressReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) {
fake.createEgressMutex.Lock()
defer fake.createEgressMutex.Unlock()
fake.CreateEgressStub = nil
if fake.createEgressReturnsOnCall == nil {
fake.createEgressReturnsOnCall = make(map[int]struct {
result1 *emptypb.Empty
result2 error
})
}
fake.createEgressReturnsOnCall[i] = struct {
result1 *emptypb.Empty
result2 error
}{result1, result2}
}
func (fake *FakeIOClient) GetEgress(arg1 context.Context, arg2 *rpc.GetEgressRequest) (*livekit.EgressInfo, error) {
fake.getEgressMutex.Lock()
ret, specificReturn := fake.getEgressReturnsOnCall[len(fake.getEgressArgsForCall)]
fake.getEgressArgsForCall = append(fake.getEgressArgsForCall, struct {
arg1 context.Context
arg2 *rpc.GetEgressRequest
}{arg1, arg2})
stub := fake.GetEgressStub
fakeReturns := fake.getEgressReturns
fake.recordInvocation("GetEgress", []interface{}{arg1, arg2})
fake.getEgressMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeIOClient) GetEgressCallCount() int {
fake.getEgressMutex.RLock()
defer fake.getEgressMutex.RUnlock()
return len(fake.getEgressArgsForCall)
}
func (fake *FakeIOClient) GetEgressCalls(stub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error)) {
fake.getEgressMutex.Lock()
defer fake.getEgressMutex.Unlock()
fake.GetEgressStub = stub
}
func (fake *FakeIOClient) GetEgressArgsForCall(i int) (context.Context, *rpc.GetEgressRequest) {
fake.getEgressMutex.RLock()
defer fake.getEgressMutex.RUnlock()
argsForCall := fake.getEgressArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeIOClient) GetEgressReturns(result1 *livekit.EgressInfo, result2 error) {
fake.getEgressMutex.Lock()
defer fake.getEgressMutex.Unlock()
fake.GetEgressStub = nil
fake.getEgressReturns = struct {
result1 *livekit.EgressInfo
result2 error
}{result1, result2}
}
func (fake *FakeIOClient) GetEgressReturnsOnCall(i int, result1 *livekit.EgressInfo, result2 error) {
fake.getEgressMutex.Lock()
defer fake.getEgressMutex.Unlock()
fake.GetEgressStub = nil
if fake.getEgressReturnsOnCall == nil {
fake.getEgressReturnsOnCall = make(map[int]struct {
result1 *livekit.EgressInfo
result2 error
})
}
fake.getEgressReturnsOnCall[i] = struct {
result1 *livekit.EgressInfo
result2 error
}{result1, result2}
}
func (fake *FakeIOClient) ListEgress(arg1 context.Context, arg2 *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) {
fake.listEgressMutex.Lock()
ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)]
fake.listEgressArgsForCall = append(fake.listEgressArgsForCall, struct {
arg1 context.Context
arg2 *livekit.ListEgressRequest
}{arg1, arg2})
stub := fake.ListEgressStub
fakeReturns := fake.listEgressReturns
fake.recordInvocation("ListEgress", []interface{}{arg1, arg2})
fake.listEgressMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeIOClient) ListEgressCallCount() int {
fake.listEgressMutex.RLock()
defer fake.listEgressMutex.RUnlock()
return len(fake.listEgressArgsForCall)
}
func (fake *FakeIOClient) ListEgressCalls(stub func(context.Context, *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error)) {
fake.listEgressMutex.Lock()
defer fake.listEgressMutex.Unlock()
fake.ListEgressStub = stub
}
func (fake *FakeIOClient) ListEgressArgsForCall(i int) (context.Context, *livekit.ListEgressRequest) {
fake.listEgressMutex.RLock()
defer fake.listEgressMutex.RUnlock()
argsForCall := fake.listEgressArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeIOClient) ListEgressReturns(result1 *livekit.ListEgressResponse, result2 error) {
fake.listEgressMutex.Lock()
defer fake.listEgressMutex.Unlock()
fake.ListEgressStub = nil
fake.listEgressReturns = struct {
result1 *livekit.ListEgressResponse
result2 error
}{result1, result2}
}
func (fake *FakeIOClient) ListEgressReturnsOnCall(i int, result1 *livekit.ListEgressResponse, result2 error) {
fake.listEgressMutex.Lock()
defer fake.listEgressMutex.Unlock()
fake.ListEgressStub = nil
if fake.listEgressReturnsOnCall == nil {
fake.listEgressReturnsOnCall = make(map[int]struct {
result1 *livekit.ListEgressResponse
result2 error
})
}
fake.listEgressReturnsOnCall[i] = struct {
result1 *livekit.ListEgressResponse
result2 error
}{result1, result2}
}
func (fake *FakeIOClient) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.createEgressMutex.RLock()
defer fake.createEgressMutex.RUnlock()
fake.getEgressMutex.RLock()
defer fake.getEgressMutex.RUnlock()
fake.listEgressMutex.RLock()
defer fake.listEgressMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeIOClient) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ service.IOClient = new(FakeIOClient)
+1
View File
@@ -58,6 +58,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
telemetry.NewTelemetryService,
getMessageBus,
NewIOInfoService,
wire.Bind(new(IOClient), new(*IOInfoService)),
rpc.NewEgressClient,
getEgressStore,
NewEgressLauncher,
+7 -7
View File
@@ -57,6 +57,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
egressStore := getEgressStore(objectStore)
ingressStore := getIngressStore(objectStore)
keyProvider, err := createKeyProvider(conf)
if err != nil {
return nil, err
@@ -67,23 +68,22 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService)
rtcEgressLauncher := NewEgressLauncher(egressClient, egressStore, telemetryService)
ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService)
if err != nil {
return nil, err
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService)
roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher)
if err != nil {
return nil, err
}
egressService := NewEgressService(egressClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher)
egressService := NewEgressService(egressClient, objectStore, ioInfoService, roomService, rtcEgressLauncher)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(nodeID, messageBus)
if err != nil {
return nil, err
}
ingressStore := getIngressStore(objectStore)
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService)
ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService)
if err != nil {
return nil, err
}
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
clientConfigurationManager := createClientConfiguration()
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
+19 -15
View File
@@ -327,17 +327,6 @@ func (r *RTPStatsSender) Update(
r.extStartSN = extSequenceNumber
}
if extTimestamp < r.extStartTS {
r.logger.Infow(
"adjusting start timestamp",
"snBefore", r.extStartSN,
"snAfter", extSequenceNumber,
"tsBefore", r.extStartTS,
"tsAfter", extTimestamp,
)
r.extStartTS = extTimestamp
}
if gapSN != 0 {
r.packetsOutOfOrder++
}
@@ -377,12 +366,27 @@ func (r *RTPStatsSender) Update(
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, false)
if extTimestamp != r.extHighestTS {
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.extHighestSN = extSequenceNumber
}
if extTimestamp < r.extStartTS {
r.logger.Infow(
"adjusting start timestamp",
"snBefore", r.extStartSN,
"snAfter", extSequenceNumber,
"tsBefore", r.extStartTS,
"tsAfter", extTimestamp,
)
r.extStartTS = extTimestamp
}
if extTimestamp > r.extHighestTS {
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
if payloadSize > 0 {
// skip updating on padding only packets as they could re-use an old timestamp
r.highestTime = packetTime
}
r.extHighestSN = extSequenceNumber
r.extHighestTS = extTimestamp
}
+17 -3
View File
@@ -54,10 +54,11 @@ type RTPMungerState struct {
ExtLastSN uint64
ExtSecondLastSN uint64
ExtLastTS uint64
ExtSecondLastTS uint64
}
func (r RTPMungerState) String() string {
return fmt.Sprintf("RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d)", r.ExtLastSN, r.ExtSecondLastSN, r.ExtLastTS)
return fmt.Sprintf("RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d, extSecondLastTS: %d)", r.ExtLastSN, r.ExtSecondLastSN, r.ExtLastTS, r.ExtSecondLastTS)
}
// ----------------------------------------------------------------------
@@ -72,8 +73,9 @@ type RTPMunger struct {
extSecondLastSN uint64
snOffset uint64
extLastTS uint64
tsOffset uint64
extLastTS uint64
extSecondLastTS uint64
tsOffset uint64
lastMarker bool
@@ -95,6 +97,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
"ExtSecondLastSN": r.extSecondLastSN,
"SNOffset": r.snOffset,
"ExtLastTS": r.extLastTS,
"ExtSecondLastTS": r.extSecondLastTS,
"TSOffset": r.tsOffset,
"LastMarker": r.lastMarker,
}
@@ -105,6 +108,7 @@ func (r *RTPMunger) GetLast() RTPMungerState {
ExtLastSN: r.extLastSN,
ExtSecondLastSN: r.extSecondLastSN,
ExtLastTS: r.extLastTS,
ExtSecondLastTS: r.extSecondLastTS,
}
}
@@ -112,6 +116,7 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) {
r.extLastSN = state.ExtLastSN
r.extSecondLastSN = state.ExtSecondLastSN
r.extLastTS = state.ExtLastTS
r.extSecondLastTS = state.ExtSecondLastTS
}
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
@@ -123,6 +128,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.updateSnOffset()
r.extLastTS = extPkt.ExtTimestamp
r.extSecondLastTS = extPkt.ExtTimestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) {
@@ -156,6 +162,8 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
r.extLastSN = r.extSecondLastSN
r.updateSnOffset()
r.extLastTS = r.extSecondLastTS
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
@@ -174,6 +182,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
r.extSecondLastSN = r.extLastSN
r.extLastSN = extMungedSN
r.extSecondLastTS = r.extLastTS
r.extLastTS = extMungedTS
r.lastMarker = extPkt.Packet.Marker
@@ -306,6 +315,11 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
r.snRangeMap.DecValue(r.extHighestIncomingSN, uint64(num))
r.updateSnOffset()
if len(vals) == 1 {
r.extSecondLastTS = r.extLastTS
} else {
r.extSecondLastTS = vals[len(vals)-2].extTimestamp
}
r.tsOffset -= extLastTS - r.extLastTS
r.extLastTS = extLastTS