Merge remote-tracking branch 'origin' into raja_min_packets

This commit is contained in:
boks1971
2023-09-19 15:38:24 +05:30
9 changed files with 161 additions and 52 deletions
+1
View File
@@ -161,6 +161,7 @@ keys:
# playout_delay:
# enabled: true
# min: 100
# max: 2000
# Webhooks
# when configured, LiveKit notifies your URL handler with room events
+1 -1
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e
github.com/livekit/protocol v1.7.3-0.20230918130519-dd24d071834c
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
+2
View File
@@ -129,6 +129,8 @@ github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e h1:WEet0iH/JazBFNhhH+YuZHtXpKefb7mnbCC2al3peyA=
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/protocol v1.7.3-0.20230918130519-dd24d071834c h1:Z44UEdskI35V3nDJfVvYuJ4DOW1wQLku32wbNyz23MM=
github.com/livekit/protocol v1.7.3-0.20230918130519-dd24d071834c/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
+1
View File
@@ -205,6 +205,7 @@ type StreamTrackersConfig struct {
type PlayoutDelayConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
Min int `yaml:"min,omitempty"`
Max int `yaml:"max,omitempty"`
}
type VideoConfig struct {
+33 -14
View File
@@ -28,8 +28,6 @@ import (
"github.com/pion/sctp"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
@@ -38,8 +36,10 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
sutils "github.com/livekit/livekit-server/pkg/utils"
)
const (
@@ -81,6 +81,7 @@ type Room struct {
participants map[livekit.ParticipantIdentity]types.LocalParticipant
participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions
participantRequestSources map[livekit.ParticipantIdentity]routing.MessageSource
hasPublished sync.Map // map of identity -> bool
bufferFactory *buffer.FactoryOfBufferFactory
// batch update participant info for non-publishers
@@ -509,6 +510,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
for _, t := range p.GetPublishedTracks() {
r.trackManager.RemoveTrack(t)
}
r.hasPublished.Delete(p.Identity())
p.OnTrackUpdated(nil)
p.OnTrackPublished(nil)
@@ -897,18 +899,35 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
r.trackManager.AddTrack(track, participant.Identity(), participant.ID())
// auto track egress
if r.internal != nil && r.internal.TrackEgress != nil {
if err := StartTrackEgress(
context.Background(),
r.egressLauncher,
r.telemetry,
r.internal.TrackEgress,
track,
r.Name(),
r.ID(),
); err != nil {
r.Logger.Errorw("failed to launch track egress", err)
// auto egress
if r.internal != nil {
if r.internal.ParticipantEgress != nil {
if _, hasPublished := r.hasPublished.Swap(participant.Identity(), true); !hasPublished {
if err := StartParticipantEgress(
context.Background(),
r.egressLauncher,
r.telemetry,
r.internal.ParticipantEgress,
participant.Identity(),
r.Name(),
r.ID(),
); err != nil {
r.Logger.Errorw("failed to launch participant egress", err)
}
}
}
if r.internal.TrackEgress != nil {
if err := StartTrackEgress(
context.Background(),
r.egressLauncher,
r.telemetry,
r.internal.TrackEgress,
track,
r.Name(),
r.ID(),
); err != nil {
r.Logger.Errorw("failed to launch track egress", err)
}
}
}
}
+61 -1
View File
@@ -32,6 +32,67 @@ type EgressLauncher interface {
StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error)
}
func StartParticipantEgress(
ctx context.Context,
launcher EgressLauncher,
ts telemetry.TelemetryService,
opts *livekit.AutoParticipantEgress,
identity livekit.ParticipantIdentity,
roomName livekit.RoomName,
roomID livekit.RoomID,
) error {
if req, err := startParticipantEgress(ctx, launcher, opts, identity, roomName, roomID); err != nil {
// send egress failed webhook
ts.NotifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventEgressEnded,
EgressInfo: &livekit.EgressInfo{
RoomId: string(roomID),
RoomName: string(roomName),
Status: livekit.EgressStatus_EGRESS_FAILED,
Error: err.Error(),
Request: &livekit.EgressInfo_Participant{Participant: req},
},
})
return err
}
return nil
}
func startParticipantEgress(
ctx context.Context,
launcher EgressLauncher,
opts *livekit.AutoParticipantEgress,
identity livekit.ParticipantIdentity,
roomName livekit.RoomName,
roomID livekit.RoomID,
) (*livekit.ParticipantEgressRequest, error) {
req := &livekit.ParticipantEgressRequest{
RoomName: string(roomName),
Identity: string(identity),
FileOutputs: opts.FileOutputs,
SegmentOutputs: opts.SegmentOutputs,
}
switch o := opts.Options.(type) {
case *livekit.AutoParticipantEgress_Preset:
req.Options = &livekit.ParticipantEgressRequest_Preset{Preset: o.Preset}
case *livekit.AutoParticipantEgress_Advanced:
req.Options = &livekit.ParticipantEgressRequest_Advanced{Advanced: o.Advanced}
}
if launcher == nil {
return req, errors.New("egress launcher not found")
}
_, err := launcher.StartEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Participant{
Participant: req,
},
RoomId: string(roomID),
})
return req, err
}
func StartTrackEgress(
ctx context.Context,
launcher EgressLauncher,
@@ -66,7 +127,6 @@ func startTrackEgress(
roomName livekit.RoomName,
roomID livekit.RoomID,
) (*livekit.TrackEgressRequest, error) {
output := &livekit.DirectFileOutput{
Filepath: getFilePath(opts.Filepath),
}
+50 -32
View File
@@ -17,7 +17,6 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -25,6 +24,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
@@ -80,9 +80,10 @@ func NewEgressService(
}
func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livekit.RoomCompositeEgressRequest) (*livekit.EgressInfo, error) {
fields := []interface{}{"room", req.RoomName, "baseUrl", req.CustomBaseUrl}
if t := reflect.TypeOf(req.Output); t != nil {
fields = append(fields, "outputType", t.String())
fields := []interface{}{
"room", req.RoomName,
"baseUrl", req.CustomBaseUrl,
"outputType", egress.GetOutputType(req),
}
defer func() {
AppendLogFields(ctx, fields...)
@@ -99,12 +100,53 @@ func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livek
return ei, err
}
func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgressRequest) (*livekit.EgressInfo, error) {
fields := []interface{}{
"url", req.Url,
"outputType", egress.GetOutputType(req),
}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Web{
Web: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) {
fields := []interface{}{
"room", req.RoomName,
"identity", req.Identity,
"outputType", egress.GetOutputType(req),
}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Participant{
Participant: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *livekit.TrackCompositeEgressRequest) (*livekit.EgressInfo, error) {
fields := []interface{}{
"room", req.RoomName, "audioTrackID", req.AudioTrackId, "videoTrackID", req.VideoTrackId,
}
if t := reflect.TypeOf(req.Output); t != nil {
fields = append(fields, "outputType", t.String())
"room", req.RoomName,
"audioTrackID", req.AudioTrackId,
"videoTrackID", req.VideoTrackId,
"outputType", egress.GetOutputType(req),
}
defer func() {
AppendLogFields(ctx, fields...)
@@ -141,30 +183,6 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track
return ei, err
}
func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgressRequest) (*livekit.EgressInfo, error) {
fields := []interface{}{"url", req.Url}
if t := reflect.TypeOf(req.Output); t != nil {
fields = append(fields, "outputType", t.String())
}
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Web{
Web: req,
},
})
if err != nil {
return nil, err
}
fields = append(fields, "egressID", ei.EgressId)
return ei, err
}
func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit.ParticipantEgressRequest) (*livekit.EgressInfo, error) {
return nil, errors.New("under development")
}
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
+3 -1
View File
@@ -85,10 +85,11 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
if req.Egress != nil && req.Egress.Tracks != nil {
internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks}
}
if req.MinPlayoutDelay > 0 {
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
rm.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: true,
Min: req.MinPlayoutDelay,
Max: req.MaxPlayoutDelay,
}
}
@@ -160,5 +161,6 @@ func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) {
room.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: conf.PlayoutDelay.Enabled,
Min: uint32(conf.PlayoutDelay.Min),
Max: uint32(conf.PlayoutDelay.Max),
}
}
+9 -3
View File
@@ -326,14 +326,20 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
})
// set initial playout delay to minimum value
if d.params.PlayoutDelayLimit.GetEnabled() && d.params.PlayoutDelayLimit.GetMin() > 0 {
if d.params.PlayoutDelayLimit.GetEnabled() {
maxDelay := uint32(rtpextension.PlayoutDelayDefaultMax)
if d.params.PlayoutDelayLimit.GetMax() > 0 {
maxDelay = d.params.PlayoutDelayLimit.GetMax()
}
delay := rtpextension.PlayoutDelayFromValue(
uint16(d.params.PlayoutDelayLimit.GetMin()),
rtpextension.PlayoutDelayDefaultMax,
uint16(maxDelay),
)
b, err := delay.Marshal()
if err == nil {
d.playoutDelayBytes.Store(b)
} else {
d.params.Logger.Errorw("failed to marshal playout delay", err, "playoutDelay", d.params.PlayoutDelayLimit)
}
}
if d.kind == webrtc.RTPCodecTypeVideo {
@@ -1309,7 +1315,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
defer ticker.Stop()
for {
if generation != d.blankFramesGeneration.Load() || numFrames <= 0 {
if generation != d.blankFramesGeneration.Load() || numFrames <= 0 || !d.writable.Load() || !d.rtpStats.IsActive() {
close(done)
return
}