Update egress info (#694)

This commit is contained in:
David Colburn
2022-05-18 11:30:25 -07:00
committed by GitHub
parent 54bb0a29e8
commit a8ff70f63a

View File

@@ -35,7 +35,7 @@ func NewEgressService(bus utils.MessageBus, store ServiceStore, rs livekit.RoomS
func (s *EgressService) Start() {
if s.bus != nil {
go s.resultsWorker()
go s.updateListener()
}
}
@@ -221,7 +221,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
return info, nil
}
func (s *EgressService) resultsWorker() {
func (s *EgressService) updateListener() {
sub, err := s.bus.SubscribeQueue(context.Background(), egress.ResultsChannel)
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
@@ -240,19 +240,27 @@ func (s *EgressService) resultsWorker() {
continue
}
err = s.store.DeleteEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not delete egress from store", err)
}
switch res.Status {
case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING:
// save updated info to store
err = s.store.UpdateEgress(context.Background(), res)
// log results
if res.Error != "" {
logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId)
} else {
logger.Debugw("egress ended", "egressID", res.EgressId)
}
case livekit.EgressStatus_EGRESS_COMPLETE:
// delete from store
err = s.store.DeleteEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not delete egress from store", err)
}
s.telemetry.EgressEnded(context.Background(), res)
// log results
if res.Error != "" {
logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId)
} else {
logger.Debugw("egress ended", "egressID", res.EgressId)
}
s.telemetry.EgressEnded(context.Background(), res)
}
case <-s.shutdown:
_ = sub.Close()
return