Keep egress info for 24h (#856)

* Keep egress info for 24h

* updates

* better version comparison

* fix test

* duplicated if
This commit is contained in:
David Colburn
2022-07-28 20:27:59 -05:00
committed by GitHub
parent 0ab4379c5c
commit 7aa5888338
11 changed files with 426 additions and 284 deletions
+3 -2
View File
@@ -14,8 +14,9 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.13.5-0.20220727215941-ac26418a52e9
github.com/livekit/protocol v0.13.5-0.20220728214908-67539ebcab2a
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a
github.com/mackerelio/go-osstat v0.2.1
github.com/magefile/mage v1.13.0
@@ -45,7 +46,7 @@ require (
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/protobuf v1.28.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
)
+6 -4
View File
@@ -190,6 +190,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
@@ -235,8 +237,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.13.5-0.20220727215941-ac26418a52e9 h1:e12j1EyiiTG56Ag44fwpVtnYQ6MVgLv4bYYI0nTgxZY=
github.com/livekit/protocol v0.13.5-0.20220727215941-ac26418a52e9/go.mod h1:Qd/Dn4BkJfZQy/IjtEeUOGXARrR7l09WDkg5SY8thkw=
github.com/livekit/protocol v0.13.5-0.20220728214908-67539ebcab2a h1:tRioM9WNDjxGryt03ROYa8zq17J0MqHftCLr8Ex4dM0=
github.com/livekit/protocol v0.13.5-0.20220728214908-67539ebcab2a/go.mod h1:vGQzKUaSYC92o5y7EbnhosgpoLWK9a3PneyYkGOGL0o=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc=
@@ -755,8 +757,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+92 -73
View File
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"time"
"google.golang.org/protobuf/proto"
@@ -37,14 +38,20 @@ func NewEgressService(
es: es,
roomService: rs,
telemetry: ts,
shutdown: make(chan struct{}),
}
}
func (s *EgressService) Start() {
if s.rpcClient != nil {
go s.updateWorker()
func (s *EgressService) Start() error {
if s.shutdown != nil {
return nil
}
s.shutdown = make(chan struct{})
if s.rpcClient != nil && s.es != nil {
return s.startWorker()
}
return nil
}
func (s *EgressService) Stop() {
@@ -94,6 +101,8 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa
return nil, err
}
ensureRoomName(info)
s.telemetry.EgressStarted(ctx, info)
go func() {
if err := s.es.StoreEgress(ctx, info); err != nil {
@@ -121,17 +130,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
return nil, err
}
var roomName string
switch r := info.Request.(type) {
case *livekit.EgressInfo_RoomComposite:
roomName = r.RoomComposite.RoomName
case *livekit.EgressInfo_TrackComposite:
roomName = r.TrackComposite.RoomName
case *livekit.EgressInfo_Track:
roomName = r.Track.RoomName
default:
return nil, ErrRoomNotFound
}
ensureRoomName(info)
metadata, err := json.Marshal(&LayoutMetadata{Layout: req.Layout})
if err != nil {
@@ -139,11 +138,11 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
}
grants := GetGrants(ctx)
grants.Video.Room = roomName
grants.Video.Room = info.RoomName
grants.Video.RoomAdmin = true
_, err = s.roomService.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
Room: roomName,
Room: info.RoomName,
Identity: info.EgressId,
Metadata: string(metadata),
})
@@ -172,6 +171,8 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
return nil, err
}
ensureRoomName(info)
go func() {
if err := s.es.UpdateEgress(ctx, info); err != nil {
logger.Errorw("could not write egress info", err)
@@ -189,19 +190,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR
return nil, ErrEgressNotConnected
}
var roomID livekit.RoomID
if req.RoomName != "" {
room, err := s.store.LoadRoom(ctx, livekit.RoomName(req.RoomName))
if err != nil {
if err == ErrRoomNotFound {
return &livekit.ListEgressResponse{}, nil
}
return nil, err
}
roomID = livekit.RoomID(room.Sid)
}
infos, err := s.es.ListEgress(ctx, roomID)
infos, err := s.es.ListEgress(ctx, livekit.RoomName(req.RoomName))
if err != nil {
return nil, err
}
@@ -227,6 +216,8 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
return nil, err
}
ensureRoomName(info)
go func() {
if err := s.es.UpdateEgress(ctx, info); err != nil {
logger.Errorw("could not write egress info", err)
@@ -236,58 +227,86 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
return info, nil
}
func (s *EgressService) updateWorker() {
func (s *EgressService) startWorker() error {
rs := s.es.(*RedisStore)
if err := rs.Start(); err != nil {
logger.Errorw("failed to start redis egress worker", err)
return err
}
sub, err := s.rpcClient.GetUpdateChannel(context.Background())
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
return
return err
}
resChan := sub.Channel()
for {
select {
case msg := <-resChan:
b := sub.Payload(msg)
go func() {
resChan := sub.Channel()
for {
select {
case msg := <-resChan:
b := sub.Payload(msg)
res := &livekit.EgressInfo{}
if err = proto.Unmarshal(b, res); err != nil {
logger.Errorw("failed to read results", err)
continue
res := &livekit.EgressInfo{}
if err = proto.Unmarshal(b, res); err != nil {
logger.Errorw("failed to read results", err)
continue
}
ensureRoomName(res)
switch res.Status {
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED:
// make sure endedAt is set so it eventually gets deleted
if res.EndedAt == 0 {
res.EndedAt = time.Now().UnixNano()
}
err = s.es.UpdateEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not update egress", err)
}
// log results
if res.Error != "" {
logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId)
} else {
logger.Infow("egress ended", "egressID", res.EgressId)
}
s.telemetry.EgressEnded(context.Background(), res)
default:
err = s.es.UpdateEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not update egress", err)
}
}
case <-s.shutdown:
_ = sub.Close()
rs.Stop()
return
}
}
}()
switch res.Status {
case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
return nil
}
// save updated info to store
err = s.es.UpdateEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not update egress", err)
}
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED:
// delete from store
err = s.es.DeleteEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not delete egress from store", err)
}
// log results
if res.Error != "" {
logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId)
} else {
logger.Infow("egress ended", "egressID", res.EgressId)
}
s.telemetry.EgressEnded(context.Background(), res)
}
case <-s.shutdown:
_ = sub.Close()
return
// ensureRoomName backfills info.RoomName from the original egress request.
// Egress <= v1.0.5 did not populate the RoomName field, so older workers may
// report info with an empty room name; newer info is left untouched.
func ensureRoomName(info *livekit.EgressInfo) {
	if info.RoomName != "" {
		return
	}
	switch req := info.Request.(type) {
	case *livekit.EgressInfo_RoomComposite:
		info.RoomName = req.RoomComposite.RoomName
	case *livekit.EgressInfo_TrackComposite:
		info.RoomName = req.TrackComposite.RoomName
	case *livekit.EgressInfo_Track:
		info.RoomName = req.Track.RoomName
	}
}
+1 -2
View File
@@ -41,9 +41,8 @@ type ServiceStore interface {
type EgressStore interface {
StoreEgress(ctx context.Context, info *livekit.EgressInfo) error
LoadEgress(ctx context.Context, egressID string) (*livekit.EgressInfo, error)
ListEgress(ctx context.Context, roomID livekit.RoomID) ([]*livekit.EgressInfo, error)
ListEgress(ctx context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error)
UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
DeleteEgress(ctx context.Context, info *livekit.EgressInfo) error
}
//counterfeiter:generate . IngressStore
-46
View File
@@ -144,49 +144,3 @@ func (s *LocalStore) DeleteParticipant(_ context.Context, roomName livekit.RoomN
}
return nil
}
// redis is required for egress
func (s *LocalStore) StoreEgress(_ context.Context, _ *livekit.EgressInfo) error {
return ErrEgressNotConnected
}
func (s *LocalStore) LoadEgress(_ context.Context, _ string) (*livekit.EgressInfo, error) {
return nil, ErrEgressNotConnected
}
func (s *LocalStore) ListEgress(_ context.Context, _ livekit.RoomID) ([]*livekit.EgressInfo, error) {
return nil, ErrEgressNotConnected
}
func (s *LocalStore) UpdateEgress(_ context.Context, _ *livekit.EgressInfo) error {
return ErrEgressNotConnected
}
func (s *LocalStore) DeleteEgress(_ context.Context, _ *livekit.EgressInfo) error {
return ErrEgressNotConnected
}
// redis is required for ingress
func (s *LocalStore) StoreIngress(_ context.Context, _ *livekit.IngressInfo) error {
return ErrIngressNotConnected
}
func (s *LocalStore) LoadIngress(_ context.Context, _ string) (*livekit.IngressInfo, error) {
return nil, ErrIngressNotConnected
}
func (s *LocalStore) LoadIngressFromStreamKey(_ context.Context, _ string) (*livekit.IngressInfo, error) {
return nil, ErrIngressNotConnected
}
func (s *LocalStore) ListIngress(_ context.Context, _ livekit.RoomName) ([]*livekit.IngressInfo, error) {
return nil, ErrIngressNotConnected
}
func (s *LocalStore) UpdateIngress(_ context.Context, _ *livekit.IngressInfo) error {
return ErrIngressNotConnected
}
func (s *LocalStore) DeleteIngress(_ context.Context, _ *livekit.IngressInfo) error {
return ErrIngressNotConnected
}
+217 -67
View File
@@ -2,23 +2,33 @@ package service
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/go-redis/redis/v8"
goversion "github.com/hashicorp/go-version"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/version"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
const (
VersionKey = "livekit_version"
// RoomsKey is hash of room_name => Room proto
RoomsKey = "rooms"
// EgressKey is a hash of egressID => egress info
EgressKey = "egress"
RoomEgressPrefix = "room_egress:"
EgressKey = "egress"
EndedEgressKey = "ended_egress"
RoomEgressPrefix = "egress:room:"
DeprecatedRoomEgressPrefix = "room_egress:"
// IngressKey is a hash of ingressID => ingress info
IngressKey = "ingress"
@@ -35,8 +45,9 @@ const (
)
type RedisStore struct {
rc *redis.Client
ctx context.Context
rc *redis.Client
ctx context.Context
done chan struct{}
}
func NewRedisStore(rc *redis.Client) *RedisStore {
@@ -46,6 +57,44 @@ func NewRedisStore(rc *redis.Client) *RedisStore {
}
}
// Start initializes the store's background state. It reads the server version
// recorded in redis, runs the one-time egress key migration when that version
// predates v1.1.3, then launches the periodic egress cleanup worker.
// Calling Start more than once is a no-op (guarded by s.done).
func (s *RedisStore) Start() error {
	if s.done != nil {
		return nil
	}
	s.done = make(chan struct{}, 1)

	current, err := s.rc.Get(s.ctx, VersionKey).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	if current == "" {
		current = "0.0.0"
	}

	// FIX: the parse error was previously discarded; a malformed stored
	// version left v nil and panicked at v.LessThan below. Treat an
	// unparseable version as "migration required" instead.
	v, err := goversion.NewVersion(current)
	if err != nil {
		logger.Warnw("invalid stored version, assuming migration is required", err, "version", current)
		v, _ = goversion.NewVersion("0.0.0")
	}
	migrateEgress, _ := goversion.NewVersion("1.1.3")
	if v.LessThan(migrateEgress) {
		if _, err = s.MigrateEgressInfo(); err != nil {
			return err
		}
		// record the current version so the migration only runs once
		if err = s.rc.Set(s.ctx, VersionKey, version.Version, 0).Err(); err != nil {
			return err
		}
	}

	go s.egressWorker()
	return nil
}
// Stop signals the background egress worker started by Start to exit.
// The select skips close() when s.done is already closed, so sequential
// repeated calls are safe.
// NOTE(review): two concurrent Stop calls could both take the default branch
// and double-close the channel — presumably Stop is invoked once during
// shutdown; confirm with callers.
func (s *RedisStore) Stop() {
	select {
	case <-s.done:
		// already stopped
	default:
		close(s.done)
	}
}
func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error {
if room.CreationTime == 0 {
room.CreationTime = time.Now().Unix()
@@ -233,11 +282,10 @@ func (s *RedisStore) StoreEgress(_ context.Context, info *livekit.EgressInfo) er
return err
}
pp := s.rc.Pipeline()
pp.HSet(s.ctx, EgressKey, info.EgressId, data)
pp.SAdd(s.ctx, RoomEgressPrefix+info.RoomId, info.EgressId)
if _, err = pp.Exec(s.ctx); err != nil {
tx := s.rc.TxPipeline()
tx.HSet(s.ctx, EgressKey, info.EgressId, data)
tx.SAdd(s.ctx, RoomEgressPrefix+info.RoomName, info.EgressId)
if _, err = tx.Exec(s.ctx); err != nil {
return errors.Wrap(err, "could not store egress info")
}
@@ -246,25 +294,27 @@ func (s *RedisStore) StoreEgress(_ context.Context, info *livekit.EgressInfo) er
func (s *RedisStore) LoadEgress(_ context.Context, egressID string) (*livekit.EgressInfo, error) {
data, err := s.rc.HGet(s.ctx, EgressKey, egressID).Result()
if err != nil {
if err == redis.Nil {
return nil, ErrEgressNotFound
switch err {
case nil:
info := &livekit.EgressInfo{}
err = proto.Unmarshal([]byte(data), info)
if err != nil {
return nil, err
}
return nil, err
}
return info, nil
info := &livekit.EgressInfo{}
err = proto.Unmarshal([]byte(data), info)
if err != nil {
case redis.Nil:
return nil, ErrEgressNotFound
default:
return nil, err
}
return info, nil
}
func (s *RedisStore) ListEgress(_ context.Context, roomID livekit.RoomID) ([]*livekit.EgressInfo, error) {
func (s *RedisStore) ListEgress(_ context.Context, roomName livekit.RoomName) ([]*livekit.EgressInfo, error) {
var infos []*livekit.EgressInfo
if roomID == "" {
if roomName == "" {
data, err := s.rc.HGetAll(s.ctx, EgressKey).Result()
if err != nil {
if err == redis.Nil {
@@ -282,7 +332,7 @@ func (s *RedisStore) ListEgress(_ context.Context, roomID livekit.RoomID) ([]*li
infos = append(infos, info)
}
} else {
ids, err := s.rc.SMembers(s.ctx, RoomEgressPrefix+string(roomID)).Result()
egressIDs, err := s.rc.SMembers(s.ctx, RoomEgressPrefix+string(roomName)).Result()
if err != nil {
if err == redis.Nil {
return nil, nil
@@ -290,7 +340,7 @@ func (s *RedisStore) ListEgress(_ context.Context, roomID livekit.RoomID) ([]*li
return nil, err
}
data, _ := s.rc.HMGet(s.ctx, EgressKey, ids...).Result()
data, _ := s.rc.HMGet(s.ctx, EgressKey, egressIDs...).Result()
for _, d := range data {
if d == nil {
continue
@@ -313,23 +363,79 @@ func (s *RedisStore) UpdateEgress(_ context.Context, info *livekit.EgressInfo) e
return err
}
pp := s.rc.Pipeline()
pp.HSet(s.ctx, EgressKey, info.EgressId, data)
if info.EndedAt != 0 {
tx := s.rc.TxPipeline()
tx.HSet(s.ctx, EgressKey, info.EgressId, data)
tx.HSet(s.ctx, EndedEgressKey, info.EgressId, egressEndedValue(info.RoomName, info.EndedAt))
_, err = tx.Exec(s.ctx)
} else {
err = s.rc.HSet(s.ctx, EgressKey, info.EgressId, data).Err()
}
if _, err = pp.Exec(s.ctx); err != nil {
return errors.Wrap(err, "could not store egress info")
if err != nil {
return errors.Wrap(err, "could not update egress info")
}
return nil
}
func (s *RedisStore) DeleteEgress(_ context.Context, info *livekit.EgressInfo) error {
err := s.rc.SRem(s.ctx, RoomEgressPrefix+info.RoomId, info.EgressId).Err()
if err != nil {
// egressWorker runs in its own goroutine and deletes egress info 24h after
// the egress has ended, checking every 30 minutes until Stop() closes s.done.
func (s *RedisStore) egressWorker() {
	ticker := time.NewTicker(time.Minute * 30)
	// FIX: the ticker was never stopped, leaking its runtime timer after
	// shutdown; release it when the worker exits.
	defer ticker.Stop()
	for {
		select {
		case <-s.done:
			return
		case <-ticker.C:
			if err := s.CleanEndedEgress(); err != nil {
				logger.Errorw("could not clean egress info", err)
			}
		}
	}
}
// CleanEndedEgress deletes stored egress info for every egress that ended
// more than 24 hours ago. For each expired entry it atomically removes the
// ended-egress hash field, the per-room set membership, and the main egress
// hash field in one redis transaction.
//
// FIX: removed a stray leftover statement (`return s.rc.HDel(..., info.EgressId)`)
// that referenced an undefined `info` and made everything after it unreachable,
// and changed the value receiver to a pointer receiver for consistency with
// every other *RedisStore method.
func (s *RedisStore) CleanEndedEgress() error {
	values, err := s.rc.HGetAll(s.ctx, EndedEgressKey).Result()
	if err != nil && err != redis.Nil {
		return err
	}

	expiry := time.Now().Add(-24 * time.Hour).UnixNano()
	for egressID, val := range values {
		roomName, endedAt, err := parseEgressEnded(val)
		if err != nil {
			return err
		}
		if endedAt < expiry {
			tx := s.rc.TxPipeline()
			tx.HDel(s.ctx, EndedEgressKey, egressID)
			tx.SRem(s.ctx, RoomEgressPrefix+roomName, egressID)
			tx.HDel(s.ctx, EgressKey, egressID)
			if _, err := tx.Exec(s.ctx); err != nil {
				return err
			}
		}
	}
	return nil
}
// egressEndedValue encodes a room name and end timestamp (UnixNano) as the
// "roomName|endedAt" string stored in the ended-egress hash; decoded by
// parseEgressEnded.
func egressEndedValue(roomName string, endedAt int64) string {
	// plain concatenation + strconv avoids fmt's reflection-based formatting
	return roomName + "|" + strconv.FormatInt(endedAt, 10)
}
// parseEgressEnded decodes a "roomName|endedAt" value produced by
// egressEndedValue, returning the room name and the UnixNano end time.
// An error is returned when the value does not contain exactly one "|"
// separator or when the timestamp cannot be parsed.
func parseEgressEnded(value string) (string, int64, error) {
	parts := strings.Split(value, "|")
	if len(parts) != 2 {
		return "", 0, errors.New("invalid egressEnded value")
	}
	endedAt, err := strconv.ParseInt(parts[1], 10, 64)
	return parts[0], endedAt, err
}
func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) error {
@@ -359,16 +465,16 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
return err
}
results, err := tx.TxPipelined(s.ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(s.ctx, IngressKey, info.IngressId, data)
pipe.HSet(s.ctx, StreamKeyKey, info.IngressId, info.StreamKey)
results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error {
p.HSet(s.ctx, IngressKey, info.IngressId, data)
p.HSet(s.ctx, StreamKeyKey, info.IngressId, info.StreamKey)
if oldRoom != info.RoomName {
if oldRoom != "" {
pipe.SRem(s.ctx, RoomIngressPrefix+oldRoom, info.IngressId)
p.SRem(s.ctx, RoomIngressPrefix+oldRoom, info.IngressId)
}
if info.RoomName != "" {
pipe.SAdd(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId)
p.SAdd(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId)
}
}
@@ -408,20 +514,21 @@ func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo)
func (s *RedisStore) loadIngress(c redis.Cmdable, ingressId string) (*livekit.IngressInfo, error) {
data, err := c.HGet(s.ctx, IngressKey, ingressId).Result()
if err != nil {
if err == redis.Nil {
return nil, ErrIngressNotFound
switch err {
case nil:
info := &livekit.IngressInfo{}
err = proto.Unmarshal([]byte(data), info)
if err != nil {
return nil, err
}
return info, nil
case redis.Nil:
return nil, ErrIngressNotFound
default:
return nil, err
}
info := &livekit.IngressInfo{}
err = proto.Unmarshal([]byte(data), info)
if err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) LoadIngress(_ context.Context, ingressId string) (*livekit.IngressInfo, error) {
@@ -429,16 +536,17 @@ func (s *RedisStore) LoadIngress(_ context.Context, ingressId string) (*livekit.
}
func (s *RedisStore) LoadIngressFromStreamKey(_ context.Context, streamKey string) (*livekit.IngressInfo, error) {
ingressId, err := s.rc.HGet(s.ctx, StreamKeyKey, streamKey).Result()
ingressID, err := s.rc.HGet(s.ctx, StreamKeyKey, streamKey).Result()
switch err {
case nil:
return s.loadIngress(s.rc, ingressID)
case redis.Nil:
return nil, ErrIngressNotFound
default:
return nil, err
}
return s.loadIngress(s.rc, ingressId)
}
func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ([]*livekit.IngressInfo, error) {
@@ -462,7 +570,7 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) (
infos = append(infos, info)
}
} else {
ids, err := s.rc.SMembers(s.ctx, RoomIngressPrefix+string(roomName)).Result()
ingressIDs, err := s.rc.SMembers(s.ctx, RoomIngressPrefix+string(roomName)).Result()
if err != nil {
if err == redis.Nil {
return nil, nil
@@ -470,7 +578,7 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) (
return nil, err
}
data, _ := s.rc.HMGet(s.ctx, IngressKey, ids...).Result()
data, _ := s.rc.HMGet(s.ctx, IngressKey, ingressIDs...).Result()
for _, d := range data {
if d == nil {
continue
@@ -487,25 +595,67 @@ func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) (
return infos, nil
}
func (s *RedisStore) UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error {
return s.StoreIngress(ctx, info)
func (s *RedisStore) UpdateIngress(_ context.Context, info *livekit.IngressInfo) error {
return s.StoreIngress(s.ctx, info)
}
func (s *RedisStore) DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error {
err := s.rc.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId).Err()
if err != nil {
return err
}
err = s.rc.HDel(s.ctx, StreamKeyKey, info.IngressId).Err()
if err != nil {
return err
}
err = s.rc.HDel(s.ctx, EgressKey, info.IngressId).Err()
if err != nil {
return err
func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error {
tx := s.rc.TxPipeline()
tx.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId)
tx.HDel(s.ctx, StreamKeyKey, info.IngressId)
tx.HDel(s.ctx, IngressKey, info.IngressId)
if _, err := tx.Exec(s.ctx); err != nil {
return errors.Wrap(err, "could not delete ingress info")
}
return nil
}
// MigrateEgressInfo moves room->egress set membership from the deprecated
// "room_egress:<roomID>" keys to the "egress:room:<roomName>" keys used by
// LiveKit >= v1.1.3, recovering each room name from the stored egress request
// (the deprecated layout predates EgressInfo.RoomName). A short-lived redis
// lock ensures at most one node performs the migration; if another node holds
// the lock this returns (0, nil). Returns the number of deprecated keys
// processed.
func (s *RedisStore) MigrateEgressInfo() (int, error) {
	// best-effort distributed lock, expires after one minute
	locked, err := s.rc.SetNX(s.ctx, "egress-migration", utils.NewGuid("LOCK"), time.Minute).Result()
	if err != nil {
		return 0, err
	} else if !locked {
		return 0, nil
	}

	it := s.rc.Scan(s.ctx, 0, DeprecatedRoomEgressPrefix+"*", 0).Iterator()
	migrated := 0
	for it.Next(s.ctx) {
		migrated++
		key := it.Val()
		egressIDs, err := s.rc.SMembers(s.ctx, key).Result()
		if err != nil && err != redis.Nil {
			return migrated, err
		}
		for _, egressID := range egressIDs {
			info, err := s.LoadEgress(s.ctx, egressID)
			if err != nil {
				return migrated, err
			}
			// recover the room name from the original request
			var roomName string
			switch req := info.Request.(type) {
			case *livekit.EgressInfo_RoomComposite:
				roomName = req.RoomComposite.RoomName
			case *livekit.EgressInfo_TrackComposite:
				roomName = req.TrackComposite.RoomName
			case *livekit.EgressInfo_Track:
				roomName = req.Track.RoomName
			}
			tx := s.rc.TxPipeline()
			tx.SAdd(s.ctx, RoomEgressPrefix+roomName, egressID)
			tx.SRem(s.ctx, key, egressID)
			if _, err = tx.Exec(s.ctx); err != nil {
				return migrated, err
			}
		}
		s.rc.Del(s.ctx, key)
	}
	// FIX: scan iteration errors were previously ignored, silently reporting
	// a successful (but partial) migration
	if err := it.Err(); err != nil {
		return migrated, err
	}
	return migrated, nil
}
+97 -1
View File
@@ -8,8 +8,10 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/service"
)
@@ -111,7 +113,101 @@ func TestRoomLock(t *testing.T) {
})
}
func TestStoreIngress(t *testing.T) {
// TestEgressStore exercises the redis-backed egress store end to end:
// pre-v1.1.3 key migration, load, store, update, listing (all and by room),
// and cleanup of egress info that ended more than 24h ago.
// Requires a running redis instance (via redisClient()).
func TestEgressStore(t *testing.T) {
	ctx := context.Background()
	rc := redisClient()
	rs := service.NewRedisStore(rc)

	roomName := "egress-test"

	// test migration
	info := &livekit.EgressInfo{
		EgressId: utils.NewGuid(utils.EgressPrefix),
		RoomId:   utils.NewGuid(utils.RoomPrefix),
		RoomName: roomName,
		Status:   livekit.EgressStatus_EGRESS_STARTING,
		Request: &livekit.EgressInfo_RoomComposite{
			RoomComposite: &livekit.RoomCompositeEgressRequest{
				RoomName: roomName,
				Layout:   "speaker-dark",
			},
		},
	}
	data, err := proto.Marshal(info)
	require.NoError(t, err)

	// store egress info the old way (deprecated per-roomID set)
	tx := rc.TxPipeline()
	tx.HSet(ctx, service.EgressKey, info.EgressId, data)
	tx.SAdd(ctx, service.DeprecatedRoomEgressPrefix+info.RoomId, info.EgressId)
	_, err = tx.Exec(ctx)
	require.NoError(t, err)

	// run migration
	migrated, err := rs.MigrateEgressInfo()
	require.NoError(t, err)
	require.Equal(t, 1, migrated)

	// check that it was migrated to the room-name keyed set
	exists, err := rc.Exists(ctx, service.DeprecatedRoomEgressPrefix+info.RoomId).Result()
	require.NoError(t, err)
	require.Equal(t, int64(0), exists)
	exists, err = rc.Exists(ctx, service.RoomEgressPrefix+info.RoomName).Result()
	require.NoError(t, err)
	require.Equal(t, int64(1), exists)

	// load
	res, err := rs.LoadEgress(ctx, info.EgressId)
	require.NoError(t, err)
	require.Equal(t, res.EgressId, info.EgressId)

	// store another
	info2 := &livekit.EgressInfo{
		EgressId: utils.NewGuid(utils.EgressPrefix),
		RoomId:   utils.NewGuid(utils.RoomPrefix),
		RoomName: "another-egress-test",
		Status:   livekit.EgressStatus_EGRESS_STARTING,
		Request: &livekit.EgressInfo_RoomComposite{
			RoomComposite: &livekit.RoomCompositeEgressRequest{
				RoomName: "another-egress-test",
				Layout:   "speaker-dark",
			},
		},
	}
	require.NoError(t, rs.StoreEgress(ctx, info2))

	// update info2 as ended 24h ago
	// FIX: this previously called rs.UpdateEgress(ctx, info), so info2's
	// ended state was never persisted and it escaped cleanup
	info2.Status = livekit.EgressStatus_EGRESS_COMPLETE
	info2.EndedAt = time.Now().Add(-24 * time.Hour).UnixNano()
	require.NoError(t, rs.UpdateEgress(ctx, info2))

	// list all
	list, err := rs.ListEgress(ctx, "")
	require.NoError(t, err)
	require.Len(t, list, 2)

	// list by room
	list, err = rs.ListEgress(ctx, livekit.RoomName(roomName))
	require.NoError(t, err)
	require.Len(t, list, 1)

	// update info as ended 24h ago
	info.Status = livekit.EgressStatus_EGRESS_COMPLETE
	info.EndedAt = time.Now().Add(-24 * time.Hour).UnixNano()
	require.NoError(t, rs.UpdateEgress(ctx, info))

	// clean
	require.NoError(t, rs.CleanEndedEgress())

	// list by room again: cleaned up
	list, err = rs.ListEgress(ctx, livekit.RoomName(roomName))
	require.NoError(t, err)
	require.Len(t, list, 0)
}
func TestIngressStore(t *testing.T) {
ctx := context.Background()
rs := service.NewRedisStore(redisClient())
+3 -1
View File
@@ -146,7 +146,9 @@ func (s *LivekitServer) Start() error {
return err
}
s.egressService.Start()
if err := s.egressService.Start(); err != nil {
return err
}
addresses := s.config.BindAddresses
if addresses == nil {
+6 -82
View File
@@ -10,23 +10,11 @@ import (
)
type FakeEgressStore struct {
DeleteEgressStub func(context.Context, *livekit.EgressInfo) error
deleteEgressMutex sync.RWMutex
deleteEgressArgsForCall []struct {
arg1 context.Context
arg2 *livekit.EgressInfo
}
deleteEgressReturns struct {
result1 error
}
deleteEgressReturnsOnCall map[int]struct {
result1 error
}
ListEgressStub func(context.Context, livekit.RoomID) ([]*livekit.EgressInfo, error)
ListEgressStub func(context.Context, livekit.RoomName) ([]*livekit.EgressInfo, error)
listEgressMutex sync.RWMutex
listEgressArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomID
arg2 livekit.RoomName
}
listEgressReturns struct {
result1 []*livekit.EgressInfo
@@ -78,74 +66,12 @@ type FakeEgressStore struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeEgressStore) DeleteEgress(arg1 context.Context, arg2 *livekit.EgressInfo) error {
fake.deleteEgressMutex.Lock()
ret, specificReturn := fake.deleteEgressReturnsOnCall[len(fake.deleteEgressArgsForCall)]
fake.deleteEgressArgsForCall = append(fake.deleteEgressArgsForCall, struct {
arg1 context.Context
arg2 *livekit.EgressInfo
}{arg1, arg2})
stub := fake.DeleteEgressStub
fakeReturns := fake.deleteEgressReturns
fake.recordInvocation("DeleteEgress", []interface{}{arg1, arg2})
fake.deleteEgressMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeEgressStore) DeleteEgressCallCount() int {
fake.deleteEgressMutex.RLock()
defer fake.deleteEgressMutex.RUnlock()
return len(fake.deleteEgressArgsForCall)
}
func (fake *FakeEgressStore) DeleteEgressCalls(stub func(context.Context, *livekit.EgressInfo) error) {
fake.deleteEgressMutex.Lock()
defer fake.deleteEgressMutex.Unlock()
fake.DeleteEgressStub = stub
}
func (fake *FakeEgressStore) DeleteEgressArgsForCall(i int) (context.Context, *livekit.EgressInfo) {
fake.deleteEgressMutex.RLock()
defer fake.deleteEgressMutex.RUnlock()
argsForCall := fake.deleteEgressArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeEgressStore) DeleteEgressReturns(result1 error) {
fake.deleteEgressMutex.Lock()
defer fake.deleteEgressMutex.Unlock()
fake.DeleteEgressStub = nil
fake.deleteEgressReturns = struct {
result1 error
}{result1}
}
func (fake *FakeEgressStore) DeleteEgressReturnsOnCall(i int, result1 error) {
fake.deleteEgressMutex.Lock()
defer fake.deleteEgressMutex.Unlock()
fake.DeleteEgressStub = nil
if fake.deleteEgressReturnsOnCall == nil {
fake.deleteEgressReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.deleteEgressReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeEgressStore) ListEgress(arg1 context.Context, arg2 livekit.RoomID) ([]*livekit.EgressInfo, error) {
func (fake *FakeEgressStore) ListEgress(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.EgressInfo, error) {
fake.listEgressMutex.Lock()
ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)]
fake.listEgressArgsForCall = append(fake.listEgressArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomID
arg2 livekit.RoomName
}{arg1, arg2})
stub := fake.ListEgressStub
fakeReturns := fake.listEgressReturns
@@ -166,13 +92,13 @@ func (fake *FakeEgressStore) ListEgressCallCount() int {
return len(fake.listEgressArgsForCall)
}
func (fake *FakeEgressStore) ListEgressCalls(stub func(context.Context, livekit.RoomID) ([]*livekit.EgressInfo, error)) {
func (fake *FakeEgressStore) ListEgressCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.EgressInfo, error)) {
fake.listEgressMutex.Lock()
defer fake.listEgressMutex.Unlock()
fake.ListEgressStub = stub
}
func (fake *FakeEgressStore) ListEgressArgsForCall(i int) (context.Context, livekit.RoomID) {
func (fake *FakeEgressStore) ListEgressArgsForCall(i int) (context.Context, livekit.RoomName) {
fake.listEgressMutex.RLock()
defer fake.listEgressMutex.RUnlock()
argsForCall := fake.listEgressArgsForCall[i]
@@ -397,8 +323,6 @@ func (fake *FakeEgressStore) UpdateEgressReturnsOnCall(i int, result1 error) {
func (fake *FakeEgressStore) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.deleteEgressMutex.RLock()
defer fake.deleteEgressMutex.RUnlock()
fake.listEgressMutex.RLock()
defer fake.listEgressMutex.RUnlock()
fake.loadEgressMutex.RLock()
-2
View File
@@ -170,8 +170,6 @@ func getEgressStore(s ObjectStore) EgressStore {
switch store := s.(type) {
case *RedisStore:
return store
case *LocalStore:
return store
default:
return nil
}
+1 -4
View File
@@ -1,8 +1,7 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
//+build !wireinject
package service
@@ -196,8 +195,6 @@ func getEgressStore(s ObjectStore) EgressStore {
switch store := s.(type) {
case *RedisStore:
return store
case *LocalStore:
return store
default:
return nil
}