diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 061c710be..b55d8319a 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -35,7 +35,7 @@ func NewEgressService(bus utils.MessageBus, store ServiceStore, rs livekit.RoomS func (s *EgressService) Start() { if s.bus != nil { - go s.resultsWorker() + go s.updateListener() } } @@ -221,7 +221,7 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR return info, nil } -func (s *EgressService) resultsWorker() { +func (s *EgressService) updateListener() { sub, err := s.bus.SubscribeQueue(context.Background(), egress.ResultsChannel) if err != nil { logger.Errorw("failed to subscribe to results channel", err) @@ -240,19 +240,27 @@ func (s *EgressService) resultsWorker() { continue } - err = s.store.DeleteEgress(context.Background(), res) - if err != nil { - logger.Errorw("could not delete egress from store", err) - } + switch res.Status { + case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: + // save updated info to store + err = s.store.UpdateEgress(context.Background(), res) - // log results - if res.Error != "" { - logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId) - } else { - logger.Debugw("egress ended", "egressID", res.EgressId) - } + case livekit.EgressStatus_EGRESS_COMPLETE: + // delete from store + err = s.store.DeleteEgress(context.Background(), res) + if err != nil { + logger.Errorw("could not delete egress from store", err) + } - s.telemetry.EgressEnded(context.Background(), res) + // log results + if res.Error != "" { + logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId) + } else { + logger.Debugw("egress ended", "egressID", res.EgressId) + } + + s.telemetry.EgressEnded(context.Background(), res) + } case <-s.shutdown: _ = sub.Close() return