mirror of
https://github.com/livekit/livekit.git
synced 2026-06-08 00:02:18 +00:00
use egress results channel (#512)
This commit is contained in:
@@ -5,12 +5,12 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/livekit/protocol/egress"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/recording"
|
||||
"github.com/livekit/protocol/utils"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
)
|
||||
@@ -72,7 +72,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
if s.bus == nil {
|
||||
return nil, errors.New("egress not connected (redis required)")
|
||||
return nil, ErrEgressNotConnected
|
||||
}
|
||||
|
||||
room, err := s.store.LoadRoom(ctx, roomName)
|
||||
@@ -105,7 +105,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
if s.bus == nil {
|
||||
return nil, errors.New("egress not connected (redis required)")
|
||||
return nil, ErrEgressNotConnected
|
||||
}
|
||||
|
||||
info, err := s.store.LoadEgress(ctx, req.EgressId)
|
||||
@@ -147,7 +147,7 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
if s.bus == nil {
|
||||
return nil, errors.New("egress not connected (redis required)")
|
||||
return nil, ErrEgressNotConnected
|
||||
}
|
||||
|
||||
info, err := egress.SendRequest(ctx, s.bus, &livekit.EgressRequest{
|
||||
@@ -174,7 +174,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
if s.bus == nil {
|
||||
return nil, errors.New("egress not connected (redis required)")
|
||||
return nil, ErrEgressNotConnected
|
||||
}
|
||||
|
||||
var roomID livekit.RoomID
|
||||
@@ -199,7 +199,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
if s.bus == nil {
|
||||
return nil, errors.New("recording not configured (redis required)")
|
||||
return nil, ErrEgressNotConnected
|
||||
}
|
||||
|
||||
info, err := egress.SendRequest(ctx, s.bus, &livekit.EgressRequest{
|
||||
@@ -222,7 +222,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
|
||||
}
|
||||
|
||||
func (s *EgressService) resultsWorker() {
|
||||
sub, err := s.bus.SubscribeQueue(context.Background(), recording.ResultChannel)
|
||||
sub, err := s.bus.SubscribeQueue(context.Background(), egress.ResultsChannel)
|
||||
if err != nil {
|
||||
logger.Errorw("failed to subscribe to results channel", err)
|
||||
return
|
||||
|
||||
@@ -4,6 +4,7 @@ import "errors"
|
||||
|
||||
var (
|
||||
ErrEgressNotFound = errors.New("egress does not exist")
|
||||
ErrEgressNotConnected = errors.New("egress not connected (redis required)")
|
||||
ErrRoomNotFound = errors.New("requested room does not exist")
|
||||
ErrRoomLockFailed = errors.New("could not lock room")
|
||||
ErrRoomUnlockFailed = errors.New("could not unlock room, lock token does not match")
|
||||
|
||||
Reference in New Issue
Block a user