Rpcs for ingress proxy WHIP (#3911)

See https://github.com/livekit/protocol/pull/1194
This commit is contained in:
cnderrauber
2025-09-09 22:49:42 +08:00
committed by GitHub
parent 991a4a4f53
commit 76645fad5e
13 changed files with 189 additions and 76 deletions
+1 -1
View File
@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade
github.com/livekit/protocol v1.41.1-0.20250905101817-7b3b388b3292
github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846
github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55
github.com/mackerelio/go-osstat v0.2.6
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade h1:lpxPcglwzUWNB4J0S2qZuyMehzmR7vW9whzSwV4IGoI=
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.41.1-0.20250905101817-7b3b388b3292 h1:JKs2f+btdWXaHG67BBzbKeEeNXVAdlaqJarGLFFa6+Q=
github.com/livekit/protocol v1.41.1-0.20250905101817-7b3b388b3292/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg=
github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846 h1:J/Ry54lAQShsUZQ74DLxuvVyPl92TCaYi/47eLLKem0=
github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg=
github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55 h1:6/iy4APnZZDmtDOxoqv3/eo5hxhpaA/M0ND75XqL7aA=
github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0=
+4
View File
@@ -45,6 +45,10 @@ func (c ClientInfo) isAndroid() bool {
return c.ClientInfo != nil && strings.EqualFold(c.ClientInfo.Os, "android")
}
func (c ClientInfo) isOBS() bool {
return c.ClientInfo != nil && strings.Contains(c.ClientInfo.Browser, "OBS")
}
func (c ClientInfo) SupportsAudioRED() bool {
return !c.isFirefox() && !c.isSafari()
}
+29 -14
View File
@@ -32,6 +32,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/livekit/mediatransportutil/pkg/twcc"
@@ -305,7 +306,7 @@ type ParticipantImpl struct {
migrateState atomic.Value // types.MigrateState
migratedTracksPublishedFuse core.Fuse
onClose func(types.LocalParticipant)
onClose map[string]func(types.LocalParticipant)
onClaimsChanged func(participant types.LocalParticipant)
onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)
@@ -359,6 +360,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
joiningMessageFirstSeqs: make(map[livekit.ParticipantID]uint32),
joiningMessageLastWrittenSeqs: make(map[livekit.ParticipantID]uint32),
},
onClose: make(map[string]func(types.LocalParticipant)),
}
p.setupSignalling()
@@ -1048,14 +1050,18 @@ func (p *ParticipantImpl) getOnLeave() func(types.LocalParticipant, types.Partic
return p.onLeave
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) {
func (p *ParticipantImpl) AddOnClose(key string, callback func(types.LocalParticipant)) {
if p.isClosed.Load() {
go callback(p)
return
}
p.lock.Lock()
p.onClose = callback
if callback == nil {
delete(p.onClose, key)
} else {
p.onClose[key] = callback
}
p.lock.Unlock()
}
@@ -1505,10 +1511,10 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
// ensure this is synchronized
p.CloseSignalConnection(types.SignallingCloseReasonParticipantClose)
p.lock.RLock()
onClose := p.onClose
onClose := maps.Values(p.onClose)
p.lock.RUnlock()
if onClose != nil {
onClose(p)
for _, cb := range onClose {
cb(p)
}
// Close peer connections without blocking participant Close. If peer connections are gathering candidates
@@ -2851,9 +2857,14 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
if len(req.SimulcastCodecs) == 0 {
// clients not supporting simulcast codecs, synthesise a codec
videoLayerMode := livekit.VideoLayer_MODE_UNUSED
if p.params.ClientInfo.isOBS() {
videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR
}
ti.Codecs = append(ti.Codecs, &livekit.SimulcastCodecInfo{
Cid: req.Cid,
Layers: cloneLayers(req.Layers),
Cid: req.Cid,
Layers: cloneLayers(req.Layers),
VideoLayerMode: videoLayerMode,
})
} else {
seenCodecs := make(map[string]struct{})
@@ -2888,7 +2899,11 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
if mime.IsMimeTypeStringSVC(mimeType) {
videoLayerMode = livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM
} else {
videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM
if p.params.ClientInfo.isOBS() {
videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR
} else {
videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM
}
}
}
} else if req.Type == livekit.TrackType_AUDIO {
@@ -3966,8 +3981,8 @@ func (p *ParticipantImpl) SupportsMoving() error {
return ErrMoveOldClientVersion
}
if kind := p.Kind(); kind == livekit.ParticipantInfo_EGRESS || kind == livekit.ParticipantInfo_AGENT {
return fmt.Errorf("%s participants cannot be moved", kind.String())
if kind := p.Kind(); kind == livekit.ParticipantInfo_EGRESS || kind == livekit.ParticipantInfo_AGENT || p.params.UseOneShotSignallingMode {
return fmt.Errorf("%s participants cannot be moved, one-shot signaling mode: %t", kind.String(), p.params.UseOneShotSignallingMode)
}
return nil
@@ -3977,10 +3992,10 @@ func (p *ParticipantImpl) MoveToRoom(params types.MoveToRoomParams) {
// fire onClose callback for original room
p.lock.Lock()
onClose := p.onClose
p.onClose = nil
p.onClose = make(map[string]func(types.LocalParticipant))
p.lock.Unlock()
if onClose != nil {
onClose(p)
for _, cb := range onClose {
cb(p)
}
for _, track := range p.GetPublishedTracks() {
+7 -1
View File
@@ -265,6 +265,12 @@ func (s SignallingCloseReason) String() string {
}
}
// ---------------------------------------------
const (
ParticipantCloseKeyNormal = "normal"
ParticipantCloseKeyWHIP = "whip"
)
// ---------------------------------------------
//counterfeiter:generate . Participant
@@ -470,7 +476,7 @@ type LocalParticipant interface {
OnDataPacket(callback func(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket))
OnDataMessage(callback func(LocalParticipant, []byte))
OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool))
OnClose(callback func(LocalParticipant))
AddOnClose(key string, callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnUpdateSubscriptions(func(
LocalParticipant,
@@ -21,6 +21,12 @@ import (
)
type FakeLocalParticipant struct {
AddOnCloseStub func(string, func(types.LocalParticipant))
addOnCloseMutex sync.RWMutex
addOnCloseArgsForCall []struct {
arg1 string
arg2 func(types.LocalParticipant)
}
AddTrackStub func(*livekit.AddTrackRequest)
addTrackMutex sync.RWMutex
addTrackArgsForCall []struct {
@@ -849,11 +855,6 @@ type FakeLocalParticipant struct {
onClaimsChangedArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnCloseStub func(func(types.LocalParticipant))
onCloseMutex sync.RWMutex
onCloseArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnDataMessageStub func(func(types.LocalParticipant, []byte))
onDataMessageMutex sync.RWMutex
onDataMessageArgsForCall []struct {
@@ -1426,6 +1427,39 @@ type FakeLocalParticipant struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeLocalParticipant) AddOnClose(arg1 string, arg2 func(types.LocalParticipant)) {
fake.addOnCloseMutex.Lock()
fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct {
arg1 string
arg2 func(types.LocalParticipant)
}{arg1, arg2})
stub := fake.AddOnCloseStub
fake.recordInvocation("AddOnClose", []interface{}{arg1, arg2})
fake.addOnCloseMutex.Unlock()
if stub != nil {
fake.AddOnCloseStub(arg1, arg2)
}
}
func (fake *FakeLocalParticipant) AddOnCloseCallCount() int {
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
return len(fake.addOnCloseArgsForCall)
}
func (fake *FakeLocalParticipant) AddOnCloseCalls(stub func(string, func(types.LocalParticipant))) {
fake.addOnCloseMutex.Lock()
defer fake.addOnCloseMutex.Unlock()
fake.AddOnCloseStub = stub
}
func (fake *FakeLocalParticipant) AddOnCloseArgsForCall(i int) (string, func(types.LocalParticipant)) {
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
argsForCall := fake.addOnCloseArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) AddTrack(arg1 *livekit.AddTrackRequest) {
fake.addTrackMutex.Lock()
fake.addTrackArgsForCall = append(fake.addTrackArgsForCall, struct {
@@ -5822,38 +5856,6 @@ func (fake *FakeLocalParticipant) OnClaimsChangedArgsForCall(i int) func(types.L
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant)) {
fake.onCloseMutex.Lock()
fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct {
arg1 func(types.LocalParticipant)
}{arg1})
stub := fake.OnCloseStub
fake.recordInvocation("OnClose", []interface{}{arg1})
fake.onCloseMutex.Unlock()
if stub != nil {
fake.OnCloseStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnCloseCallCount() int {
fake.onCloseMutex.RLock()
defer fake.onCloseMutex.RUnlock()
return len(fake.onCloseArgsForCall)
}
func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant))) {
fake.onCloseMutex.Lock()
defer fake.onCloseMutex.Unlock()
fake.OnCloseStub = stub
}
func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant) {
fake.onCloseMutex.RLock()
defer fake.onCloseMutex.RUnlock()
argsForCall := fake.onCloseArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnDataMessage(arg1 func(types.LocalParticipant, []byte)) {
fake.onDataMessageMutex.Lock()
fake.onDataMessageArgsForCall = append(fake.onDataMessageArgsForCall, struct {
+6 -2
View File
@@ -154,7 +154,11 @@ func NewLocalRoomManager(
return nil, err
}
r.whipServer, err = rpc.NewWHIPServer[livekit.NodeID](whipService{r}, bus, rpc.WithDefaultServerOptions(conf.PSRPC, logger.GetLogger()))
whipService, err := newWhipService(r)
if err != nil {
return nil, err
}
r.whipServer, err = rpc.NewWHIPServer[livekit.NodeID](whipService, bus, rpc.WithDefaultServerOptions(conf.PSRPC, logger.GetLogger()))
if err != nil {
return nil, err
}
@@ -548,7 +552,7 @@ func (r *RoomManager) StartSession(
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region(), Node: string(r.currentNode.NodeID())}
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true)
participant.OnClose(func(p types.LocalParticipant) {
participant.AddOnClose(types.ParticipantCloseKeyNormal, func(p types.LocalParticipant) {
participantServerClosers.Close()
if err := r.roomStore.DeleteParticipant(ctx, room.Name(), p.Identity()); err != nil {
+40 -4
View File
@@ -5,6 +5,10 @@ import (
"fmt"
"time"
"github.com/pion/webrtc/v4"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
@@ -12,13 +16,23 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/pion/webrtc/v4"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/emptypb"
)
type whipService struct {
*RoomManager
ingressRpcCli rpc.IngressHandlerClient
}
func newWhipService(rm *RoomManager) (*whipService, error) {
cli, err := rpc.NewIngressHandlerClient(rm.bus, rpc.WithDefaultClientOptions(logger.GetLogger()))
if err != nil {
return nil, err
}
return &whipService{
RoomManager: rm,
ingressRpcCli: cli,
}, nil
}
func (s whipService) Create(ctx context.Context, req *rpc.WHIPCreateRequest) (*rpc.WHIPCreateResponse, error) {
@@ -97,6 +111,21 @@ func (s whipService) Create(ctx context.Context, req *rpc.WHIPCreateRequest) (*r
return nil, err
}
if req.FromIngress {
lp.AddOnClose(types.ParticipantCloseKeyWHIP, func(lp types.LocalParticipant) {
go func() {
lp.GetLogger().Debugw("whip service: notify participant closed")
_, err := s.ingressRpcCli.WHIPRTCConnectionNotify(context.Background(), string(lp.ID()), &rpc.WHIPRTCConnectionNotifyRequest{
ParticipantId: string(lp.ID()),
Closed: true,
}, psrpc.WithRequestTimeout(rpc.DefaultPSRPCConfig.Timeout))
if err != nil {
lp.GetLogger().Warnw("whip service: could not notify ingress of participant closed", err)
}
}()
})
}
var iceServers []*livekit.ICEServer
apiKey, _, err := s.RoomManager.getFirstKeyPair()
if err != nil {
@@ -185,12 +214,19 @@ func (r whipParticipantService) DeleteSession(ctx context.Context, req *rpc.WHIP
return nil, ErrRoomNotFound
}
reason := types.ParticipantCloseReasonClientRequestLeave
lp := room.GetParticipantByID(livekit.ParticipantID(req.ParticipantId))
if lp == nil && req.FromSweeper && req.ParticipantId == "" {
lp = room.GetParticipant(livekit.ParticipantIdentity(req.ParticipantIdentity))
reason = types.ParticipantCloseReasonStale
}
if lp != nil {
lp.AddOnClose(types.ParticipantCloseKeyWHIP, nil)
room.RemoveParticipant(
lp.Identity(),
lp.ID(),
types.ParticipantCloseReasonClientRequestLeave,
reason,
)
}
+32 -2
View File
@@ -27,6 +27,7 @@ import (
"sync"
"github.com/ua-parser/uap-go/uaparser"
"gopkg.in/yaml.v3"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -162,9 +163,37 @@ var (
userAgentParserInit sync.Once
)
func createUserAgentParserWithCustomRules() (*uaparser.Parser, error) {
defaultYaml := uaparser.DefinitionYaml
rules := make(map[string]interface{})
err := yaml.Unmarshal(defaultYaml, rules)
if err != nil {
return nil, err
}
rules["user_agent_parsers"] = append(rules["user_agent_parsers"].([]interface{}), map[string]interface{}{
"regex": "OBS-Studio\\/([0-9\\.]+)",
"family_replacement": "OBS Studio",
"v1_replacement": "$1",
})
customYaml, err := yaml.Marshal(rules)
if err != nil {
return nil, err
}
return uaparser.NewFromBytes([]byte(customYaml))
}
func getUserAgentParser() *uaparser.Parser {
userAgentParserInit.Do(func() {
userAgentParserCache = uaparser.NewFromSaved()
if parser, err := createUserAgentParserWithCustomRules(); err != nil {
logger.Warnw("could not create user agent parser with custom rules, using default", err)
userAgentParserCache = uaparser.NewFromSaved()
} else {
userAgentParserCache = parser
}
})
return userAgentParserCache
}
@@ -177,7 +206,8 @@ func AugmentClientInfo(ci *livekit.ClientInfo, req *http.Request) {
if ci.Sdk == livekit.ClientInfo_JS ||
ci.Sdk == livekit.ClientInfo_REACT_NATIVE ||
ci.Sdk == livekit.ClientInfo_FLUTTER ||
ci.Sdk == livekit.ClientInfo_UNITY {
ci.Sdk == livekit.ClientInfo_UNITY ||
ci.Sdk == livekit.ClientInfo_UNKNOWN {
client := getUserAgentParser().Parse(req.UserAgent())
if ci.Browser == "" {
ci.Browser = client.UserAgent.Family
+8 -3
View File
@@ -23,6 +23,9 @@ import (
"net/url"
"strings"
"github.com/pion/webrtc/v4"
"github.com/tomnomnom/linkheader"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
@@ -33,8 +36,6 @@ import (
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
"github.com/pion/webrtc/v4"
"github.com/tomnomnom/linkheader"
)
const (
@@ -115,6 +116,7 @@ type createRequest struct {
ClientIP string
OfferSDP string
SubscribedParticipantTrackNames map[string][]string
FromIngress bool
}
func (s *WHIPService) validateCreate(r *http.Request) (*createRequest, int, error) {
@@ -152,6 +154,8 @@ func (s *WHIPService) validateCreate(r *http.Request) (*createRequest, int, erro
}
}
fromIngress := r.Header.Get("X-Livekit-Ingress")
offerSDPBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("body does not have SDP offer: %s", err)
@@ -196,6 +200,7 @@ func (s *WHIPService) validateCreate(r *http.Request) (*createRequest, int, erro
clientInfo.ClientIP,
offerSDP,
clientInfo.SubscribedParticipantTrackNames,
fromIngress != "",
}, http.StatusOK, nil
}
@@ -242,6 +247,7 @@ func (s *WHIPService) handleCreate(w http.ResponseWriter, r *http.Request) {
OfferSdp: req.OfferSDP,
StartSession: starSession,
SubscribedParticipantTracks: subscribedParticipantTracks,
FromIngress: req.FromIngress,
})
if err != nil {
s.handleError("Create", w, r, http.StatusServiceUnavailable, err)
@@ -298,7 +304,6 @@ func (s *WHIPService) handleCreate(w http.ResponseWriter, r *http.Request) {
"status", http.StatusCreated,
"response", logger.Proto(res),
)
return
}
func (s *WHIPService) handleParticipantGet(w http.ResponseWriter, r *http.Request) {
+7 -7
View File
@@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
roomClient, err := rpc.NewTypedRoomClient(clientParams)
v, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
v2, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
if err != nil {
return nil, err
}
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
if err != nil {
return nil, err
}
+7 -2
View File
@@ -37,6 +37,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/mime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector"
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector"
)
@@ -302,6 +303,10 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
f.lock.Lock()
defer f.lock.Unlock()
if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR {
f.skipReferenceTS = true
}
toMimeType := mime.NormalizeMimeType(codec.MimeType)
codecChanged := f.mime != mime.MimeTypeUnknown && f.mime != toMimeType
if codecChanged {
@@ -346,7 +351,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
}
case mime.MimeTypeVP9:
if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM {
if sfuutils.IsSimulcastMode(videoLayerMode) {
if f.vls != nil {
f.vls = videolayerselector.NewSimulcastFromOther(f.vls)
} else {
@@ -370,7 +375,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [
}
case mime.MimeTypeAV1:
if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM {
if sfuutils.IsSimulcastMode(videoLayerMode) {
if f.vls != nil {
f.vls = videolayerselector.NewSimulcastFromOther(f.vls)
} else {
+7 -1
View File
@@ -18,10 +18,12 @@ import (
"errors"
"fmt"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/protocol/livekit"
)
// Do a fuzzy find for a codec in the list of codecs
@@ -89,3 +91,7 @@ func ValidateRTPPacket(pkt *rtp.Packet, expectedPayloadType uint8, expectedSSRC
return nil
}
func IsSimulcastMode(m livekit.VideoLayer_Mode) bool {
return m == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM || m == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR
}