diff --git a/README.md b/README.md index 7bdc9863e..c05377095 100644 --- a/README.md +++ b/README.md @@ -272,7 +272,7 @@ Read our [deployment docs](https://docs.livekit.io/deploy/) for more information Pre-requisites: -- Go 1.18+ is installed +- Go 1.20+ is installed - GOPATH/bin is in your PATH Then run diff --git a/go.mod b/go.mod index cf175a44f..f9a258872 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.0 + github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -27,7 +27,7 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.7 github.com/pion/ice/v2 v2.3.11 - github.com/pion/interceptor v0.1.23 + github.com/pion/interceptor v0.1.24 github.com/pion/rtcp v1.2.10 github.com/pion/rtp v1.8.2 github.com/pion/sctp v1.8.9 @@ -71,7 +71,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.17.0 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -93,8 +93,8 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.uber.org/multierr v1.10.0 // indirect - go.uber.org/zap v1.25.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.17.0 // indirect @@ -102,6 +102,6 @@ require ( golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect - google.golang.org/grpc v1.58.3 // indirect + google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 6626aafb1..b1d7080f8 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,3 @@ -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -110,8 +109,8 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -126,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.0 h1:0z2eRmEXFFXiJ7WPAxRLMNCyUu55w41iikbbeT8dvlQ= -github.com/livekit/protocol v1.8.0/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 h1:0Ca5hQP9+DaRjrr5d9msNhfCL+CBvcxqLdh4xOXAZeU= +github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -162,7 +161,7 @@ github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46N github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= @@ -189,8 +188,8 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I= -github.com/pion/interceptor v0.1.23 h1:BZmayeasUYVDam891RtvE5rs6syqmSK3Wzy+xu+UNw0= -github.com/pion/interceptor v0.1.23/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= +github.com/pion/interceptor v0.1.24 h1:lN4ua3yUAJCgNKQKcZIM52wFjBgjN0r7shLj91PkJ0c= +github.com/pion/interceptor v0.1.24/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI= @@ -241,7 +240,7 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/ github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -281,10 +280,10 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= -go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -419,8 +418,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= -google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= -google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 36f379b3f..0a77a231f 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -23,7 +23,6 @@ import ( "github.com/twitchtv/twirp" "github.com/livekit/livekit-server/pkg/rtc" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -33,48 +32,39 @@ import ( type EgressService struct { client rpc.EgressClient - store ServiceStore - es EgressStore + io IOClient roomService livekit.RoomService - telemetry telemetry.TelemetryService + store ServiceStore launcher rtc.EgressLauncher } type egressLauncher struct { - client rpc.EgressClient - es EgressStore - telemetry telemetry.TelemetryService + client rpc.EgressClient + io IOClient } -func NewEgressLauncher( - client rpc.EgressClient, - es EgressStore, - ts telemetry.TelemetryService) rtc.EgressLauncher { +func NewEgressLauncher(client rpc.EgressClient, io IOClient) rtc.EgressLauncher { if client == nil { return nil } - return &egressLauncher{ - client: client, - es: es, - telemetry: ts, + client: client, + io: io, } } func NewEgressService( client rpc.EgressClient, store ServiceStore, - es EgressStore, + io IOClient, rs livekit.RoomService, - ts telemetry.TelemetryService, launcher rtc.EgressLauncher, ) *EgressService { return &EgressService{ client: client, store: store, - es: es, + io: io, roomService: rs, - telemetry: ts, launcher: launcher, } } @@ -189,7 +179,6 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } else if s.launcher == nil { return nil, ErrEgressNotConnected } - if roomName != "" { room, _, err := s.store.LoadRoom(ctx, roomName, false) if err != nil { @@ -197,14 +186,18 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } req.RoomId = room.Sid } - return s.launcher.StartEgress(ctx, req) } func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { return s.StartEgressWithClusterId(ctx, "", req) } + func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { + if s.client == nil { + return nil, ErrEgressNotConnected + } + // Ensure we have an Egress ID if req.EgressId == "" { req.EgressId = utils.NewGuid(utils.EgressPrefix) @@ -215,12 +208,10 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId return nil, err } - s.telemetry.EgressStarted(ctx, info) - go func() { - if err := s.es.StoreEgress(ctx, info); err != nil { - logger.Errorw("could not write egress info", err) - } - }() + _, err = s.io.CreateEgress(ctx, info) + if err != nil { + logger.Errorw("failed to create egress", err) + } return info, nil } @@ -234,11 +225,8 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.client == nil { - return nil, ErrEgressNotConnected - } - info, err := s.es.LoadEgress(ctx, req.EgressId) + info, err := s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId}) if err != nil { return nil, err } @@ -277,7 +265,7 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr info, err := s.client.UpdateStream(ctx, req.EgressId, req) if err != nil { var loadErr error - info, loadErr = s.es.LoadEgress(ctx, req.EgressId) + info, loadErr = s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId}) if loadErr != nil { return nil, loadErr } @@ -306,29 +294,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.client == nil { - return nil, ErrEgressNotConnected - } - - var items []*livekit.EgressInfo - if req.EgressId != "" { - info, err := s.es.LoadEgress(ctx, req.EgressId) - if err != nil { - return nil, err - } - - if !req.Active || int32(info.Status) < int32(livekit.EgressStatus_EGRESS_COMPLETE) { - items = []*livekit.EgressInfo{info} - } - } else { - var err error - items, err = s.es.ListEgress(ctx, livekit.RoomName(req.RoomName), req.Active) - if err != nil { - return nil, err - } - } - - return &livekit.ListEgressResponse{Items: items}, nil + return s.io.ListEgress(ctx, req) } func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { @@ -344,7 +310,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR info, err := s.client.StopEgress(ctx, req.EgressId, req) if err != nil { var loadErr error - info, loadErr = s.es.LoadEgress(ctx, req.EgressId) + info, loadErr = s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId}) if loadErr != nil { return nil, loadErr } @@ -359,11 +325,5 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR } } - go func() { - if err := s.es.UpdateEgress(ctx, info); err != nil { - logger.Errorw("could not write egress info", err) - } - }() - return info, nil } diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 36da68fc5..ba390554f 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -18,7 +18,10 @@ import ( "context" "time" + "google.golang.org/protobuf/types/known/emptypb" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -71,6 +74,13 @@ type IngressStore interface { DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error } +//counterfeiter:generate . IOClient +type IOClient interface { + CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) + GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) + ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) +} + //counterfeiter:generate . RoomAllocator type RoomAllocator interface { CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) diff --git a/pkg/service/ioinfo.go b/pkg/service/ioservice.go similarity index 67% rename from pkg/service/ioinfo.go rename to pkg/service/ioservice.go index b8fedd8ac..a993daa83 100644 --- a/pkg/service/ioinfo.go +++ b/pkg/service/ioservice.go @@ -28,11 +28,13 @@ import ( ) type IOInfoService struct { - ioServer rpc.IOInfoServer + ioServer rpc.IOInfoServer + es EgressStore is IngressStore telemetry telemetry.TelemetryService - shutdown chan struct{} + + shutdown chan struct{} } func NewIOInfoService( @@ -73,27 +75,38 @@ func (s *IOInfoService) Start() error { return nil } -func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { +func (s *IOInfoService) CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + // check if egress already exists to avoid duplicate EgressStarted event + if _, err := s.es.LoadEgress(ctx, info.EgressId); err == nil { + return &emptypb.Empty{}, nil + } + + err := s.es.StoreEgress(ctx, info) + if err != nil { + logger.Errorw("could not update egress", err) + return nil, err + } + + s.telemetry.EgressStarted(ctx, info) + + return &emptypb.Empty{}, nil +} + +func (s *IOInfoService) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { err := s.es.UpdateEgress(ctx, info) switch info.Status { - case livekit.EgressStatus_EGRESS_ACTIVE: + case livekit.EgressStatus_EGRESS_ACTIVE, + livekit.EgressStatus_EGRESS_ENDING: s.telemetry.EgressUpdated(ctx, info) case livekit.EgressStatus_EGRESS_COMPLETE, livekit.EgressStatus_EGRESS_FAILED, livekit.EgressStatus_EGRESS_ABORTED, livekit.EgressStatus_EGRESS_LIMIT_REACHED: - - // log results - if info.Error != "" { - logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId) - } else { - logger.Infow("egress ended", "egressID", info.EgressId) - } - s.telemetry.EgressEnded(ctx, info) } + if err != nil { logger.Errorw("could not update egress", err) return nil, err @@ -102,6 +115,40 @@ func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.Egre return &emptypb.Empty{}, nil } +func (s *IOInfoService) GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) { + info, err := s.es.LoadEgress(ctx, req.EgressId) + if err != nil { + logger.Errorw("failed to load egress", err) + return nil, err + } + + return info, nil +} + +func (s *IOInfoService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) { + var items []*livekit.EgressInfo + if req.EgressId != "" { + info, err := s.es.LoadEgress(ctx, req.EgressId) + if err != nil { + logger.Errorw("failed to load egress", err) + return nil, err + } + + if !req.Active || int32(info.Status) < int32(livekit.EgressStatus_EGRESS_COMPLETE) { + items = []*livekit.EgressInfo{info} + } + } else { + var err error + items, err = s.es.ListEgress(ctx, livekit.RoomName(req.RoomName), req.Active) + if err != nil { + logger.Errorw("failed to list egress", err) + return nil, err + } + } + + return &livekit.ListEgressResponse{Items: items}, nil +} + func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) { info, err := s.loadIngressFromInfoRequest(req) if err != nil { @@ -128,7 +175,7 @@ func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateI return nil, err } - if err := s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { + if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { logger.Errorw("could not update ingress", err) return nil, err } @@ -176,3 +223,33 @@ func (s *IOInfoService) Stop() { s.ioServer.Shutdown() } } + +// deprecated +func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + err := s.es.UpdateEgress(ctx, info) + + switch info.Status { + case livekit.EgressStatus_EGRESS_ACTIVE: + s.telemetry.EgressUpdated(ctx, info) + + case livekit.EgressStatus_EGRESS_COMPLETE, + livekit.EgressStatus_EGRESS_FAILED, + livekit.EgressStatus_EGRESS_ABORTED, + livekit.EgressStatus_EGRESS_LIMIT_REACHED: + + // log results + if info.Error != "" { + logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId) + } else { + logger.Infow("egress ended", "egressID", info.EgressId) + } + + s.telemetry.EgressEnded(ctx, info) + } + if err != nil { + logger.Errorw("could not update egress", err) + return nil, err + } + + return &emptypb.Empty{}, nil +} diff --git a/pkg/service/servicefakes/fake_ioclient.go b/pkg/service/servicefakes/fake_ioclient.go new file mode 100644 index 000000000..5d440cbb9 --- /dev/null +++ b/pkg/service/servicefakes/fake_ioclient.go @@ -0,0 +1,284 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package servicefakes + +import ( + "context" + "sync" + + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +type FakeIOClient struct { + CreateEgressStub func(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error) + createEgressMutex sync.RWMutex + createEgressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.EgressInfo + } + createEgressReturns struct { + result1 *emptypb.Empty + result2 error + } + createEgressReturnsOnCall map[int]struct { + result1 *emptypb.Empty + result2 error + } + GetEgressStub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error) + getEgressMutex sync.RWMutex + getEgressArgsForCall []struct { + arg1 context.Context + arg2 *rpc.GetEgressRequest + } + getEgressReturns struct { + result1 *livekit.EgressInfo + result2 error + } + getEgressReturnsOnCall map[int]struct { + result1 *livekit.EgressInfo + result2 error + } + ListEgressStub func(context.Context, *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) + listEgressMutex sync.RWMutex + listEgressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.ListEgressRequest + } + listEgressReturns struct { + result1 *livekit.ListEgressResponse + result2 error + } + listEgressReturnsOnCall map[int]struct { + result1 *livekit.ListEgressResponse + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeIOClient) CreateEgress(arg1 context.Context, arg2 *livekit.EgressInfo) (*emptypb.Empty, error) { + fake.createEgressMutex.Lock() + ret, specificReturn := fake.createEgressReturnsOnCall[len(fake.createEgressArgsForCall)] + fake.createEgressArgsForCall = append(fake.createEgressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.EgressInfo + }{arg1, arg2}) + stub := fake.CreateEgressStub + fakeReturns := fake.createEgressReturns + fake.recordInvocation("CreateEgress", []interface{}{arg1, arg2}) + fake.createEgressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) CreateEgressCallCount() int { + fake.createEgressMutex.RLock() + defer fake.createEgressMutex.RUnlock() + return len(fake.createEgressArgsForCall) +} + +func (fake *FakeIOClient) CreateEgressCalls(stub func(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error)) { + fake.createEgressMutex.Lock() + defer fake.createEgressMutex.Unlock() + fake.CreateEgressStub = stub +} + +func (fake *FakeIOClient) CreateEgressArgsForCall(i int) (context.Context, *livekit.EgressInfo) { + fake.createEgressMutex.RLock() + defer fake.createEgressMutex.RUnlock() + argsForCall := fake.createEgressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) CreateEgressReturns(result1 *emptypb.Empty, result2 error) { + fake.createEgressMutex.Lock() + defer fake.createEgressMutex.Unlock() + fake.CreateEgressStub = nil + fake.createEgressReturns = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) CreateEgressReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) { + fake.createEgressMutex.Lock() + defer fake.createEgressMutex.Unlock() + fake.CreateEgressStub = nil + if fake.createEgressReturnsOnCall == nil { + fake.createEgressReturnsOnCall = make(map[int]struct { + result1 *emptypb.Empty + result2 error + }) + } + fake.createEgressReturnsOnCall[i] = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) GetEgress(arg1 context.Context, arg2 *rpc.GetEgressRequest) (*livekit.EgressInfo, error) { + fake.getEgressMutex.Lock() + ret, specificReturn := fake.getEgressReturnsOnCall[len(fake.getEgressArgsForCall)] + fake.getEgressArgsForCall = append(fake.getEgressArgsForCall, struct { + arg1 context.Context + arg2 *rpc.GetEgressRequest + }{arg1, arg2}) + stub := fake.GetEgressStub + fakeReturns := fake.getEgressReturns + fake.recordInvocation("GetEgress", []interface{}{arg1, arg2}) + fake.getEgressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) GetEgressCallCount() int { + fake.getEgressMutex.RLock() + defer fake.getEgressMutex.RUnlock() + return len(fake.getEgressArgsForCall) +} + +func (fake *FakeIOClient) GetEgressCalls(stub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error)) { + fake.getEgressMutex.Lock() + defer fake.getEgressMutex.Unlock() + fake.GetEgressStub = stub +} + +func (fake *FakeIOClient) GetEgressArgsForCall(i int) (context.Context, *rpc.GetEgressRequest) { + fake.getEgressMutex.RLock() + defer fake.getEgressMutex.RUnlock() + argsForCall := fake.getEgressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) GetEgressReturns(result1 *livekit.EgressInfo, result2 error) { + fake.getEgressMutex.Lock() + defer fake.getEgressMutex.Unlock() + fake.GetEgressStub = nil + fake.getEgressReturns = struct { + result1 *livekit.EgressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) GetEgressReturnsOnCall(i int, result1 *livekit.EgressInfo, result2 error) { + fake.getEgressMutex.Lock() + defer fake.getEgressMutex.Unlock() + fake.GetEgressStub = nil + if fake.getEgressReturnsOnCall == nil { + fake.getEgressReturnsOnCall = make(map[int]struct { + result1 *livekit.EgressInfo + result2 error + }) + } + fake.getEgressReturnsOnCall[i] = struct { + result1 *livekit.EgressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) ListEgress(arg1 context.Context, arg2 *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) { + fake.listEgressMutex.Lock() + ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)] + fake.listEgressArgsForCall = append(fake.listEgressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.ListEgressRequest + }{arg1, arg2}) + stub := fake.ListEgressStub + fakeReturns := fake.listEgressReturns + fake.recordInvocation("ListEgress", []interface{}{arg1, arg2}) + fake.listEgressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) ListEgressCallCount() int { + fake.listEgressMutex.RLock() + defer fake.listEgressMutex.RUnlock() + return len(fake.listEgressArgsForCall) +} + +func (fake *FakeIOClient) ListEgressCalls(stub func(context.Context, *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error)) { + fake.listEgressMutex.Lock() + defer fake.listEgressMutex.Unlock() + fake.ListEgressStub = stub +} + +func (fake *FakeIOClient) ListEgressArgsForCall(i int) (context.Context, *livekit.ListEgressRequest) { + fake.listEgressMutex.RLock() + defer fake.listEgressMutex.RUnlock() + argsForCall := fake.listEgressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) ListEgressReturns(result1 *livekit.ListEgressResponse, result2 error) { + fake.listEgressMutex.Lock() + defer fake.listEgressMutex.Unlock() + fake.ListEgressStub = nil + fake.listEgressReturns = struct { + result1 *livekit.ListEgressResponse + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) ListEgressReturnsOnCall(i int, result1 *livekit.ListEgressResponse, result2 error) { + fake.listEgressMutex.Lock() + defer fake.listEgressMutex.Unlock() + fake.ListEgressStub = nil + if fake.listEgressReturnsOnCall == nil { + fake.listEgressReturnsOnCall = make(map[int]struct { + result1 *livekit.ListEgressResponse + result2 error + }) + } + fake.listEgressReturnsOnCall[i] = struct { + result1 *livekit.ListEgressResponse + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createEgressMutex.RLock() + defer fake.createEgressMutex.RUnlock() + fake.getEgressMutex.RLock() + defer fake.getEgressMutex.RUnlock() + fake.listEgressMutex.RLock() + defer fake.listEgressMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeIOClient) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ service.IOClient = new(FakeIOClient) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 9d8fbf394..05502a16c 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -58,6 +58,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live telemetry.NewTelemetryService, getMessageBus, NewIOInfoService, + wire.Bind(new(IOClient), new(*IOInfoService)), rpc.NewEgressClient, getEgressStore, NewEgressLauncher, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 942d693fc..fa788a829 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -57,6 +57,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } egressStore := getEgressStore(objectStore) + ingressStore := getIngressStore(objectStore) keyProvider, err := createKeyProvider(conf) if err != nil { return nil, err @@ -67,23 +68,22 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService) - rtcEgressLauncher := NewEgressLauncher(egressClient, egressStore, telemetryService) + ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService) + if err != nil { + return nil, err + } + rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher) if err != nil { return nil, err } - egressService := NewEgressService(egressClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) + egressService := NewEgressService(egressClient, objectStore, ioInfoService, roomService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(nodeID, messageBus) if err != nil { return nil, err } - ingressStore := getIngressStore(objectStore) ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) - ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService) - if err != nil { - return nil, err - } rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 2ac196904..b1c4c5df9 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -327,17 +327,6 @@ func (r *RTPStatsSender) Update( r.extStartSN = extSequenceNumber } - if extTimestamp < r.extStartTS { - r.logger.Infow( - "adjusting start timestamp", - "snBefore", r.extStartSN, - "snAfter", extSequenceNumber, - "tsBefore", r.extStartTS, - "tsAfter", extTimestamp, - ) - r.extStartTS = extTimestamp - } - if gapSN != 0 { r.packetsOutOfOrder++ } @@ -377,12 +366,27 @@ func (r *RTPStatsSender) Update( r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, false) - if extTimestamp != r.extHighestTS { - // update only on first packet as same timestamp could be in multiple packets. - // NOTE: this may not be the first packet with this time stamp if there is packet loss. + r.extHighestSN = extSequenceNumber + } + + if extTimestamp < r.extStartTS { + r.logger.Infow( + "adjusting start timestamp", + "snBefore", r.extStartSN, + "snAfter", extSequenceNumber, + "tsBefore", r.extStartTS, + "tsAfter", extTimestamp, + ) + r.extStartTS = extTimestamp + } + + if extTimestamp > r.extHighestTS { + // update only on first packet as same timestamp could be in multiple packets. + // NOTE: this may not be the first packet with this time stamp if there is packet loss. + if payloadSize > 0 { + // skip updating on padding only packets as they could re-use an old timestamp r.highestTime = packetTime } - r.extHighestSN = extSequenceNumber r.extHighestTS = extTimestamp } diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 8cfb4be27..31c415f12 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -54,10 +54,11 @@ type RTPMungerState struct { ExtLastSN uint64 ExtSecondLastSN uint64 ExtLastTS uint64 + ExtSecondLastTS uint64 } func (r RTPMungerState) String() string { - return fmt.Sprintf("RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d)", r.ExtLastSN, r.ExtSecondLastSN, r.ExtLastTS) + return fmt.Sprintf("RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d, extSecondLastTS: %d)", r.ExtLastSN, r.ExtSecondLastSN, r.ExtLastTS, r.ExtSecondLastTS) } // ---------------------------------------------------------------------- @@ -72,8 +73,9 @@ type RTPMunger struct { extSecondLastSN uint64 snOffset uint64 - extLastTS uint64 - tsOffset uint64 + extLastTS uint64 + extSecondLastTS uint64 + tsOffset uint64 lastMarker bool @@ -95,6 +97,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { "ExtSecondLastSN": r.extSecondLastSN, "SNOffset": r.snOffset, "ExtLastTS": r.extLastTS, + "ExtSecondLastTS": r.extSecondLastTS, "TSOffset": r.tsOffset, "LastMarker": r.lastMarker, } @@ -105,6 +108,7 @@ func (r *RTPMunger) GetLast() RTPMungerState { ExtLastSN: r.extLastSN, ExtSecondLastSN: r.extSecondLastSN, ExtLastTS: r.extLastTS, + ExtSecondLastTS: r.extSecondLastTS, } } @@ -112,6 +116,7 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) { r.extLastSN = state.ExtLastSN r.extSecondLastSN = state.ExtSecondLastSN r.extLastTS = state.ExtLastTS + r.extSecondLastTS = state.ExtSecondLastTS } func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { @@ -123,6 +128,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.updateSnOffset() r.extLastTS = extPkt.ExtTimestamp + r.extSecondLastTS = extPkt.ExtTimestamp } func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) { @@ -156,6 +162,8 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { r.extLastSN = r.extSecondLastSN r.updateSnOffset() + + r.extLastTS = r.extSecondLastTS } func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { @@ -174,6 +182,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara r.extSecondLastSN = r.extLastSN r.extLastSN = extMungedSN + r.extSecondLastTS = r.extLastTS r.extLastTS = extMungedTS r.lastMarker = extPkt.Packet.Marker @@ -306,6 +315,11 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate r.snRangeMap.DecValue(r.extHighestIncomingSN, uint64(num)) r.updateSnOffset() + if len(vals) == 1 { + r.extSecondLastTS = r.extLastTS + } else { + r.extSecondLastTS = vals[len(vals)-2].extTimestamp + } r.tsOffset -= extLastTS - r.extLastTS r.extLastTS = extLastTS