From 7aa5888338d6564c68f6977958dfa484b85ec615 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 28 Jul 2022 20:27:59 -0500 Subject: [PATCH] Keep egress info for 24h (#856) * Keep egress info for 24h * updates * better version comparison * fix test * duplicated if --- go.mod | 5 +- go.sum | 10 +- pkg/service/egress.go | 165 +++++----- pkg/service/interfaces.go | 3 +- pkg/service/localstore.go | 46 --- pkg/service/redisstore.go | 284 +++++++++++++----- pkg/service/redisstore_test.go | 98 +++++- pkg/service/server.go | 4 +- pkg/service/servicefakes/fake_egress_store.go | 88 +----- pkg/service/wire.go | 2 - pkg/service/wire_gen.go | 5 +- 11 files changed, 426 insertions(+), 284 deletions(-) diff --git a/go.mod b/go.mod index 73f38661d..62e41acc6 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,9 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 + github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.13.5-0.20220727215941-ac26418a52e9 + github.com/livekit/protocol v0.13.5-0.20220728214908-67539ebcab2a github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.13.0 @@ -45,7 +46,7 @@ require ( go.uber.org/atomic v1.9.0 go.uber.org/zap v1.21.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - google.golang.org/protobuf v1.28.0 + google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 94987a22a..ac3c9a612 100644 --- a/go.sum +++ b/go.sum @@ -190,6 +190,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= @@ -235,8 +237,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.13.5-0.20220727215941-ac26418a52e9 h1:e12j1EyiiTG56Ag44fwpVtnYQ6MVgLv4bYYI0nTgxZY= -github.com/livekit/protocol v0.13.5-0.20220727215941-ac26418a52e9/go.mod h1:Qd/Dn4BkJfZQy/IjtEeUOGXARrR7l09WDkg5SY8thkw= +github.com/livekit/protocol v0.13.5-0.20220728214908-67539ebcab2a h1:tRioM9WNDjxGryt03ROYa8zq17J0MqHftCLr8Ex4dM0= +github.com/livekit/protocol v0.13.5-0.20220728214908-67539ebcab2a/go.mod h1:vGQzKUaSYC92o5y7EbnhosgpoLWK9a3PneyYkGOGL0o= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= @@ -755,8 +757,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 8be5bd220..4f1bd13c0 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "time" "google.golang.org/protobuf/proto" @@ -37,14 +38,20 @@ func NewEgressService( es: es, roomService: rs, telemetry: ts, - shutdown: make(chan struct{}), } } -func (s *EgressService) Start() { - if s.rpcClient != nil { - go s.updateWorker() +func (s *EgressService) Start() error { + if s.shutdown != nil { + return nil } + + s.shutdown = make(chan struct{}) + if s.rpcClient != nil && s.es != nil { + return s.startWorker() + } + + return nil } func (s *EgressService) Stop() { @@ -94,6 +101,8 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa return nil, err } + ensureRoomName(info) + s.telemetry.EgressStarted(ctx, info) go func() { if err := s.es.StoreEgress(ctx, info); err != nil { @@ -121,17 +130,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay return nil, err } - var roomName string - switch r := info.Request.(type) { - case *livekit.EgressInfo_RoomComposite: - roomName = r.RoomComposite.RoomName - case *livekit.EgressInfo_TrackComposite: - roomName = r.TrackComposite.RoomName - case *livekit.EgressInfo_Track: - roomName = r.Track.RoomName - default: - return nil, ErrRoomNotFound - } + ensureRoomName(info) metadata, err := json.Marshal(&LayoutMetadata{Layout: req.Layout}) if err != nil { @@ -139,11 +138,11 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay } grants := GetGrants(ctx) - grants.Video.Room = roomName + grants.Video.Room = info.RoomName grants.Video.RoomAdmin = true _, err = s.roomService.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{ - Room: roomName, + Room: info.RoomName, Identity: info.EgressId, Metadata: string(metadata), }) @@ -172,6 +171,8 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr return nil, err } + ensureRoomName(info) + go func() { if err := s.es.UpdateEgress(ctx, info); err != nil { logger.Errorw("could not write egress info", err) @@ -189,19 +190,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR return nil, ErrEgressNotConnected } - var roomID livekit.RoomID - if req.RoomName != "" { - room, err := s.store.LoadRoom(ctx, livekit.RoomName(req.RoomName)) - if err != nil { - if err == ErrRoomNotFound { - return &livekit.ListEgressResponse{}, nil - } - return nil, err - } - roomID = livekit.RoomID(room.Sid) - } - - infos, err := s.es.ListEgress(ctx, roomID) + infos, err := s.es.ListEgress(ctx, livekit.RoomName(req.RoomName)) if err != nil { return nil, err } @@ -227,6 +216,8 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR return nil, err } + ensureRoomName(info) + go func() { if err := s.es.UpdateEgress(ctx, info); err != nil { logger.Errorw("could not write egress info", err) @@ -236,58 +227,86 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR return info, nil } -func (s *EgressService) updateWorker() { +func (s *EgressService) startWorker() error { + rs := s.es.(*RedisStore) + if err := rs.Start(); err != nil { + logger.Errorw("failed to start redis egress worker", err) + return err + } + sub, err := s.rpcClient.GetUpdateChannel(context.Background()) if err != nil { logger.Errorw("failed to subscribe to results channel", err) - return + return err } - resChan := sub.Channel() - for { - select { - case msg := <-resChan: - b := sub.Payload(msg) + go func() { + resChan := sub.Channel() + for { + select { + case msg := <-resChan: + b := sub.Payload(msg) - res := &livekit.EgressInfo{} - if err = proto.Unmarshal(b, res); err != nil { - logger.Errorw("failed to read results", err) - continue + res := &livekit.EgressInfo{} + if err = proto.Unmarshal(b, res); err != nil { + logger.Errorw("failed to read results", err) + continue + } + + ensureRoomName(res) + + switch res.Status { + case livekit.EgressStatus_EGRESS_COMPLETE, + livekit.EgressStatus_EGRESS_FAILED, + livekit.EgressStatus_EGRESS_ABORTED: + + // make sure endedAt is set so it eventually gets deleted + if res.EndedAt == 0 { + res.EndedAt = time.Now().UnixNano() + } + + err = s.es.UpdateEgress(context.Background(), res) + if err != nil { + logger.Errorw("could not update egress", err) + } + + // log results + if res.Error != "" { + logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId) + } else { + logger.Infow("egress ended", "egressID", res.EgressId) + } + + s.telemetry.EgressEnded(context.Background(), res) + + default: + err = s.es.UpdateEgress(context.Background(), res) + if err != nil { + logger.Errorw("could not update egress", err) + } + } + + case <-s.shutdown: + _ = sub.Close() + rs.Stop() + return } + } + }() - switch res.Status { - case livekit.EgressStatus_EGRESS_ACTIVE, - livekit.EgressStatus_EGRESS_ENDING: + return nil +} - // save updated info to store - err = s.es.UpdateEgress(context.Background(), res) - if err != nil { - logger.Errorw("could not update egress", err) - } - - case livekit.EgressStatus_EGRESS_COMPLETE, - livekit.EgressStatus_EGRESS_FAILED, - livekit.EgressStatus_EGRESS_ABORTED: - - // delete from store - err = s.es.DeleteEgress(context.Background(), res) - if err != nil { - logger.Errorw("could not delete egress from store", err) - } - - // log results - if res.Error != "" { - logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId) - } else { - logger.Infow("egress ended", "egressID", res.EgressId) - } - - s.telemetry.EgressEnded(context.Background(), res) - } - - case <-s.shutdown: - _ = sub.Close() - return +// Ensure compatibility with Egress <= v1.0.5 +func ensureRoomName(info *livekit.EgressInfo) { + if info.RoomName == "" { + switch r := info.Request.(type) { + case *livekit.EgressInfo_RoomComposite: + info.RoomName = r.RoomComposite.RoomName + case *livekit.EgressInfo_TrackComposite: + info.RoomName = r.TrackComposite.RoomName + case *livekit.EgressInfo_Track: + info.RoomName = r.Track.RoomName } } } diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index efbf3c804..80a7e0b6a 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -41,9 +41,8 @@ type ServiceStore interface { type EgressStore interface { StoreEgress(ctx context.Context, info *livekit.EgressInfo) error LoadEgress(ctx context.Context, egressID string) (*livekit.EgressInfo, error) - ListEgress(ctx context.Context, roomID livekit.RoomID) ([]*livekit.EgressInfo, error) + ListEgress(ctx context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error - DeleteEgress(ctx context.Context, info *livekit.EgressInfo) error } //counterfeiter:generate . IngressStore diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index ff5c26986..60037c584 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -144,49 +144,3 @@ func (s *LocalStore) DeleteParticipant(_ context.Context, roomName livekit.RoomN } return nil } - -// redis is required for egress -func (s *LocalStore) StoreEgress(_ context.Context, _ *livekit.EgressInfo) error { - return ErrEgressNotConnected -} - -func (s *LocalStore) LoadEgress(_ context.Context, _ string) (*livekit.EgressInfo, error) { - return nil, ErrEgressNotConnected -} - -func (s *LocalStore) ListEgress(_ context.Context, _ livekit.RoomID) ([]*livekit.EgressInfo, error) { - return nil, ErrEgressNotConnected -} - -func (s *LocalStore) UpdateEgress(_ context.Context, _ *livekit.EgressInfo) error { - return ErrEgressNotConnected -} - -func (s *LocalStore) DeleteEgress(_ context.Context, _ *livekit.EgressInfo) error { - return ErrEgressNotConnected -} - -// redis is required for ingress -func (s *LocalStore) StoreIngress(_ context.Context, _ *livekit.IngressInfo) error { - return ErrIngressNotConnected -} - -func (s *LocalStore) LoadIngress(_ context.Context, _ string) (*livekit.IngressInfo, error) { - return nil, ErrIngressNotConnected -} - -func (s *LocalStore) LoadIngressFromStreamKey(_ context.Context, _ string) (*livekit.IngressInfo, error) { - return nil, ErrIngressNotConnected -} - -func (s *LocalStore) ListIngress(_ context.Context, _ livekit.RoomName) ([]*livekit.IngressInfo, error) { - return nil, ErrIngressNotConnected -} - -func (s *LocalStore) UpdateIngress(_ context.Context, _ *livekit.IngressInfo) error { - return ErrIngressNotConnected -} - -func (s *LocalStore) DeleteIngress(_ context.Context, _ *livekit.IngressInfo) error { - return ErrIngressNotConnected -} diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 10fd41a8d..b25c9c861 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -2,23 +2,33 @@ package service import ( "context" + "fmt" + "strconv" + "strings" "time" "github.com/go-redis/redis/v8" + goversion "github.com/hashicorp/go-version" "github.com/pkg/errors" "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/version" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" ) const ( + VersionKey = "livekit_version" + // RoomsKey is hash of room_name => Room proto RoomsKey = "rooms" // EgressKey is a hash of egressID => egress info - EgressKey = "egress" - RoomEgressPrefix = "room_egress:" + EgressKey = "egress" + EndedEgressKey = "ended_egress" + RoomEgressPrefix = "egress:room:" + DeprecatedRoomEgressPrefix = "room_egress:" // IngressKey is a hash of ingressID => ingress info IngressKey = "ingress" @@ -35,8 +45,9 @@ const ( ) type RedisStore struct { - rc *redis.Client - ctx context.Context + rc *redis.Client + ctx context.Context + done chan struct{} } func NewRedisStore(rc *redis.Client) *RedisStore { @@ -46,6 +57,44 @@ func NewRedisStore(rc *redis.Client) *RedisStore { } } +func (s *RedisStore) Start() error { + if s.done != nil { + return nil + } + + s.done = make(chan struct{}, 1) + current, err := s.rc.Get(s.ctx, VersionKey).Result() + if err != nil && err != redis.Nil { + return err + } + if current == "" { + current = "0.0.0" + } + + v, _ := goversion.NewVersion(current) + migrateEgress, _ := goversion.NewVersion("1.1.3") + if v.LessThan(migrateEgress) { + if _, err = s.MigrateEgressInfo(); err != nil { + return err + } + + if err = s.rc.Set(s.ctx, VersionKey, version.Version, 0).Err(); err != nil { + return err + } + } + + go s.egressWorker() + return nil +} + +func (s *RedisStore) Stop() { + select { + case <-s.done: + default: + close(s.done) + } +} + func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() @@ -233,11 +282,10 @@ func (s *RedisStore) StoreEgress(_ context.Context, info *livekit.EgressInfo) er return err } - pp := s.rc.Pipeline() - pp.HSet(s.ctx, EgressKey, info.EgressId, data) - pp.SAdd(s.ctx, RoomEgressPrefix+info.RoomId, info.EgressId) - - if _, err = pp.Exec(s.ctx); err != nil { + tx := s.rc.TxPipeline() + tx.HSet(s.ctx, EgressKey, info.EgressId, data) + tx.SAdd(s.ctx, RoomEgressPrefix+info.RoomName, info.EgressId) + if _, err = tx.Exec(s.ctx); err != nil { return errors.Wrap(err, "could not store egress info") } @@ -246,25 +294,27 @@ func (s *RedisStore) StoreEgress(_ context.Context, info *livekit.EgressInfo) er func (s *RedisStore) LoadEgress(_ context.Context, egressID string) (*livekit.EgressInfo, error) { data, err := s.rc.HGet(s.ctx, EgressKey, egressID).Result() - if err != nil { - if err == redis.Nil { - return nil, ErrEgressNotFound + switch err { + case nil: + info := &livekit.EgressInfo{} + err = proto.Unmarshal([]byte(data), info) + if err != nil { + return nil, err } - return nil, err - } + return info, nil - info := &livekit.EgressInfo{} - err = proto.Unmarshal([]byte(data), info) - if err != nil { + case redis.Nil: + return nil, ErrEgressNotFound + + default: return nil, err } - return info, nil } -func (s *RedisStore) ListEgress(_ context.Context, roomID livekit.RoomID) ([]*livekit.EgressInfo, error) { +func (s *RedisStore) ListEgress(_ context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error) { var infos []*livekit.EgressInfo - if roomID == "" { + if roomName == "" { data, err := s.rc.HGetAll(s.ctx, EgressKey).Result() if err != nil { if err == redis.Nil { @@ -282,7 +332,7 @@ func (s *RedisStore) ListEgress(_ context.Context, roomID livekit.RoomID) ([]*li infos = append(infos, info) } } else { - ids, err := s.rc.SMembers(s.ctx, RoomEgressPrefix+string(roomID)).Result() + egressIDs, err := s.rc.SMembers(s.ctx, RoomEgressPrefix+string(roomName)).Result() if err != nil { if err == redis.Nil { return nil, nil @@ -290,7 +340,7 @@ func (s *RedisStore) ListEgress(_ context.Context, roomID livekit.RoomID) ([]*li return nil, err } - data, _ := s.rc.HMGet(s.ctx, EgressKey, ids...).Result() + data, _ := s.rc.HMGet(s.ctx, EgressKey, egressIDs...).Result() for _, d := range data { if d == nil { continue @@ -313,23 +363,79 @@ func (s *RedisStore) UpdateEgress(_ context.Context, info *livekit.EgressInfo) e return err } - pp := s.rc.Pipeline() - pp.HSet(s.ctx, EgressKey, info.EgressId, data) + if info.EndedAt != 0 { + tx := s.rc.TxPipeline() + tx.HSet(s.ctx, EgressKey, info.EgressId, data) + tx.HSet(s.ctx, EndedEgressKey, info.EgressId, egressEndedValue(info.RoomName, info.EndedAt)) + _, err = tx.Exec(s.ctx) + } else { + err = s.rc.HSet(s.ctx, EgressKey, info.EgressId, data).Err() + } - if _, err = pp.Exec(s.ctx); err != nil { - return errors.Wrap(err, "could not store egress info") + if err != nil { + return errors.Wrap(err, "could not update egress info") } return nil } -func (s *RedisStore) DeleteEgress(_ context.Context, info *livekit.EgressInfo) error { - err := s.rc.SRem(s.ctx, RoomEgressPrefix+info.RoomId, info.EgressId).Err() - if err != nil { +// Deletes egress info 24h after the egress has ended +func (s *RedisStore) egressWorker() { + ticker := time.NewTicker(time.Minute * 30) + for { + select { + case <-s.done: + return + case <-ticker.C: + err := s.CleanEndedEgress() + if err != nil { + logger.Errorw("could not clean egress info", err) + } + } + } +} + +func (s RedisStore) CleanEndedEgress() error { + values, err := s.rc.HGetAll(s.ctx, EndedEgressKey).Result() + if err != nil && err != redis.Nil { return err } - return s.rc.HDel(s.ctx, EgressKey, info.EgressId).Err() + expiry := time.Now().Add(-24 * time.Hour).UnixNano() + for egressID, val := range values { + roomName, endedAt, err := parseEgressEnded(val) + if err != nil { + return err + } + + if endedAt < expiry { + tx := s.rc.TxPipeline() + tx.HDel(s.ctx, EndedEgressKey, egressID) + tx.SRem(s.ctx, RoomEgressPrefix+roomName, egressID) + tx.HDel(s.ctx, EgressKey, egressID) + if _, err := tx.Exec(s.ctx); err != nil { + return err + } + } + } + + return nil +} + +func egressEndedValue(roomName string, endedAt int64) string { + return fmt.Sprintf("%s|%d", roomName, endedAt) +} + +func parseEgressEnded(value string) (roomName string, endedAt int64, err error) { + s := strings.Split(value, "|") + if len(s) != 2 { + err = errors.New("invalid egressEnded value") + return + } + + roomName = s[0] + endedAt, err = strconv.ParseInt(s[1], 10, 64) + return } func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) error { @@ -359,16 +465,16 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) return err } - results, err := tx.TxPipelined(s.ctx, func(pipe redis.Pipeliner) error { - pipe.HSet(s.ctx, IngressKey, info.IngressId, data) - pipe.HSet(s.ctx, StreamKeyKey, info.IngressId, info.StreamKey) + results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error { + p.HSet(s.ctx, IngressKey, info.IngressId, data) + p.HSet(s.ctx, StreamKeyKey, info.IngressId, info.StreamKey) if oldRoom != info.RoomName { if oldRoom != "" { - pipe.SRem(s.ctx, RoomIngressPrefix+oldRoom, info.IngressId) + p.SRem(s.ctx, RoomIngressPrefix+oldRoom, info.IngressId) } if info.RoomName != "" { - pipe.SAdd(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId) + p.SAdd(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId) } } @@ -408,20 +514,21 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) func (s *RedisStore) loadIngress(c redis.Cmdable, ingressId string) (*livekit.IngressInfo, error) { data, err := c.HGet(s.ctx, IngressKey, ingressId).Result() - if err != nil { - if err == redis.Nil { - return nil, ErrIngressNotFound + switch err { + case nil: + info := &livekit.IngressInfo{} + err = proto.Unmarshal([]byte(data), info) + if err != nil { + return nil, err } + return info, nil + + case redis.Nil: + return nil, ErrIngressNotFound + + default: return nil, err } - - info := &livekit.IngressInfo{} - err = proto.Unmarshal([]byte(data), info) - if err != nil { - return nil, err - } - - return info, nil } func (s *RedisStore) LoadIngress(_ context.Context, ingressId string) (*livekit.IngressInfo, error) { @@ -429,16 +536,17 @@ func (s *RedisStore) LoadIngress(_ context.Context, ingressId string) (*livekit. } func (s *RedisStore) LoadIngressFromStreamKey(_ context.Context, streamKey string) (*livekit.IngressInfo, error) { - ingressId, err := s.rc.HGet(s.ctx, StreamKeyKey, streamKey).Result() + ingressID, err := s.rc.HGet(s.ctx, StreamKeyKey, streamKey).Result() switch err { case nil: + return s.loadIngress(s.rc, ingressID) + case redis.Nil: return nil, ErrIngressNotFound + default: return nil, err } - - return s.loadIngress(s.rc, ingressId) } func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ([]*livekit.IngressInfo, error) { @@ -462,7 +570,7 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ( infos = append(infos, info) } } else { - ids, err := s.rc.SMembers(s.ctx, RoomIngressPrefix+string(roomName)).Result() + ingressIDs, err := s.rc.SMembers(s.ctx, RoomIngressPrefix+string(roomName)).Result() if err != nil { if err == redis.Nil { return nil, nil @@ -470,7 +578,7 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ( return nil, err } - data, _ := s.rc.HMGet(s.ctx, IngressKey, ids...).Result() + data, _ := s.rc.HMGet(s.ctx, IngressKey, ingressIDs...).Result() for _, d := range data { if d == nil { continue @@ -487,25 +595,67 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ( return infos, nil } -func (s *RedisStore) UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error { - return s.StoreIngress(ctx, info) +func (s *RedisStore) UpdateIngress(_ context.Context, info *livekit.IngressInfo) error { + return s.StoreIngress(s.ctx, info) } -func (s *RedisStore) DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error { - err := s.rc.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId).Err() - if err != nil { - return err - } - - err = s.rc.HDel(s.ctx, StreamKeyKey, info.IngressId).Err() - if err != nil { - return err - } - - err = s.rc.HDel(s.ctx, EgressKey, info.IngressId).Err() - if err != nil { - return err +func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error { + tx := s.rc.TxPipeline() + tx.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId) + tx.HDel(s.ctx, StreamKeyKey, info.IngressId) + tx.HDel(s.ctx, IngressKey, info.IngressId) + if _, err := tx.Exec(s.ctx); err != nil { + return errors.Wrap(err, "could not delete ingress info") } return nil } + +// Migration to LiveKit >= v1.1.3 +func (s *RedisStore) MigrateEgressInfo() (int, error) { + locked, err := s.rc.SetNX(s.ctx, "egress-migration", utils.NewGuid("LOCK"), time.Minute).Result() + if err != nil { + return 0, err + } else if !locked { + return 0, nil + } + + it := s.rc.Scan(s.ctx, 0, DeprecatedRoomEgressPrefix+"*", 0).Iterator() + migrated := 0 + for it.Next(s.ctx) { + migrated++ + key := it.Val() + egressIDs, err := s.rc.SMembers(s.ctx, key).Result() + if err != nil && err != redis.Nil { + return migrated, err + } + + for _, egressID := range egressIDs { + info, err := s.LoadEgress(s.ctx, egressID) + if err != nil { + return migrated, err + } + + var roomName string + switch req := info.Request.(type) { + case *livekit.EgressInfo_RoomComposite: + roomName = req.RoomComposite.RoomName + case *livekit.EgressInfo_TrackComposite: + roomName = req.TrackComposite.RoomName + case *livekit.EgressInfo_Track: + roomName = req.Track.RoomName + } + + tx := s.rc.TxPipeline() + tx.SAdd(s.ctx, RoomEgressPrefix+roomName, egressID) + tx.SRem(s.ctx, key, egressID) + if _, err = tx.Exec(s.ctx); err != nil { + return migrated, err + } + } + + s.rc.Del(s.ctx, key) + } + + return migrated, nil +} diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index 951b54f02..17c689042 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -8,8 +8,10 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/service" ) @@ -111,7 +113,101 @@ func TestRoomLock(t *testing.T) { }) } -func TestStoreIngress(t *testing.T) { +func TestEgressStore(t *testing.T) { + ctx := context.Background() + rc := redisClient() + rs := service.NewRedisStore(rc) + + roomName := "egress-test" + + // test migration + info := &livekit.EgressInfo{ + EgressId: utils.NewGuid(utils.EgressPrefix), + RoomId: utils.NewGuid(utils.RoomPrefix), + RoomName: roomName, + Status: livekit.EgressStatus_EGRESS_STARTING, + Request: &livekit.EgressInfo_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{ + RoomName: roomName, + Layout: "speaker-dark", + }, + }, + } + + data, err := proto.Marshal(info) + require.NoError(t, err) + + // store egress info the old way + tx := rc.TxPipeline() + tx.HSet(ctx, service.EgressKey, info.EgressId, data) + tx.SAdd(ctx, service.DeprecatedRoomEgressPrefix+info.RoomId, info.EgressId) + _, err = tx.Exec(ctx) + require.NoError(t, err) + + // run migration + migrated, err := rs.MigrateEgressInfo() + require.NoError(t, err) + require.Equal(t, 1, migrated) + + // check that it was migrated + exists, err := rc.Exists(ctx, service.DeprecatedRoomEgressPrefix+info.RoomId).Result() + require.NoError(t, err) + require.Equal(t, int64(0), exists) + + exists, err = rc.Exists(ctx, service.RoomEgressPrefix+info.RoomName).Result() + require.NoError(t, err) + require.Equal(t, int64(1), exists) + + // load + res, err := rs.LoadEgress(ctx, info.EgressId) + require.NoError(t, err) + require.Equal(t, res.EgressId, info.EgressId) + + // store another + info2 := &livekit.EgressInfo{ + EgressId: utils.NewGuid(utils.EgressPrefix), + RoomId: utils.NewGuid(utils.RoomPrefix), + RoomName: "another-egress-test", + Status: livekit.EgressStatus_EGRESS_STARTING, + Request: &livekit.EgressInfo_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{ + RoomName: "another-egress-test", + Layout: "speaker-dark", + }, + }, + } + require.NoError(t, rs.StoreEgress(ctx, info2)) + + // update + info2.Status = livekit.EgressStatus_EGRESS_COMPLETE + info2.EndedAt = time.Now().Add(-24 * time.Hour).UnixNano() + require.NoError(t, rs.UpdateEgress(ctx, info)) + + // list + list, err := rs.ListEgress(ctx, "") + require.NoError(t, err) + require.Len(t, list, 2) + + // list by room + list, err = rs.ListEgress(ctx, livekit.RoomName(roomName)) + require.NoError(t, err) + require.Len(t, list, 1) + + // update + info.Status = livekit.EgressStatus_EGRESS_COMPLETE + info.EndedAt = time.Now().Add(-24 * time.Hour).UnixNano() + require.NoError(t, rs.UpdateEgress(ctx, info)) + + // clean + require.NoError(t, rs.CleanEndedEgress()) + + // list + list, err = rs.ListEgress(ctx, livekit.RoomName(roomName)) + require.NoError(t, err) + require.Len(t, list, 0) +} + +func TestIngressStore(t *testing.T) { ctx := context.Background() rs := service.NewRedisStore(redisClient()) diff --git a/pkg/service/server.go b/pkg/service/server.go index 28e9bff3d..8f0ba1ea2 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -146,7 +146,9 @@ func (s *LivekitServer) Start() error { return err } - s.egressService.Start() + if err := s.egressService.Start(); err != nil { + return err + } addresses := s.config.BindAddresses if addresses == nil { diff --git a/pkg/service/servicefakes/fake_egress_store.go b/pkg/service/servicefakes/fake_egress_store.go index ee05a5649..06ee8d9cc 100644 --- a/pkg/service/servicefakes/fake_egress_store.go +++ b/pkg/service/servicefakes/fake_egress_store.go @@ -10,23 +10,11 @@ import ( ) type FakeEgressStore struct { - DeleteEgressStub func(context.Context, *livekit.EgressInfo) error - deleteEgressMutex sync.RWMutex - deleteEgressArgsForCall []struct { - arg1 context.Context - arg2 *livekit.EgressInfo - } - deleteEgressReturns struct { - result1 error - } - deleteEgressReturnsOnCall map[int]struct { - result1 error - } - ListEgressStub func(context.Context, livekit.RoomID) ([]*livekit.EgressInfo, error) + ListEgressStub func(context.Context, livekit.RoomName) ([]*livekit.EgressInfo, error) listEgressMutex sync.RWMutex listEgressArgsForCall []struct { arg1 context.Context - arg2 livekit.RoomID + arg2 livekit.RoomName } listEgressReturns struct { result1 []*livekit.EgressInfo @@ -78,74 +66,12 @@ type FakeEgressStore struct { invocationsMutex sync.RWMutex } -func (fake *FakeEgressStore) DeleteEgress(arg1 context.Context, arg2 *livekit.EgressInfo) error { - fake.deleteEgressMutex.Lock() - ret, specificReturn := fake.deleteEgressReturnsOnCall[len(fake.deleteEgressArgsForCall)] - fake.deleteEgressArgsForCall = append(fake.deleteEgressArgsForCall, struct { - arg1 context.Context - arg2 *livekit.EgressInfo - }{arg1, arg2}) - stub := fake.DeleteEgressStub - fakeReturns := fake.deleteEgressReturns - fake.recordInvocation("DeleteEgress", []interface{}{arg1, arg2}) - fake.deleteEgressMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeEgressStore) DeleteEgressCallCount() int { - fake.deleteEgressMutex.RLock() - defer fake.deleteEgressMutex.RUnlock() - return len(fake.deleteEgressArgsForCall) -} - -func (fake *FakeEgressStore) DeleteEgressCalls(stub func(context.Context, *livekit.EgressInfo) error) { - fake.deleteEgressMutex.Lock() - defer fake.deleteEgressMutex.Unlock() - fake.DeleteEgressStub = stub -} - -func (fake *FakeEgressStore) DeleteEgressArgsForCall(i int) (context.Context, *livekit.EgressInfo) { - fake.deleteEgressMutex.RLock() - defer fake.deleteEgressMutex.RUnlock() - argsForCall := fake.deleteEgressArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeEgressStore) DeleteEgressReturns(result1 error) { - fake.deleteEgressMutex.Lock() - defer fake.deleteEgressMutex.Unlock() - fake.DeleteEgressStub = nil - fake.deleteEgressReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeEgressStore) DeleteEgressReturnsOnCall(i int, result1 error) { - fake.deleteEgressMutex.Lock() - defer fake.deleteEgressMutex.Unlock() - fake.DeleteEgressStub = nil - if fake.deleteEgressReturnsOnCall == nil { - fake.deleteEgressReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.deleteEgressReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeEgressStore) ListEgress(arg1 context.Context, arg2 livekit.RoomID) ([]*livekit.EgressInfo, error) { +func (fake *FakeEgressStore) ListEgress(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.EgressInfo, error) { fake.listEgressMutex.Lock() ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)] fake.listEgressArgsForCall = append(fake.listEgressArgsForCall, struct { arg1 context.Context - arg2 livekit.RoomID + arg2 livekit.RoomName }{arg1, arg2}) stub := fake.ListEgressStub fakeReturns := fake.listEgressReturns @@ -166,13 +92,13 @@ func (fake *FakeEgressStore) ListEgressCallCount() int { return len(fake.listEgressArgsForCall) } -func (fake *FakeEgressStore) ListEgressCalls(stub func(context.Context, livekit.RoomID) ([]*livekit.EgressInfo, error)) { +func (fake *FakeEgressStore) ListEgressCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.EgressInfo, error)) { fake.listEgressMutex.Lock() defer fake.listEgressMutex.Unlock() fake.ListEgressStub = stub } -func (fake *FakeEgressStore) ListEgressArgsForCall(i int) (context.Context, livekit.RoomID) { +func (fake *FakeEgressStore) ListEgressArgsForCall(i int) (context.Context, livekit.RoomName) { fake.listEgressMutex.RLock() defer fake.listEgressMutex.RUnlock() argsForCall := fake.listEgressArgsForCall[i] @@ -397,8 +323,6 @@ func (fake *FakeEgressStore) UpdateEgressReturnsOnCall(i int, result1 error) { func (fake *FakeEgressStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.deleteEgressMutex.RLock() - defer fake.deleteEgressMutex.RUnlock() fake.listEgressMutex.RLock() defer fake.listEgressMutex.RUnlock() fake.loadEgressMutex.RLock() diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 1d1c79b7e..8fa768742 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -170,8 +170,6 @@ func getEgressStore(s ObjectStore) EgressStore { switch store := s.(type) { case *RedisStore: return store - case *LocalStore: - return store default: return nil } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 88a1525dc..6d9060b60 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service @@ -196,8 +195,6 @@ func getEgressStore(s ObjectStore) EgressStore { switch store := s.(type) { case *RedisStore: return store - case *LocalStore: - return store default: return nil }