Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-08-08 09:05:08 +05:30
6 changed files with 157 additions and 55 deletions
+1
View File
@@ -24,6 +24,7 @@ var (
ErrIdentityEmpty = psrpc.NewErrorf(psrpc.InvalidArgument, "identity cannot be empty")
ErrIngressNotConnected = psrpc.NewErrorf(psrpc.Internal, "ingress not connected (redis required)")
ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress does not exist")
ErrIngressNonReusable = psrpc.NewErrorf(psrpc.InvalidArgument, "ingress is not reusable and cannot be modified")
ErrMetadataExceedsLimits = psrpc.NewErrorf(psrpc.InvalidArgument, "metadata size exceeds limits")
ErrOperationFailed = psrpc.NewErrorf(psrpc.Internal, "operation cannot be completed")
ErrParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "participant does not exist")
+97 -16
View File
@@ -16,6 +16,8 @@ package service
import (
"context"
"fmt"
"net/url"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
@@ -27,6 +29,10 @@ import (
"github.com/livekit/psrpc"
)
type IngressLauncher interface {
LaunchPullIngress(ctx context.Context, info *livekit.IngressInfo) (*livekit.IngressInfo, error)
}
type IngressService struct {
conf *config.IngressConfig
nodeID livekit.NodeID
@@ -35,6 +41,30 @@ type IngressService struct {
store IngressStore
roomService livekit.RoomService
telemetry telemetry.TelemetryService
launcher IngressLauncher
}
func NewIngressServiceWithIngressLauncher(
conf *config.IngressConfig,
nodeID livekit.NodeID,
bus psrpc.MessageBus,
psrpcClient rpc.IngressClient,
store IngressStore,
rs livekit.RoomService,
ts telemetry.TelemetryService,
launcher IngressLauncher,
) *IngressService {
return &IngressService{
conf: conf,
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
store: store,
roomService: rs,
telemetry: ts,
launcher: launcher,
}
}
func NewIngressService(
@@ -46,16 +76,11 @@ func NewIngressService(
rs livekit.RoomService,
ts telemetry.TelemetryService,
) *IngressService {
s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, rs, ts, nil)
return &IngressService{
conf: conf,
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
store: store,
roomService: rs,
telemetry: ts,
}
s.launcher = s
return s
}
func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
@@ -70,17 +95,18 @@ func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateI
AppendLogFields(ctx, fields...)
}()
var urlPrefix string
var url string
switch req.InputType {
case livekit.IngressInput_RTMP_INPUT:
urlPrefix = s.conf.RTMPBaseURL
url = s.conf.RTMPBaseURL
case livekit.IngressInput_WHIP_INPUT:
urlPrefix = s.conf.WHIPBaseURL
url = s.conf.WHIPBaseURL
case livekit.IngressInput_URL_INPUT:
default:
return nil, ingress.ErrInvalidIngressType
}
ig, err := s.CreateIngressWithUrlPrefix(ctx, urlPrefix, req)
ig, err := s.CreateIngressWithUrl(ctx, url, req)
if err != nil {
return nil, err
}
@@ -89,7 +115,7 @@ func (s *IngressService) CreateIngress(ctx context.Context, req *livekit.CreateI
return ig, nil
}
func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPrefix string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string, req *livekit.CreateIngressRequest) (*livekit.IngressInfo, error) {
err := EnsureIngressAdminPermission(ctx)
if err != nil {
return nil, twirpAuthError(err)
@@ -98,13 +124,28 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
return nil, ErrIngressNotConnected
}
if req.InputType == livekit.IngressInput_URL_INPUT {
if req.Url == "" {
return nil, ingress.ErrInvalidIngress("missing URL parameter")
}
urlObj, err := url.Parse(req.Url)
if err != nil {
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
}
if urlObj.Scheme != "http" && urlObj.Scheme != "https" {
return nil, ingress.ErrInvalidIngress(fmt.Sprintf("invalid url scheme %s", urlObj.Scheme))
}
// Marshall the URL again for sanitization
urlStr = urlObj.String()
}
sk := utils.NewGuid("")
info := &livekit.IngressInfo{
IngressId: utils.NewGuid(utils.IngressPrefix),
Name: req.Name,
StreamKey: sk,
Url: urlPrefix,
Url: urlStr,
InputType: req.InputType,
Audio: req.Audio,
Video: req.Video,
@@ -112,14 +153,41 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
RoomName: req.RoomName,
ParticipantIdentity: req.ParticipantIdentity,
ParticipantName: req.ParticipantName,
Reusable: req.InputType == livekit.IngressInput_RTMP_INPUT,
State: &livekit.IngressState{},
}
switch req.InputType {
case livekit.IngressInput_RTMP_INPUT,
livekit.IngressInput_WHIP_INPUT:
info.Reusable = true
if err := ingress.ValidateForSerialization(info); err != nil {
return nil, err
}
case livekit.IngressInput_URL_INPUT:
if err := ingress.Validate(info); err != nil {
return nil, err
}
default:
return nil, ingress.ErrInvalidIngressType
}
if err := ingress.ValidateForSerialization(info); err != nil {
return nil, err
}
if req.InputType == livekit.IngressInput_URL_INPUT {
retInfo, err := s.launcher.LaunchPullIngress(ctx, info)
if retInfo != nil {
info = retInfo
} else {
info.State.Status = livekit.IngressState_ENDPOINT_ERROR
info.State.Error = err.Error()
}
if err != nil {
return info, err
}
}
if err = s.store.StoreIngress(ctx, info); err != nil {
logger.Errorw("could not write ingress info", err)
return nil, err
@@ -129,6 +197,14 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
return info, nil
}
func (s *IngressService) LaunchPullIngress(ctx context.Context, info *livekit.IngressInfo) (*livekit.IngressInfo, error) {
req := &rpc.StartIngressRequest{
Info: info,
}
return s.psrpcClient.StartIngress(ctx, req)
}
func updateInfoUsingRequest(req *livekit.UpdateIngressRequest, info *livekit.IngressInfo) error {
if req.Name != "" {
info.Name = req.Name
@@ -183,6 +259,11 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
return nil, err
}
if !info.Reusable {
logger.Infow("ingress update attempted on non reusable ingress", "ingressID", info.IngressId)
return info, ErrIngressNonReusable
}
switch info.State.Status {
case livekit.IngressState_ENDPOINT_ERROR:
info.State.Status = livekit.IngressState_ENDPOINT_INACTIVE
@@ -97,8 +97,15 @@ func (d *DependencyDescriptor) MarshalSizeWithActiveChains(activeChains uint32)
}
func (d *DependencyDescriptor) String() string {
return fmt.Sprintf("DependencyDescriptor{FirstPacketInFrame: %v, LastPacketInFrame: %v, FrameNumber: %v, FrameDependencies: %+v, Resolution: %+v, ActiveDecodeTargetsBitmask: %v, AttachedStructure: %v}",
d.FirstPacketInFrame, d.LastPacketInFrame, d.FrameNumber, *d.FrameDependencies, *d.Resolution, formatBitmask(d.ActiveDecodeTargetsBitmask), d.AttachedStructure)
resolution, dependencies := "-", "-"
if d.Resolution != nil {
resolution = fmt.Sprintf("%+v", *d.Resolution)
}
if d.FrameDependencies != nil {
dependencies = fmt.Sprintf("%+v", *d.FrameDependencies)
}
return fmt.Sprintf("DependencyDescriptor{FirstPacketInFrame: %v, LastPacketInFrame: %v, FrameNumber: %v, FrameDependencies: %s, Resolution: %s, ActiveDecodeTargetsBitmask: %v, AttachedStructure: %v}",
d.FirstPacketInFrame, d.LastPacketInFrame, d.FrameNumber, dependencies, resolution, formatBitmask(d.ActiveDecodeTargetsBitmask), d.AttachedStructure)
}
// ------------------------------------------------------------------------------
+18 -14
View File
@@ -51,6 +51,23 @@ type trendDetectorSample struct {
at time.Time
}
func trendDetectorSampleListToString(samples []trendDetectorSample) string {
samplesStr := ""
if len(samples) > 0 {
firstTime := samples[0].at
samplesStr += "["
for i, sample := range samples {
suffix := ", "
if i == len(samples)-1 {
suffix = ""
}
samplesStr += fmt.Sprintf("%d(%d)%s", sample.value, sample.at.Sub(firstTime).Milliseconds(), suffix)
}
samplesStr += "]"
}
return samplesStr
}
// ------------------------------------------------
type TrendDetectorParams struct {
@@ -147,23 +164,10 @@ func (t *TrendDetector) HasEnoughSamples() bool {
func (t *TrendDetector) ToString() string {
now := time.Now()
elapsed := now.Sub(t.startTime).Seconds()
samplesStr := ""
if len(t.samples) > 0 {
firstTime := t.samples[0].at
samplesStr += "["
for i, sample := range t.samples {
suffix := ", "
if i == len(t.samples)-1 {
suffix = ""
}
samplesStr += fmt.Sprintf("%d(%d)%s", sample.value, sample.at.Sub(firstTime).Milliseconds(), suffix)
}
samplesStr += "]"
}
return fmt.Sprintf("n: %s, t: %+v|%+v|%.2fs, v: %d|%d|%d|%s|%.2f",
t.params.Name,
t.startTime.Format(time.UnixDate), now.Format(time.UnixDate), elapsed,
t.numSamples, t.lowestValue, t.highestValue, samplesStr, kendallsTau(t.samples),
t.numSamples, t.lowestValue, t.highestValue, trendDetectorSampleListToString(t.samples), kendallsTau(t.samples),
)
}