From 7c830ea5b951f6de9a76b34840ea88cafa4dbdb9 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 19 Oct 2023 00:53:50 +0530 Subject: [PATCH 1/6] Log highest time update on padding packet. (#2154) * Log highest time update on padding packet. Seeing a strange case of what looks like highest time getting updated on a padding packet. Can't see how it happens in code. So, logging to check. Will be removing log after checking. * log sequence number also --- pkg/sfu/buffer/rtpstats_sender.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 2ac196904..c6cd98dd2 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -380,6 +380,17 @@ func (r *RTPStatsSender) Update( if extTimestamp != r.extHighestTS { // update only on first packet as same timestamp could be in multiple packets. // NOTE: this may not be the first packet with this time stamp if there is packet loss. + if payloadSize == 0 { + r.logger.Infow( + "updating highest time on padding packet", + "extSequenceNumber", extSequenceNumber, + "extHighestSN", r.extHighestSN, + "extTimestamp", extTimestamp, + "extHighestTS", r.extHighestTS, + "highestTime", r.highestTime.String(), + "packetTime", packetTime.String(), + ) + } r.highestTime = packetTime } r.extHighestSN = extSequenceNumber From 62b057b4c12503c3759174412fef7656acdedb91 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 18 Oct 2023 14:48:51 -0700 Subject: [PATCH 2/6] Egress store/IO cleanup (#2152) * egress store cleanup * client wrapper, regenerate * put WithClusterID back * rename clent * infinite loops * client wrapper -> interface * remove StopEgress update * remove Update from IOClient * avoid duplicate EgressStarted events * update protocol --- go.mod | 10 +- go.sum | 25 +- pkg/service/egress.go | 81 ++---- pkg/service/interfaces.go | 10 + pkg/service/{ioinfo.go => ioservice.go} | 103 +++++++- pkg/service/servicefakes/fake_ioclient.go | 284 ++++++++++++++++++++++ pkg/service/wire.go | 1 + pkg/service/wire_gen.go | 14 +- 8 files changed, 429 insertions(+), 99 deletions(-) rename pkg/service/{ioinfo.go => ioservice.go} (67%) create mode 100644 pkg/service/servicefakes/fake_ioclient.go diff --git a/go.mod b/go.mod index cf175a44f..263fb72b3 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.0 + github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -71,7 +71,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.17.0 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -93,8 +93,8 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.uber.org/multierr v1.10.0 // indirect - go.uber.org/zap v1.25.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.17.0 // indirect @@ -102,6 +102,6 @@ require ( golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect - google.golang.org/grpc v1.58.3 // indirect + google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 6626aafb1..2a16e832b 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,3 @@ -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -110,8 +109,8 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -126,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.0 h1:0z2eRmEXFFXiJ7WPAxRLMNCyUu55w41iikbbeT8dvlQ= -github.com/livekit/protocol v1.8.0/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16 h1:0Ca5hQP9+DaRjrr5d9msNhfCL+CBvcxqLdh4xOXAZeU= +github.com/livekit/protocol v1.8.1-0.20231018194636-fac7f187fc16/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -162,7 +161,7 @@ github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46N github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o= github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= @@ -241,7 +240,7 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/ github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -281,10 +280,10 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= -go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -419,8 +418,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= -google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= -google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 36f379b3f..d6698c7e0 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -23,58 +23,47 @@ import ( "github.com/twitchtv/twirp" "github.com/livekit/livekit-server/pkg/rtc" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" ) type EgressService struct { client rpc.EgressClient - store ServiceStore - es EgressStore + io IOClient roomService livekit.RoomService - telemetry telemetry.TelemetryService + store ServiceStore launcher rtc.EgressLauncher } type egressLauncher struct { - client rpc.EgressClient - es EgressStore - telemetry telemetry.TelemetryService + client rpc.EgressClient + io IOClient } -func NewEgressLauncher( - client rpc.EgressClient, - es EgressStore, - ts telemetry.TelemetryService) rtc.EgressLauncher { +func NewEgressLauncher(client rpc.EgressClient, io IOClient) rtc.EgressLauncher { if client == nil { return nil } - return &egressLauncher{ - client: client, - es: es, - telemetry: ts, + client: client, + io: io, } } func NewEgressService( client rpc.EgressClient, store ServiceStore, - es EgressStore, + io IOClient, rs livekit.RoomService, - ts telemetry.TelemetryService, launcher rtc.EgressLauncher, ) *EgressService { return &EgressService{ client: client, store: store, - es: es, + io: io, roomService: rs, - telemetry: ts, launcher: launcher, } } @@ -189,7 +178,6 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } else if s.launcher == nil { return nil, ErrEgressNotConnected } - if roomName != "" { room, _, err := s.store.LoadRoom(ctx, roomName, false) if err != nil { @@ -197,14 +185,18 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } req.RoomId = room.Sid } - return s.launcher.StartEgress(ctx, req) } func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { return s.StartEgressWithClusterId(ctx, "", req) } + func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { + if s.client == nil { + return nil, ErrEgressNotConnected + } + // Ensure we have an Egress ID if req.EgressId == "" { req.EgressId = utils.NewGuid(utils.EgressPrefix) @@ -215,11 +207,9 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId return nil, err } - s.telemetry.EgressStarted(ctx, info) + // TODO: remove go func() { - if err := s.es.StoreEgress(ctx, info); err != nil { - logger.Errorw("could not write egress info", err) - } + _, _ = s.io.CreateEgress(ctx, info) }() return info, nil @@ -234,11 +224,8 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.client == nil { - return nil, ErrEgressNotConnected - } - info, err := s.es.LoadEgress(ctx, req.EgressId) + info, err := s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId}) if err != nil { return nil, err } @@ -277,7 +264,7 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr info, err := s.client.UpdateStream(ctx, req.EgressId, req) if err != nil { var loadErr error - info, loadErr = s.es.LoadEgress(ctx, req.EgressId) + info, loadErr = s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId}) if loadErr != nil { return nil, loadErr } @@ -306,29 +293,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.client == nil { - return nil, ErrEgressNotConnected - } - - var items []*livekit.EgressInfo - if req.EgressId != "" { - info, err := s.es.LoadEgress(ctx, req.EgressId) - if err != nil { - return nil, err - } - - if !req.Active || int32(info.Status) < int32(livekit.EgressStatus_EGRESS_COMPLETE) { - items = []*livekit.EgressInfo{info} - } - } else { - var err error - items, err = s.es.ListEgress(ctx, livekit.RoomName(req.RoomName), req.Active) - if err != nil { - return nil, err - } - } - - return &livekit.ListEgressResponse{Items: items}, nil + return s.io.ListEgress(ctx, req) } func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { @@ -344,7 +309,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR info, err := s.client.StopEgress(ctx, req.EgressId, req) if err != nil { var loadErr error - info, loadErr = s.es.LoadEgress(ctx, req.EgressId) + info, loadErr = s.io.GetEgress(ctx, &rpc.GetEgressRequest{EgressId: req.EgressId}) if loadErr != nil { return nil, loadErr } @@ -359,11 +324,5 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR } } - go func() { - if err := s.es.UpdateEgress(ctx, info); err != nil { - logger.Errorw("could not write egress info", err) - } - }() - return info, nil } diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 36da68fc5..ba390554f 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -18,7 +18,10 @@ import ( "context" "time" + "google.golang.org/protobuf/types/known/emptypb" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -71,6 +74,13 @@ type IngressStore interface { DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error } +//counterfeiter:generate . IOClient +type IOClient interface { + CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) + GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) + ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) +} + //counterfeiter:generate . RoomAllocator type RoomAllocator interface { CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) diff --git a/pkg/service/ioinfo.go b/pkg/service/ioservice.go similarity index 67% rename from pkg/service/ioinfo.go rename to pkg/service/ioservice.go index b8fedd8ac..a993daa83 100644 --- a/pkg/service/ioinfo.go +++ b/pkg/service/ioservice.go @@ -28,11 +28,13 @@ import ( ) type IOInfoService struct { - ioServer rpc.IOInfoServer + ioServer rpc.IOInfoServer + es EgressStore is IngressStore telemetry telemetry.TelemetryService - shutdown chan struct{} + + shutdown chan struct{} } func NewIOInfoService( @@ -73,27 +75,38 @@ func (s *IOInfoService) Start() error { return nil } -func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { +func (s *IOInfoService) CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + // check if egress already exists to avoid duplicate EgressStarted event + if _, err := s.es.LoadEgress(ctx, info.EgressId); err == nil { + return &emptypb.Empty{}, nil + } + + err := s.es.StoreEgress(ctx, info) + if err != nil { + logger.Errorw("could not update egress", err) + return nil, err + } + + s.telemetry.EgressStarted(ctx, info) + + return &emptypb.Empty{}, nil +} + +func (s *IOInfoService) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { err := s.es.UpdateEgress(ctx, info) switch info.Status { - case livekit.EgressStatus_EGRESS_ACTIVE: + case livekit.EgressStatus_EGRESS_ACTIVE, + livekit.EgressStatus_EGRESS_ENDING: s.telemetry.EgressUpdated(ctx, info) case livekit.EgressStatus_EGRESS_COMPLETE, livekit.EgressStatus_EGRESS_FAILED, livekit.EgressStatus_EGRESS_ABORTED, livekit.EgressStatus_EGRESS_LIMIT_REACHED: - - // log results - if info.Error != "" { - logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId) - } else { - logger.Infow("egress ended", "egressID", info.EgressId) - } - s.telemetry.EgressEnded(ctx, info) } + if err != nil { logger.Errorw("could not update egress", err) return nil, err @@ -102,6 +115,40 @@ func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.Egre return &emptypb.Empty{}, nil } +func (s *IOInfoService) GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) { + info, err := s.es.LoadEgress(ctx, req.EgressId) + if err != nil { + logger.Errorw("failed to load egress", err) + return nil, err + } + + return info, nil +} + +func (s *IOInfoService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) { + var items []*livekit.EgressInfo + if req.EgressId != "" { + info, err := s.es.LoadEgress(ctx, req.EgressId) + if err != nil { + logger.Errorw("failed to load egress", err) + return nil, err + } + + if !req.Active || int32(info.Status) < int32(livekit.EgressStatus_EGRESS_COMPLETE) { + items = []*livekit.EgressInfo{info} + } + } else { + var err error + items, err = s.es.ListEgress(ctx, livekit.RoomName(req.RoomName), req.Active) + if err != nil { + logger.Errorw("failed to list egress", err) + return nil, err + } + } + + return &livekit.ListEgressResponse{Items: items}, nil +} + func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) { info, err := s.loadIngressFromInfoRequest(req) if err != nil { @@ -128,7 +175,7 @@ func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateI return nil, err } - if err := s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { + if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { logger.Errorw("could not update ingress", err) return nil, err } @@ -176,3 +223,33 @@ func (s *IOInfoService) Stop() { s.ioServer.Shutdown() } } + +// deprecated +func (s *IOInfoService) UpdateEgressInfo(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + err := s.es.UpdateEgress(ctx, info) + + switch info.Status { + case livekit.EgressStatus_EGRESS_ACTIVE: + s.telemetry.EgressUpdated(ctx, info) + + case livekit.EgressStatus_EGRESS_COMPLETE, + livekit.EgressStatus_EGRESS_FAILED, + livekit.EgressStatus_EGRESS_ABORTED, + livekit.EgressStatus_EGRESS_LIMIT_REACHED: + + // log results + if info.Error != "" { + logger.Errorw("egress failed", errors.New(info.Error), "egressID", info.EgressId) + } else { + logger.Infow("egress ended", "egressID", info.EgressId) + } + + s.telemetry.EgressEnded(ctx, info) + } + if err != nil { + logger.Errorw("could not update egress", err) + return nil, err + } + + return &emptypb.Empty{}, nil +} diff --git a/pkg/service/servicefakes/fake_ioclient.go b/pkg/service/servicefakes/fake_ioclient.go new file mode 100644 index 000000000..5d440cbb9 --- /dev/null +++ b/pkg/service/servicefakes/fake_ioclient.go @@ -0,0 +1,284 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package servicefakes + +import ( + "context" + "sync" + + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +type FakeIOClient struct { + CreateEgressStub func(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error) + createEgressMutex sync.RWMutex + createEgressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.EgressInfo + } + createEgressReturns struct { + result1 *emptypb.Empty + result2 error + } + createEgressReturnsOnCall map[int]struct { + result1 *emptypb.Empty + result2 error + } + GetEgressStub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error) + getEgressMutex sync.RWMutex + getEgressArgsForCall []struct { + arg1 context.Context + arg2 *rpc.GetEgressRequest + } + getEgressReturns struct { + result1 *livekit.EgressInfo + result2 error + } + getEgressReturnsOnCall map[int]struct { + result1 *livekit.EgressInfo + result2 error + } + ListEgressStub func(context.Context, *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) + listEgressMutex sync.RWMutex + listEgressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.ListEgressRequest + } + listEgressReturns struct { + result1 *livekit.ListEgressResponse + result2 error + } + listEgressReturnsOnCall map[int]struct { + result1 *livekit.ListEgressResponse + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeIOClient) CreateEgress(arg1 context.Context, arg2 *livekit.EgressInfo) (*emptypb.Empty, error) { + fake.createEgressMutex.Lock() + ret, specificReturn := fake.createEgressReturnsOnCall[len(fake.createEgressArgsForCall)] + fake.createEgressArgsForCall = append(fake.createEgressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.EgressInfo + }{arg1, arg2}) + stub := fake.CreateEgressStub + fakeReturns := fake.createEgressReturns + fake.recordInvocation("CreateEgress", []interface{}{arg1, arg2}) + fake.createEgressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) CreateEgressCallCount() int { + fake.createEgressMutex.RLock() + defer fake.createEgressMutex.RUnlock() + return len(fake.createEgressArgsForCall) +} + +func (fake *FakeIOClient) CreateEgressCalls(stub func(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error)) { + fake.createEgressMutex.Lock() + defer fake.createEgressMutex.Unlock() + fake.CreateEgressStub = stub +} + +func (fake *FakeIOClient) CreateEgressArgsForCall(i int) (context.Context, *livekit.EgressInfo) { + fake.createEgressMutex.RLock() + defer fake.createEgressMutex.RUnlock() + argsForCall := fake.createEgressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) CreateEgressReturns(result1 *emptypb.Empty, result2 error) { + fake.createEgressMutex.Lock() + defer fake.createEgressMutex.Unlock() + fake.CreateEgressStub = nil + fake.createEgressReturns = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) CreateEgressReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) { + fake.createEgressMutex.Lock() + defer fake.createEgressMutex.Unlock() + fake.CreateEgressStub = nil + if fake.createEgressReturnsOnCall == nil { + fake.createEgressReturnsOnCall = make(map[int]struct { + result1 *emptypb.Empty + result2 error + }) + } + fake.createEgressReturnsOnCall[i] = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) GetEgress(arg1 context.Context, arg2 *rpc.GetEgressRequest) (*livekit.EgressInfo, error) { + fake.getEgressMutex.Lock() + ret, specificReturn := fake.getEgressReturnsOnCall[len(fake.getEgressArgsForCall)] + fake.getEgressArgsForCall = append(fake.getEgressArgsForCall, struct { + arg1 context.Context + arg2 *rpc.GetEgressRequest + }{arg1, arg2}) + stub := fake.GetEgressStub + fakeReturns := fake.getEgressReturns + fake.recordInvocation("GetEgress", []interface{}{arg1, arg2}) + fake.getEgressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) GetEgressCallCount() int { + fake.getEgressMutex.RLock() + defer fake.getEgressMutex.RUnlock() + return len(fake.getEgressArgsForCall) +} + +func (fake *FakeIOClient) GetEgressCalls(stub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error)) { + fake.getEgressMutex.Lock() + defer fake.getEgressMutex.Unlock() + fake.GetEgressStub = stub +} + +func (fake *FakeIOClient) GetEgressArgsForCall(i int) (context.Context, *rpc.GetEgressRequest) { + fake.getEgressMutex.RLock() + defer fake.getEgressMutex.RUnlock() + argsForCall := fake.getEgressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) GetEgressReturns(result1 *livekit.EgressInfo, result2 error) { + fake.getEgressMutex.Lock() + defer fake.getEgressMutex.Unlock() + fake.GetEgressStub = nil + fake.getEgressReturns = struct { + result1 *livekit.EgressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) GetEgressReturnsOnCall(i int, result1 *livekit.EgressInfo, result2 error) { + fake.getEgressMutex.Lock() + defer fake.getEgressMutex.Unlock() + fake.GetEgressStub = nil + if fake.getEgressReturnsOnCall == nil { + fake.getEgressReturnsOnCall = make(map[int]struct { + result1 *livekit.EgressInfo + result2 error + }) + } + fake.getEgressReturnsOnCall[i] = struct { + result1 *livekit.EgressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) ListEgress(arg1 context.Context, arg2 *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) { + fake.listEgressMutex.Lock() + ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)] + fake.listEgressArgsForCall = append(fake.listEgressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.ListEgressRequest + }{arg1, arg2}) + stub := fake.ListEgressStub + fakeReturns := fake.listEgressReturns + fake.recordInvocation("ListEgress", []interface{}{arg1, arg2}) + fake.listEgressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) ListEgressCallCount() int { + fake.listEgressMutex.RLock() + defer fake.listEgressMutex.RUnlock() + return len(fake.listEgressArgsForCall) +} + +func (fake *FakeIOClient) ListEgressCalls(stub func(context.Context, *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error)) { + fake.listEgressMutex.Lock() + defer fake.listEgressMutex.Unlock() + fake.ListEgressStub = stub +} + +func (fake *FakeIOClient) ListEgressArgsForCall(i int) (context.Context, *livekit.ListEgressRequest) { + fake.listEgressMutex.RLock() + defer fake.listEgressMutex.RUnlock() + argsForCall := fake.listEgressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) ListEgressReturns(result1 *livekit.ListEgressResponse, result2 error) { + fake.listEgressMutex.Lock() + defer fake.listEgressMutex.Unlock() + fake.ListEgressStub = nil + fake.listEgressReturns = struct { + result1 *livekit.ListEgressResponse + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) ListEgressReturnsOnCall(i int, result1 *livekit.ListEgressResponse, result2 error) { + fake.listEgressMutex.Lock() + defer fake.listEgressMutex.Unlock() + fake.ListEgressStub = nil + if fake.listEgressReturnsOnCall == nil { + fake.listEgressReturnsOnCall = make(map[int]struct { + result1 *livekit.ListEgressResponse + result2 error + }) + } + fake.listEgressReturnsOnCall[i] = struct { + result1 *livekit.ListEgressResponse + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createEgressMutex.RLock() + defer fake.createEgressMutex.RUnlock() + fake.getEgressMutex.RLock() + defer fake.getEgressMutex.RUnlock() + fake.listEgressMutex.RLock() + defer fake.listEgressMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeIOClient) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ service.IOClient = new(FakeIOClient) diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 9d8fbf394..05502a16c 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -58,6 +58,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live telemetry.NewTelemetryService, getMessageBus, NewIOInfoService, + wire.Bind(new(IOClient), new(*IOInfoService)), rpc.NewEgressClient, getEgressStore, NewEgressLauncher, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 942d693fc..fa788a829 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -57,6 +57,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } egressStore := getEgressStore(objectStore) + ingressStore := getIngressStore(objectStore) keyProvider, err := createKeyProvider(conf) if err != nil { return nil, err @@ -67,23 +68,22 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService) - rtcEgressLauncher := NewEgressLauncher(egressClient, egressStore, telemetryService) + ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService) + if err != nil { + return nil, err + } + rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) roomService, err := NewRoomService(roomConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher) if err != nil { return nil, err } - egressService := NewEgressService(egressClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) + egressService := NewEgressService(egressClient, objectStore, ioInfoService, roomService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(nodeID, messageBus) if err != nil { return nil, err } - ingressStore := getIngressStore(objectStore) ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) - ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService) - if err != nil { - return nil, err - } rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() From b290c233ea5e11b273156ebd9e4ba91845eed7a0 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 18 Oct 2023 23:18:53 -0700 Subject: [PATCH 3/6] fix CreateEgress not completing (#2156) --- pkg/service/egress.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/service/egress.go b/pkg/service/egress.go index d6698c7e0..0a77a231f 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -25,6 +25,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" ) @@ -207,10 +208,10 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId return nil, err } - // TODO: remove - go func() { - _, _ = s.io.CreateEgress(ctx, info) - }() + _, err = s.io.CreateEgress(ctx, info) + if err != nil { + logger.Errorw("failed to create egress", err) + } return info, nil } From 37f91ae89298a0fa6806d6bcd30c9e648026ac8d Mon Sep 17 00:00:00 2001 From: Ya Hui Liang <46517115@qq.com> Date: Thu, 19 Oct 2023 14:21:21 +0800 Subject: [PATCH 4/6] update version of golang (#2148) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7bdc9863e..c05377095 100644 --- a/README.md +++ b/README.md @@ -272,7 +272,7 @@ Read our [deployment docs](https://docs.livekit.io/deploy/) for more information Pre-requisites: -- Go 1.18+ is installed +- Go 1.20+ is installed - GOPATH/bin is in your PATH Then run From f653efcf10c521e5d4a23e44988ecb1b54a28f3d Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 19 Oct 2023 12:01:48 +0530 Subject: [PATCH 5/6] Do not update highest time on padding packet. (#2157) * Error log of padding updating highest time to get backtrace. * Do not update highest time on padding packet. Padding packets use time stamp of last packet sent. Padding packets could be sent when probing much after last packet was sent. Updating highest time on that screws up sender report calculations. We have ways of making sure sender reports do not get too out-of-whack, but it logs during that repair. That repair should be unnecessary unless the source is behaving weird (things like publisher sending all packets at the same time, publisher sample rate is incorrect, etc.) --- pkg/sfu/buffer/rtpstats_sender.go | 45 +++++++++++++------------------ pkg/sfu/rtpmunger.go | 20 +++++++++++--- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index c6cd98dd2..b1c4c5df9 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -327,17 +327,6 @@ func (r *RTPStatsSender) Update( r.extStartSN = extSequenceNumber } - if extTimestamp < r.extStartTS { - r.logger.Infow( - "adjusting start timestamp", - "snBefore", r.extStartSN, - "snAfter", extSequenceNumber, - "tsBefore", r.extStartTS, - "tsAfter", extTimestamp, - ) - r.extStartTS = extTimestamp - } - if gapSN != 0 { r.packetsOutOfOrder++ } @@ -377,23 +366,27 @@ func (r *RTPStatsSender) Update( r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, false) - if extTimestamp != r.extHighestTS { - // update only on first packet as same timestamp could be in multiple packets. - // NOTE: this may not be the first packet with this time stamp if there is packet loss. - if payloadSize == 0 { - r.logger.Infow( - "updating highest time on padding packet", - "extSequenceNumber", extSequenceNumber, - "extHighestSN", r.extHighestSN, - "extTimestamp", extTimestamp, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), - "packetTime", packetTime.String(), - ) - } + r.extHighestSN = extSequenceNumber + } + + if extTimestamp < r.extStartTS { + r.logger.Infow( + "adjusting start timestamp", + "snBefore", r.extStartSN, + "snAfter", extSequenceNumber, + "tsBefore", r.extStartTS, + "tsAfter", extTimestamp, + ) + r.extStartTS = extTimestamp + } + + if extTimestamp > r.extHighestTS { + // update only on first packet as same timestamp could be in multiple packets. + // NOTE: this may not be the first packet with this time stamp if there is packet loss. + if payloadSize > 0 { + // skip updating on padding only packets as they could re-use an old timestamp r.highestTime = packetTime } - r.extHighestSN = extSequenceNumber r.extHighestTS = extTimestamp } diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 8cfb4be27..31c415f12 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -54,10 +54,11 @@ type RTPMungerState struct { ExtLastSN uint64 ExtSecondLastSN uint64 ExtLastTS uint64 + ExtSecondLastTS uint64 } func (r RTPMungerState) String() string { - return fmt.Sprintf("RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d)", r.ExtLastSN, r.ExtSecondLastSN, r.ExtLastTS) + return fmt.Sprintf("RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d, extSecondLastTS: %d)", r.ExtLastSN, r.ExtSecondLastSN, r.ExtLastTS, r.ExtSecondLastTS) } // ---------------------------------------------------------------------- @@ -72,8 +73,9 @@ type RTPMunger struct { extSecondLastSN uint64 snOffset uint64 - extLastTS uint64 - tsOffset uint64 + extLastTS uint64 + extSecondLastTS uint64 + tsOffset uint64 lastMarker bool @@ -95,6 +97,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { "ExtSecondLastSN": r.extSecondLastSN, "SNOffset": r.snOffset, "ExtLastTS": r.extLastTS, + "ExtSecondLastTS": r.extSecondLastTS, "TSOffset": r.tsOffset, "LastMarker": r.lastMarker, } @@ -105,6 +108,7 @@ func (r *RTPMunger) GetLast() RTPMungerState { ExtLastSN: r.extLastSN, ExtSecondLastSN: r.extSecondLastSN, ExtLastTS: r.extLastTS, + ExtSecondLastTS: r.extSecondLastTS, } } @@ -112,6 +116,7 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) { r.extLastSN = state.ExtLastSN r.extSecondLastSN = state.ExtSecondLastSN r.extLastTS = state.ExtLastTS + r.extSecondLastTS = state.ExtSecondLastTS } func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { @@ -123,6 +128,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.updateSnOffset() r.extLastTS = extPkt.ExtTimestamp + r.extSecondLastTS = extPkt.ExtTimestamp } func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) { @@ -156,6 +162,8 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { r.extLastSN = r.extSecondLastSN r.updateSnOffset() + + r.extLastTS = r.extSecondLastTS } func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { @@ -174,6 +182,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara r.extSecondLastSN = r.extLastSN r.extLastSN = extMungedSN + r.extSecondLastTS = r.extLastTS r.extLastTS = extMungedTS r.lastMarker = extPkt.Packet.Marker @@ -306,6 +315,11 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate r.snRangeMap.DecValue(r.extHighestIncomingSN, uint64(num)) r.updateSnOffset() + if len(vals) == 1 { + r.extSecondLastTS = r.extLastTS + } else { + r.extSecondLastTS = vals[len(vals)-2].extTimestamp + } r.tsOffset -= extLastTS - r.extLastTS r.extLastTS = extLastTS From 8d0bb526f7e8c8377aab29ff18fe72305b1c625a Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Wed, 18 Oct 2023 23:45:23 -0700 Subject: [PATCH 6/6] Update module github.com/pion/interceptor to v0.1.24 (#2155) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 263fb72b3..f9a258872 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.7 github.com/pion/ice/v2 v2.3.11 - github.com/pion/interceptor v0.1.23 + github.com/pion/interceptor v0.1.24 github.com/pion/rtcp v1.2.10 github.com/pion/rtp v1.8.2 github.com/pion/sctp v1.8.9 diff --git a/go.sum b/go.sum index 2a16e832b..b1d7080f8 100644 --- a/go.sum +++ b/go.sum @@ -188,8 +188,8 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I= -github.com/pion/interceptor v0.1.23 h1:BZmayeasUYVDam891RtvE5rs6syqmSK3Wzy+xu+UNw0= -github.com/pion/interceptor v0.1.23/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= +github.com/pion/interceptor v0.1.24 h1:lN4ua3yUAJCgNKQKcZIM52wFjBgjN0r7shLj91PkJ0c= +github.com/pion/interceptor v0.1.24/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI=