Added logging fields for Ingress & Egress services (#1205)

This commit is contained in:
David Zhao
2022-12-04 21:44:16 -08:00
committed by GitHub
parent 7548ffbb82
commit e9abb47020
3 changed files with 81 additions and 6 deletions

3
.idea/protoeditor.xml generated
View File

@@ -10,6 +10,9 @@
<ImportPathEntry>
<option name="location" value="file://$PROJECT_DIR$/proto" />
</ImportPathEntry>
<ImportPathEntry>
<option name="location" value="file://$USER_HOME$/Library/Caches/JetBrains/GoLand2022.3/protoeditor" />
</ImportPathEntry>
</list>
</option>
<option name="descriptorPath" value="google/protobuf/descriptor.proto" />

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"reflect"
"time"
"google.golang.org/protobuf/proto"
@@ -80,38 +81,76 @@ func (s *EgressService) Stop() {
}
func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livekit.RoomCompositeEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
fields := []interface{}{"room", req.RoomName, "outputType", reflect.TypeOf(req.Output).String(), "baseUrl", req.CustomBaseUrl}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_RoomComposite{
RoomComposite: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *livekit.TrackCompositeEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
fields := []interface{}{
"room", req.RoomName, "outputType", reflect.TypeOf(req.Output).String(), "audioTrackID", req.AudioTrackId, "videoTrackID", req.VideoTrackId,
}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_TrackComposite{
TrackComposite: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.TrackEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
fields := []interface{}{"room", req.RoomName, "trackID", req.TrackId, "outputType", reflect.TypeOf(req.Output).String()}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_Track{
Track: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgress(ctx, "", &livekit.StartEgressRequest{
fields := []interface{}{"url", req.Url, "outputType", reflect.TypeOf(req.Output).String()}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, "", &livekit.StartEgressRequest{
Request: &livekit.StartEgressRequest_Web{
Web: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomName, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *livekit.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
} else if s.launcher == nil {
@@ -150,6 +189,7 @@ type LayoutMetadata struct {
}
func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLayoutRequest) (*livekit.EgressInfo, error) {
AppendLogFields(ctx, "egressID", req.EgressId, "layout", req.Layout)
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
@@ -184,6 +224,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
}
func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStreamRequest) (*livekit.EgressInfo, error) {
AppendLogFields(ctx, "egressID", req.EgressId, "addUrls", req.AddOutputUrls, "removeUrls", req.RemoveOutputUrls)
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
@@ -211,6 +252,9 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
}
func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) {
if req.RoomName != "" {
AppendLogFields(ctx, "room", req.RoomName)
}
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
@@ -227,6 +271,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR
}
func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressRequest) (*livekit.EgressInfo, error) {
AppendLogFields(ctx, "egressID", req.EgressId)
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}

View File

@@ -59,7 +59,24 @@ func (s *IngressService) Stop() {
}
func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
return s.CreateIngressWithUrlPrefix(ctx, s.conf.RTMPBaseURL, req)
fields := []interface{}{
"inputType", req.InputType,
"name", req.Name,
}
if req.RoomName != "" {
fields = append(fields, "room", req.RoomName, "identity", req.ParticipantIdentity)
}
defer func() {
AppendLogFields(ctx, fields...)
}()
ig, err := s.CreateIngressWithUrlPrefix(ctx, s.conf.RTMPBaseURL, req)
if err != nil {
return nil, err
}
fields = append(fields, "ingressID", ig.IngressId)
return ig, nil
}
func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPrefix string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
@@ -129,6 +146,14 @@ func (s *IngressService) sendRPCWithRetry(ctx context.Context, req *livekit.Ingr
}
func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateIngressRequest) (*livekit.IngressInfo, error) {
fields := []interface{}{
"ingress", req.IngressId,
"name", req.Name,
}
if req.RoomName != "" {
fields = append(fields, "room", req.RoomName, "identity", req.ParticipantIdentity)
}
AppendLogFields(ctx, fields...)
err := EnsureIngressAdminPermission(ctx)
if err != nil {
return nil, twirpAuthError(err)
@@ -197,6 +222,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
}
func (s *IngressService) ListIngress(ctx context.Context, req *livekit.ListIngressRequest) (*livekit.ListIngressResponse, error) {
AppendLogFields(ctx, "room", req.RoomName)
err := EnsureIngressAdminPermission(ctx)
if err != nil {
return nil, twirpAuthError(err)
@@ -212,6 +238,7 @@ func (s *IngressService) ListIngress(ctx context.Context, req *livekit.ListIngre
}
func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteIngressRequest) (*livekit.IngressInfo, error) {
AppendLogFields(ctx, "ingressID", req.IngressId)
if err := EnsureIngressAdminPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}