move recorder to msg bus (#108)

This commit is contained in:
David Colburn
2021-09-02 17:06:48 -05:00
committed by GitHub
parent e82d50d717
commit e9ffbefa70
5 changed files with 36 additions and 31 deletions
+2 -2
View File
@@ -9,12 +9,12 @@ require (
github.com/gammazero/workerpool v1.1.2
github.com/go-logr/logr v1.0.0
github.com/go-logr/zapr v1.0.0
github.com/go-redis/redis/v8 v8.7.1
github.com/go-redis/redis/v8 v8.11.3
github.com/google/subcommands v1.2.0 // indirect
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.8.2
github.com/livekit/protocol v0.8.4
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
+17 -18
View File
@@ -80,8 +80,11 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k=
github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
@@ -113,8 +116,8 @@ github.com/go-logr/logr v1.0.0 h1:kH951GinvFVaQgy/ki/B3YYmQtRpExGigSJg6O8z5jo=
github.com/go-logr/logr v1.0.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/zapr v1.0.0 h1:rerrgIsgykt8zVvKMVfqxI2SoYvHAFdX11er/SLZZgI=
github.com/go-logr/zapr v1.0.0/go.mod h1:t7rgfcj/l02iFgbQxqhQeoyWA9jX2+2enc4PUHF6Hp0=
github.com/go-redis/redis/v8 v8.7.1 h1:8IYi6RO83fNcG5amcUUYTN/qH2h4OjZHlim3KWGFSsA=
github.com/go-redis/redis/v8 v8.7.1/go.mod h1:BRxHBWn3pO3CfjyX6vAoyeRmCquvxr6QG+2onGV2gYs=
github.com/go-redis/redis/v8 v8.11.3 h1:GCjoYp8c+yQTJfc0n69iwSiHjvuAdruxl7elnZCxgt8=
github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
@@ -142,8 +145,9 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -153,8 +157,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -241,8 +246,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.9 h1:Ih+tE5CXqdL1o6iXfZn+uBoddBXWceaOKVIun6aVuZM=
github.com/livekit/ion-sfu v1.20.9/go.mod h1:g8hwobZI5fvX1RXvayf4ZXkgP7spV5YGE4yTSsumpB4=
github.com/livekit/protocol v0.8.2 h1:GvB4fDaIOkDUtYe+G/AMm7mPE19U4zv/55XL7LrrqpY=
github.com/livekit/protocol v0.8.2/go.mod h1:YKcyBbqH0WmNL35i7c5jxr1L2Az13oT10oGNDOijI20=
github.com/livekit/protocol v0.8.4 h1:XoH+DLno8EpT+xWor+NAVhVgmYIN+P1BV5um/nFCZAc=
github.com/livekit/protocol v0.8.4/go.mod h1:D+NoXVPVu5VRqxGe0Lt3Q1gvTm6cSRzTbYhFbRz84h8=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
@@ -296,16 +301,16 @@ github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/ginkgo v1.16.1 h1:foqVmeWDD6yYpK+Yz3fHyNIxFYNxswxqNFjSKe+vI54=
github.com/onsi/ginkgo v1.16.1/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/onsi/gomega v1.11.0 h1:+CqWgvj0OZycCaqclBD1pxKHAU+tOkHmQIWvDHq2aug=
github.com/onsi/gomega v1.11.0/go.mod h1:azGKhqFUon9Vuj0YmTfLSmx0FUwqXYSTl5re8lQLTUg=
github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
@@ -474,14 +479,6 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v0.18.0 h1:d5Of7+Zw4ANFOJB+TIn2K3QWsgS2Ht7OU9DqZHI6qu8=
go.opentelemetry.io/otel v0.18.0/go.mod h1:PT5zQj4lTsR1YeARt8YNKcFb88/c2IKoSABK9mX0r78=
go.opentelemetry.io/otel/metric v0.18.0 h1:yuZCmY9e1ZTaMlZXLrrbAPmYW6tW1A5ozOZeOYGaTaY=
go.opentelemetry.io/otel/metric v0.18.0/go.mod h1:kEH2QtzAyBy3xDVQfGZKIcok4ZZFvd5xyKPfPcuK6pE=
go.opentelemetry.io/otel/oteltest v0.18.0 h1:FbKDFm/LnQDOHuGjED+fy3s5YMVg0z019GJ9Er66hYo=
go.opentelemetry.io/otel/oteltest v0.18.0/go.mod h1:NyierCU3/G8DLTva7KRzGii2fdxdR89zXKH1bNWY7Bo=
go.opentelemetry.io/otel/trace v0.18.0 h1:ilCfc/fptVKaDMK1vWk0elxpolurJbEgey9J6g6s+wk=
go.opentelemetry.io/otel/trace v0.18.0/go.mod h1:FzdUu3BPwZSZebfQ1vl5/tAa8LyMLXSJN57AXIt/iDk=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -564,6 +561,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -717,6 +715,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+12 -10
View File
@@ -5,18 +5,17 @@ import (
"errors"
"time"
"github.com/go-redis/redis/v8"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"google.golang.org/protobuf/proto"
)
type RecordingService struct {
rc *redis.Client
mb utils.MessageBus
}
func NewRecordingService(rc *redis.Client) *RecordingService {
return &RecordingService{rc: rc}
func NewRecordingService(mb utils.MessageBus) *RecordingService {
return &RecordingService{mb: mb}
}
func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.RecordingResponse, error) {
@@ -24,7 +23,7 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star
return nil, twirpAuthError(err)
}
if s.rc == nil {
if s.mb == nil {
return nil, errors.New("recording not configured (redis required)")
}
@@ -35,7 +34,7 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star
}
// start the recording
err = s.rc.Publish(ctx, utils.StartRecordingChannel(recordingID), nil).Err()
err = s.mb.Publish(ctx, utils.StartRecordingChannel(recordingID), nil)
if err != nil {
return nil, err
}
@@ -55,10 +54,13 @@ func (s *RecordingService) reserveRecorder(ctx context.Context, req *livekit.Sta
return "", err
}
sub := s.rc.Subscribe(ctx, utils.ReservationResponseChannel(id))
sub, err := s.mb.Subscribe(ctx, utils.ReservationResponseChannel(id))
if err != nil {
return "", err
}
defer sub.Close()
if err = s.rc.Publish(ctx, utils.ReservationChannel, string(b)).Err(); err != nil {
if err = s.mb.Publish(ctx, utils.ReservationChannel, string(b)); err != nil {
return "", err
}
@@ -75,11 +77,11 @@ func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRec
return nil, twirpAuthError(err)
}
if s.rc == nil {
if s.mb == nil {
return nil, errors.New("recording not configured (redis required)")
}
if err := s.rc.Publish(ctx, utils.EndRecordingChannel(req.RecordingId), nil).Err(); err != nil {
if err := s.mb.Publish(ctx, utils.EndRecordingChannel(req.RecordingId), nil); err != nil {
return nil, err
}
+2
View File
@@ -11,6 +11,7 @@ import (
"github.com/google/wire"
"github.com/livekit/protocol/auth"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/pkg/errors"
@@ -21,6 +22,7 @@ import (
var ServiceSet = wire.NewSet(
createRedisClient,
utils.NewRedisMessageBus,
createRouter,
createStore,
CreateKeyProvider,
+3 -1
View File
@@ -8,6 +8,7 @@ package service
import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/protocol/utils"
)
// Injectors from wire.go:
@@ -36,7 +37,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
recordingService := NewRecordingService(client)
messageBus := utils.NewRedisMessageBus(client)
recordingService := NewRecordingService(messageBus)
rtcService := NewRTCService(conf, localRoomManager, router, currentNode)
server, err := NewTurnServer(conf, roomStore, currentNode)
if err != nil {