diff --git a/config-sample.yaml b/config-sample.yaml index 0b44b11bf..15c7ada18 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -161,6 +161,7 @@ keys: # playout_delay: # enabled: true # min: 100 +# max: 2000 # Webhooks # when configured, LiveKit notifies your URL handler with room events diff --git a/go.mod b/go.mod index c4f7e12c9..340dc1deb 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f - github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e + github.com/livekit/protocol v1.7.3-0.20230918130519-dd24d071834c github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index fe75c3753..ed858e767 100644 --- a/go.sum +++ b/go.sum @@ -129,6 +129,8 @@ github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e h1:WEet0iH/JazBFNhhH+YuZHtXpKefb7mnbCC2al3peyA= github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230918130519-dd24d071834c h1:Z44UEdskI35V3nDJfVvYuJ4DOW1wQLku32wbNyz23MM= +github.com/livekit/protocol v1.7.3-0.20230918130519-dd24d071834c/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/config/config.go b/pkg/config/config.go index fdf468145..3bbd8c826 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -205,6 +205,7 @@ type StreamTrackersConfig struct { type PlayoutDelayConfig struct { Enabled bool `yaml:"enabled,omitempty"` Min int `yaml:"min,omitempty"` + Max int `yaml:"max,omitempty"` } type VideoConfig struct { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 92f50ca01..cbc1cf54c 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -28,8 +28,6 @@ import ( "github.com/pion/sctp" - "github.com/livekit/livekit-server/pkg/sfu/connectionquality" - sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -38,8 +36,10 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + sutils "github.com/livekit/livekit-server/pkg/utils" ) const ( @@ -81,6 +81,7 @@ type Room struct { participants map[livekit.ParticipantIdentity]types.LocalParticipant participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions participantRequestSources map[livekit.ParticipantIdentity]routing.MessageSource + hasPublished sync.Map // map of identity -> bool bufferFactory *buffer.FactoryOfBufferFactory // batch update participant info for non-publishers @@ -509,6 +510,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek for _, t := range p.GetPublishedTracks() { r.trackManager.RemoveTrack(t) } + r.hasPublished.Delete(p.Identity()) p.OnTrackUpdated(nil) p.OnTrackPublished(nil) @@ -897,18 +899,35 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. r.trackManager.AddTrack(track, participant.Identity(), participant.ID()) - // auto track egress - if r.internal != nil && r.internal.TrackEgress != nil { - if err := StartTrackEgress( - context.Background(), - r.egressLauncher, - r.telemetry, - r.internal.TrackEgress, - track, - r.Name(), - r.ID(), - ); err != nil { - r.Logger.Errorw("failed to launch track egress", err) + // auto egress + if r.internal != nil { + if r.internal.ParticipantEgress != nil { + if _, hasPublished := r.hasPublished.Swap(participant.Identity(), true); !hasPublished { + if err := StartParticipantEgress( + context.Background(), + r.egressLauncher, + r.telemetry, + r.internal.ParticipantEgress, + participant.Identity(), + r.Name(), + r.ID(), + ); err != nil { + r.Logger.Errorw("failed to launch participant egress", err) + } + } + } + if r.internal.TrackEgress != nil { + if err := StartTrackEgress( + context.Background(), + r.egressLauncher, + r.telemetry, + r.internal.TrackEgress, + track, + r.Name(), + r.ID(), + ); err != nil { + r.Logger.Errorw("failed to launch track egress", err) + } } } } diff --git a/pkg/rtc/room_egress.go b/pkg/rtc/room_egress.go index d632a007b..db6d12813 100644 --- a/pkg/rtc/room_egress.go +++ b/pkg/rtc/room_egress.go @@ -32,6 +32,67 @@ type EgressLauncher interface { StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) } +func StartParticipantEgress( + ctx context.Context, + launcher EgressLauncher, + ts telemetry.TelemetryService, + opts *livekit.AutoParticipantEgress, + identity livekit.ParticipantIdentity, + roomName livekit.RoomName, + roomID livekit.RoomID, +) error { + if req, err := startParticipantEgress(ctx, launcher, opts, identity, roomName, roomID); err != nil { + // send egress failed webhook + ts.NotifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventEgressEnded, + EgressInfo: &livekit.EgressInfo{ + RoomId: string(roomID), + RoomName: string(roomName), + Status: livekit.EgressStatus_EGRESS_FAILED, + Error: err.Error(), + Request: &livekit.EgressInfo_Participant{Participant: req}, + }, + }) + return err + } + return nil +} + +func startParticipantEgress( + ctx context.Context, + launcher EgressLauncher, + opts *livekit.AutoParticipantEgress, + identity livekit.ParticipantIdentity, + roomName livekit.RoomName, + roomID livekit.RoomID, +) (*livekit.ParticipantEgressRequest, error) { + req := &livekit.ParticipantEgressRequest{ + RoomName: string(roomName), + Identity: string(identity), + FileOutputs: opts.FileOutputs, + SegmentOutputs: opts.SegmentOutputs, + } + + switch o := opts.Options.(type) { + case *livekit.AutoParticipantEgress_Preset: + req.Options = &livekit.ParticipantEgressRequest_Preset{Preset: o.Preset} + case *livekit.AutoParticipantEgress_Advanced: + req.Options = &livekit.ParticipantEgressRequest_Advanced{Advanced: o.Advanced} + } + + if launcher == nil { + return req, errors.New("egress launcher not found") + } + + _, err := launcher.StartEgress(ctx, &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Participant{ + Participant: req, + }, + RoomId: string(roomID), + }) + return req, err +} + func StartTrackEgress( ctx context.Context, launcher EgressLauncher, @@ -66,7 +127,6 @@ func startTrackEgress( roomName livekit.RoomName, roomID livekit.RoomID, ) (*livekit.TrackEgressRequest, error) { - output := &livekit.DirectFileOutput{ Filepath: getFilePath(opts.Filepath), } diff --git a/pkg/service/egress.go b/pkg/service/egress.go index ffe16f839..8675d0b80 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -17,7 +17,6 @@ package service import ( "context" "encoding/json" - "errors" "fmt" "reflect" @@ -25,6 +24,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" @@ -80,9 +80,10 @@ func NewEgressService( } func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livekit.RoomCompositeEgressRequest) (*livekit.EgressInfo, error) { - fields := []interface{}{"room", req.RoomName, "baseUrl", req.CustomBaseUrl} - if t := reflect.TypeOf(req.Output); t != nil { - fields = append(fields, "outputType", t.String()) + fields := []interface{}{ + "room", req.RoomName, + "baseUrl", req.CustomBaseUrl, + "outputType", egress.GetOutputType(req), } defer func() { AppendLogFields(ctx, fields...) @@ -99,12 +100,53 @@ func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livek return ei, err } +func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgressRequest) (*livekit.EgressInfo, error) { + fields := []interface{}{ + "url", req.Url, + "outputType", egress.GetOutputType(req), + } + defer func() { + AppendLogFields(ctx, fields...) + }() + ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Web{ + Web: req, + }, + }) + if err != nil { + return nil, err + } + fields = append(fields, "egressID", ei.EgressId) + return ei, err +} + +func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) { + fields := []interface{}{ + "room", req.RoomName, + "identity", req.Identity, + "outputType", egress.GetOutputType(req), + } + defer func() { + AppendLogFields(ctx, fields...) + }() + ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Participant{ + Participant: req, + }, + }) + if err != nil { + return nil, err + } + fields = append(fields, "egressID", ei.EgressId) + return ei, err +} + func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *livekit.TrackCompositeEgressRequest) (*livekit.EgressInfo, error) { fields := []interface{}{ - "room", req.RoomName, "audioTrackID", req.AudioTrackId, "videoTrackID", req.VideoTrackId, - } - if t := reflect.TypeOf(req.Output); t != nil { - fields = append(fields, "outputType", t.String()) + "room", req.RoomName, + "audioTrackID", req.AudioTrackId, + "videoTrackID", req.VideoTrackId, + "outputType", egress.GetOutputType(req), } defer func() { AppendLogFields(ctx, fields...) @@ -141,30 +183,6 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track return ei, err } -func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgressRequest) (*livekit.EgressInfo, error) { - fields := []interface{}{"url", req.Url} - if t := reflect.TypeOf(req.Output); t != nil { - fields = append(fields, "outputType", t.String()) - } - defer func() { - AppendLogFields(ctx, fields...) - }() - ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{ - Request: &rpc.StartEgressRequest_Web{ - Web: req, - }, - }) - if err != nil { - return nil, err - } - fields = append(fields, "egressID", ei.EgressId) - return ei, err -} - -func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) { - return nil, errors.New("under development") -} - func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index e7fbf878f..6db962549 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -85,10 +85,11 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if req.Egress != nil && req.Egress.Tracks != nil { internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks} } - if req.MinPlayoutDelay > 0 { + if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { rm.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: true, Min: req.MinPlayoutDelay, + Max: req.MaxPlayoutDelay, } } @@ -160,5 +161,6 @@ func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) { room.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: conf.PlayoutDelay.Enabled, Min: uint32(conf.PlayoutDelay.Min), + Max: uint32(conf.PlayoutDelay.Max), } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 86931681f..3ed1c1d87 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -326,14 +326,20 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { }) // set initial playout delay to minimum value - if d.params.PlayoutDelayLimit.GetEnabled() && d.params.PlayoutDelayLimit.GetMin() > 0 { + if d.params.PlayoutDelayLimit.GetEnabled() { + maxDelay := uint32(rtpextension.PlayoutDelayDefaultMax) + if d.params.PlayoutDelayLimit.GetMax() > 0 { + maxDelay = d.params.PlayoutDelayLimit.GetMax() + } delay := rtpextension.PlayoutDelayFromValue( uint16(d.params.PlayoutDelayLimit.GetMin()), - rtpextension.PlayoutDelayDefaultMax, + uint16(maxDelay), ) b, err := delay.Marshal() if err == nil { d.playoutDelayBytes.Store(b) + } else { + d.params.Logger.Errorw("failed to marshal playout delay", err, "playoutDelay", d.params.PlayoutDelayLimit) } } if d.kind == webrtc.RTPCodecTypeVideo { @@ -1309,7 +1315,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan defer ticker.Stop() for { - if generation != d.blankFramesGeneration.Load() || numFrames <= 0 { + if generation != d.blankFramesGeneration.Load() || numFrames <= 0 || !d.writable.Load() || !d.rtpStats.IsActive() { close(done) return }