diff --git a/go.mod b/go.mod index 105e45f77..18e9f19ab 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade - github.com/livekit/protocol v1.41.1-0.20250905101817-7b3b388b3292 + github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846 github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55 github.com/mackerelio/go-osstat v0.2.6 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index cc8e5723b..6df3fd063 100644 --- a/go.sum +++ b/go.sum @@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade h1:lpxPcglwzUWNB4J0S2qZuyMehzmR7vW9whzSwV4IGoI= github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.41.1-0.20250905101817-7b3b388b3292 h1:JKs2f+btdWXaHG67BBzbKeEeNXVAdlaqJarGLFFa6+Q= -github.com/livekit/protocol v1.41.1-0.20250905101817-7b3b388b3292/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg= +github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846 h1:J/Ry54lAQShsUZQ74DLxuvVyPl92TCaYi/47eLLKem0= +github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846/go.mod h1:Scx8arfj5y65w6EYA3ZIKJafoN2xBuV8pauvyrvI4eg= github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55 h1:6/iy4APnZZDmtDOxoqv3/eo5hxhpaA/M0ND75XqL7aA= github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0= diff --git a/pkg/rtc/clientinfo.go b/pkg/rtc/clientinfo.go index 52b179684..25205935c 100644 --- a/pkg/rtc/clientinfo.go +++ b/pkg/rtc/clientinfo.go @@ -45,6 +45,10 @@ func (c ClientInfo) isAndroid() bool { return c.ClientInfo != nil && strings.EqualFold(c.ClientInfo.Os, "android") } +func (c ClientInfo) isOBS() bool { + return c.ClientInfo != nil && strings.Contains(c.ClientInfo.Browser, "OBS") +} + func (c ClientInfo) SupportsAudioRED() bool { return !c.isFirefox() && !c.isSafari() } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 378e1b430..eab5dbd34 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -32,6 +32,7 @@ import ( "github.com/pkg/errors" "go.uber.org/atomic" "go.uber.org/zap/zapcore" + "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" "github.com/livekit/mediatransportutil/pkg/twcc" @@ -305,7 +306,7 @@ type ParticipantImpl struct { migrateState atomic.Value // types.MigrateState migratedTracksPublishedFuse core.Fuse - onClose func(types.LocalParticipant) + onClose map[string]func(types.LocalParticipant) onClaimsChanged func(participant types.LocalParticipant) onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) @@ -359,6 +360,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { joiningMessageFirstSeqs: make(map[livekit.ParticipantID]uint32), joiningMessageLastWrittenSeqs: make(map[livekit.ParticipantID]uint32), }, + onClose: make(map[string]func(types.LocalParticipant)), } p.setupSignalling() @@ -1048,14 +1050,18 @@ func (p *ParticipantImpl) getOnLeave() func(types.LocalParticipant, types.Partic return p.onLeave } -func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) { +func (p *ParticipantImpl) AddOnClose(key string, callback func(types.LocalParticipant)) { if p.isClosed.Load() { go callback(p) return } p.lock.Lock() - p.onClose = callback + if callback == nil { + delete(p.onClose, key) + } else { + p.onClose[key] = callback + } p.lock.Unlock() } @@ -1505,10 +1511,10 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea // ensure this is synchronized p.CloseSignalConnection(types.SignallingCloseReasonParticipantClose) p.lock.RLock() - onClose := p.onClose + onClose := maps.Values(p.onClose) p.lock.RUnlock() - if onClose != nil { - onClose(p) + for _, cb := range onClose { + cb(p) } // Close peer connections without blocking participant Close. If peer connections are gathering candidates @@ -2851,9 +2857,14 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l if len(req.SimulcastCodecs) == 0 { // clients not supporting simulcast codecs, synthesise a codec + videoLayerMode := livekit.VideoLayer_MODE_UNUSED + if p.params.ClientInfo.isOBS() { + videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR + } ti.Codecs = append(ti.Codecs, &livekit.SimulcastCodecInfo{ - Cid: req.Cid, - Layers: cloneLayers(req.Layers), + Cid: req.Cid, + Layers: cloneLayers(req.Layers), + VideoLayerMode: videoLayerMode, }) } else { seenCodecs := make(map[string]struct{}) @@ -2888,7 +2899,11 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l if mime.IsMimeTypeStringSVC(mimeType) { videoLayerMode = livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM } else { - videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM + if p.params.ClientInfo.isOBS() { + videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR + } else { + videoLayerMode = livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM + } } } } else if req.Type == livekit.TrackType_AUDIO { @@ -3966,8 +3981,8 @@ func (p *ParticipantImpl) SupportsMoving() error { return ErrMoveOldClientVersion } - if kind := p.Kind(); kind == livekit.ParticipantInfo_EGRESS || kind == livekit.ParticipantInfo_AGENT { - return fmt.Errorf("%s participants cannot be moved", kind.String()) + if kind := p.Kind(); kind == livekit.ParticipantInfo_EGRESS || kind == livekit.ParticipantInfo_AGENT || p.params.UseOneShotSignallingMode { + return fmt.Errorf("%s participants cannot be moved, one-shot signaling mode: %t", kind.String(), p.params.UseOneShotSignallingMode) } return nil @@ -3977,10 +3992,10 @@ func (p *ParticipantImpl) MoveToRoom(params types.MoveToRoomParams) { // fire onClose callback for original room p.lock.Lock() onClose := p.onClose - p.onClose = nil + p.onClose = make(map[string]func(types.LocalParticipant)) p.lock.Unlock() - if onClose != nil { - onClose(p) + for _, cb := range onClose { + cb(p) } for _, track := range p.GetPublishedTracks() { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index f22018d50..e732672e9 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -265,6 +265,12 @@ func (s SignallingCloseReason) String() string { } } +// --------------------------------------------- +const ( + ParticipantCloseKeyNormal = "normal" + ParticipantCloseKeyWHIP = "whip" +) + // --------------------------------------------- //counterfeiter:generate . Participant @@ -470,7 +476,7 @@ type LocalParticipant interface { OnDataPacket(callback func(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)) OnDataMessage(callback func(LocalParticipant, []byte)) OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) - OnClose(callback func(LocalParticipant)) + AddOnClose(key string, callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) OnUpdateSubscriptions(func( LocalParticipant, diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 3abdd99b3..abde499ad 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -21,6 +21,12 @@ import ( ) type FakeLocalParticipant struct { + AddOnCloseStub func(string, func(types.LocalParticipant)) + addOnCloseMutex sync.RWMutex + addOnCloseArgsForCall []struct { + arg1 string + arg2 func(types.LocalParticipant) + } AddTrackStub func(*livekit.AddTrackRequest) addTrackMutex sync.RWMutex addTrackArgsForCall []struct { @@ -849,11 +855,6 @@ type FakeLocalParticipant struct { onClaimsChangedArgsForCall []struct { arg1 func(types.LocalParticipant) } - OnCloseStub func(func(types.LocalParticipant)) - onCloseMutex sync.RWMutex - onCloseArgsForCall []struct { - arg1 func(types.LocalParticipant) - } OnDataMessageStub func(func(types.LocalParticipant, []byte)) onDataMessageMutex sync.RWMutex onDataMessageArgsForCall []struct { @@ -1426,6 +1427,39 @@ type FakeLocalParticipant struct { invocationsMutex sync.RWMutex } +func (fake *FakeLocalParticipant) AddOnClose(arg1 string, arg2 func(types.LocalParticipant)) { + fake.addOnCloseMutex.Lock() + fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct { + arg1 string + arg2 func(types.LocalParticipant) + }{arg1, arg2}) + stub := fake.AddOnCloseStub + fake.recordInvocation("AddOnClose", []interface{}{arg1, arg2}) + fake.addOnCloseMutex.Unlock() + if stub != nil { + fake.AddOnCloseStub(arg1, arg2) + } +} + +func (fake *FakeLocalParticipant) AddOnCloseCallCount() int { + fake.addOnCloseMutex.RLock() + defer fake.addOnCloseMutex.RUnlock() + return len(fake.addOnCloseArgsForCall) +} + +func (fake *FakeLocalParticipant) AddOnCloseCalls(stub func(string, func(types.LocalParticipant))) { + fake.addOnCloseMutex.Lock() + defer fake.addOnCloseMutex.Unlock() + fake.AddOnCloseStub = stub +} + +func (fake *FakeLocalParticipant) AddOnCloseArgsForCall(i int) (string, func(types.LocalParticipant)) { + fake.addOnCloseMutex.RLock() + defer fake.addOnCloseMutex.RUnlock() + argsForCall := fake.addOnCloseArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeLocalParticipant) AddTrack(arg1 *livekit.AddTrackRequest) { fake.addTrackMutex.Lock() fake.addTrackArgsForCall = append(fake.addTrackArgsForCall, struct { @@ -5822,38 +5856,6 @@ func (fake *FakeLocalParticipant) OnClaimsChangedArgsForCall(i int) func(types.L return argsForCall.arg1 } -func (fake *FakeLocalParticipant) OnClose(arg1 func(types.LocalParticipant)) { - fake.onCloseMutex.Lock() - fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { - arg1 func(types.LocalParticipant) - }{arg1}) - stub := fake.OnCloseStub - fake.recordInvocation("OnClose", []interface{}{arg1}) - fake.onCloseMutex.Unlock() - if stub != nil { - fake.OnCloseStub(arg1) - } -} - -func (fake *FakeLocalParticipant) OnCloseCallCount() int { - fake.onCloseMutex.RLock() - defer fake.onCloseMutex.RUnlock() - return len(fake.onCloseArgsForCall) -} - -func (fake *FakeLocalParticipant) OnCloseCalls(stub func(func(types.LocalParticipant))) { - fake.onCloseMutex.Lock() - defer fake.onCloseMutex.Unlock() - fake.OnCloseStub = stub -} - -func (fake *FakeLocalParticipant) OnCloseArgsForCall(i int) func(types.LocalParticipant) { - fake.onCloseMutex.RLock() - defer fake.onCloseMutex.RUnlock() - argsForCall := fake.onCloseArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeLocalParticipant) OnDataMessage(arg1 func(types.LocalParticipant, []byte)) { fake.onDataMessageMutex.Lock() fake.onDataMessageArgsForCall = append(fake.onDataMessageArgsForCall, struct { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index e3fb42cdd..070c8cadc 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -154,7 +154,11 @@ func NewLocalRoomManager( return nil, err } - r.whipServer, err = rpc.NewWHIPServer[livekit.NodeID](whipService{r}, bus, rpc.WithDefaultServerOptions(conf.PSRPC, logger.GetLogger())) + whipService, err := newWhipService(r) + if err != nil { + return nil, err + } + r.whipServer, err = rpc.NewWHIPServer[livekit.NodeID](whipService, bus, rpc.WithDefaultServerOptions(conf.PSRPC, logger.GetLogger())) if err != nil { return nil, err } @@ -548,7 +552,7 @@ func (r *RoomManager) StartSession( clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region(), Node: string(r.currentNode.NodeID())} r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true) - participant.OnClose(func(p types.LocalParticipant) { + participant.AddOnClose(types.ParticipantCloseKeyNormal, func(p types.LocalParticipant) { participantServerClosers.Close() if err := r.roomStore.DeleteParticipant(ctx, room.Name(), p.Identity()); err != nil { diff --git a/pkg/service/roommanager_service.go b/pkg/service/roommanager_service.go index 1e91e8bac..2c689687d 100644 --- a/pkg/service/roommanager_service.go +++ b/pkg/service/roommanager_service.go @@ -5,6 +5,10 @@ import ( "fmt" "time" + "github.com/pion/webrtc/v4" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/types/known/emptypb" + "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" @@ -12,13 +16,23 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" - "github.com/pion/webrtc/v4" - "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/types/known/emptypb" ) type whipService struct { *RoomManager + + ingressRpcCli rpc.IngressHandlerClient +} + +func newWhipService(rm *RoomManager) (*whipService, error) { + cli, err := rpc.NewIngressHandlerClient(rm.bus, rpc.WithDefaultClientOptions(logger.GetLogger())) + if err != nil { + return nil, err + } + return &whipService{ + RoomManager: rm, + ingressRpcCli: cli, + }, nil } func (s whipService) Create(ctx context.Context, req *rpc.WHIPCreateRequest) (*rpc.WHIPCreateResponse, error) { @@ -97,6 +111,21 @@ func (s whipService) Create(ctx context.Context, req *rpc.WHIPCreateRequest) (*r return nil, err } + if req.FromIngress { + lp.AddOnClose(types.ParticipantCloseKeyWHIP, func(lp types.LocalParticipant) { + go func() { + lp.GetLogger().Debugw("whip service: notify participant closed") + _, err := s.ingressRpcCli.WHIPRTCConnectionNotify(context.Background(), string(lp.ID()), &rpc.WHIPRTCConnectionNotifyRequest{ + ParticipantId: string(lp.ID()), + Closed: true, + }, psrpc.WithRequestTimeout(rpc.DefaultPSRPCConfig.Timeout)) + if err != nil { + lp.GetLogger().Warnw("whip service: could not notify ingress of participant closed", err) + } + }() + }) + } + var iceServers []*livekit.ICEServer apiKey, _, err := s.RoomManager.getFirstKeyPair() if err != nil { @@ -185,12 +214,19 @@ func (r whipParticipantService) DeleteSession(ctx context.Context, req *rpc.WHIP return nil, ErrRoomNotFound } + reason := types.ParticipantCloseReasonClientRequestLeave lp := room.GetParticipantByID(livekit.ParticipantID(req.ParticipantId)) + if lp == nil && req.FromSweeper && req.ParticipantId == "" { + lp = room.GetParticipant(livekit.ParticipantIdentity(req.ParticipantIdentity)) + reason = types.ParticipantCloseReasonStale + } + if lp != nil { + lp.AddOnClose(types.ParticipantCloseKeyWHIP, nil) room.RemoveParticipant( lp.Identity(), lp.ID(), - types.ParticipantCloseReasonClientRequestLeave, + reason, ) } diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 2b1d7a50f..446be1d9f 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -27,6 +27,7 @@ import ( "sync" "github.com/ua-parser/uap-go/uaparser" + "gopkg.in/yaml.v3" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -162,9 +163,37 @@ var ( userAgentParserInit sync.Once ) +func createUserAgentParserWithCustomRules() (*uaparser.Parser, error) { + defaultYaml := uaparser.DefinitionYaml + + rules := make(map[string]interface{}) + err := yaml.Unmarshal(defaultYaml, rules) + if err != nil { + return nil, err + } + + rules["user_agent_parsers"] = append(rules["user_agent_parsers"].([]interface{}), map[string]interface{}{ + "regex": "OBS-Studio\\/([0-9\\.]+)", + "family_replacement": "OBS Studio", + "v1_replacement": "$1", + }) + + customYaml, err := yaml.Marshal(rules) + if err != nil { + return nil, err + } + + return uaparser.NewFromBytes([]byte(customYaml)) +} + func getUserAgentParser() *uaparser.Parser { userAgentParserInit.Do(func() { - userAgentParserCache = uaparser.NewFromSaved() + if parser, err := createUserAgentParserWithCustomRules(); err != nil { + logger.Warnw("could not create user agent parser with custom rules, using default", err) + userAgentParserCache = uaparser.NewFromSaved() + } else { + userAgentParserCache = parser + } }) return userAgentParserCache } @@ -177,7 +206,8 @@ func AugmentClientInfo(ci *livekit.ClientInfo, req *http.Request) { if ci.Sdk == livekit.ClientInfo_JS || ci.Sdk == livekit.ClientInfo_REACT_NATIVE || ci.Sdk == livekit.ClientInfo_FLUTTER || - ci.Sdk == livekit.ClientInfo_UNITY { + ci.Sdk == livekit.ClientInfo_UNITY || + ci.Sdk == livekit.ClientInfo_UNKNOWN { client := getUserAgentParser().Parse(req.UserAgent()) if ci.Browser == "" { ci.Browser = client.UserAgent.Family diff --git a/pkg/service/whipservice.go b/pkg/service/whipservice.go index 0e0cb5dc9..83f29ecf3 100644 --- a/pkg/service/whipservice.go +++ b/pkg/service/whipservice.go @@ -23,6 +23,9 @@ import ( "net/url" "strings" + "github.com/pion/webrtc/v4" + "github.com/tomnomnom/linkheader" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" @@ -33,8 +36,6 @@ import ( "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils/guid" "github.com/livekit/psrpc" - "github.com/pion/webrtc/v4" - "github.com/tomnomnom/linkheader" ) const ( @@ -115,6 +116,7 @@ type createRequest struct { ClientIP string OfferSDP string SubscribedParticipantTrackNames map[string][]string + FromIngress bool } func (s *WHIPService) validateCreate(r *http.Request) (*createRequest, int, error) { @@ -152,6 +154,8 @@ func (s *WHIPService) validateCreate(r *http.Request) (*createRequest, int, erro } } + fromIngress := r.Header.Get("X-Livekit-Ingress") + offerSDPBytes, err := ioutil.ReadAll(r.Body) if err != nil { return nil, http.StatusBadRequest, fmt.Errorf("body does not have SDP offer: %s", err) @@ -196,6 +200,7 @@ func (s *WHIPService) validateCreate(r *http.Request) (*createRequest, int, erro clientInfo.ClientIP, offerSDP, clientInfo.SubscribedParticipantTrackNames, + fromIngress != "", }, http.StatusOK, nil } @@ -242,6 +247,7 @@ func (s *WHIPService) handleCreate(w http.ResponseWriter, r *http.Request) { OfferSdp: req.OfferSDP, StartSession: starSession, SubscribedParticipantTracks: subscribedParticipantTracks, + FromIngress: req.FromIngress, }) if err != nil { s.handleError("Create", w, r, http.StatusServiceUnavailable, err) @@ -298,7 +304,6 @@ func (s *WHIPService) handleCreate(w http.ResponseWriter, r *http.Request) { "status", http.StatusCreated, "response", logger.Proto(res), ) - return } func (s *WHIPService) handleParticipantGet(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 3b8913f0c..0c62abbbf 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - roomClient, err := rpc.NewTypedRoomClient(clientParams) + v, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - participantClient, err := rpc.NewTypedParticipantClient(clientParams) + v2, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) if err != nil { return nil, err } - agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) + v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) if err != nil { return nil, err } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 5d68ebc6a..ca4a8621c 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -37,6 +37,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/mime" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" "github.com/livekit/livekit-server/pkg/sfu/rtpstats" + sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/livekit-server/pkg/sfu/videolayerselector" "github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector" ) @@ -302,6 +303,10 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ f.lock.Lock() defer f.lock.Unlock() + if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR { + f.skipReferenceTS = true + } + toMimeType := mime.NormalizeMimeType(codec.MimeType) codecChanged := f.mime != mime.MimeTypeUnknown && f.mime != toMimeType if codecChanged { @@ -346,7 +351,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ } case mime.MimeTypeVP9: - if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM { + if sfuutils.IsSimulcastMode(videoLayerMode) { if f.vls != nil { f.vls = videolayerselector.NewSimulcastFromOther(f.vls) } else { @@ -370,7 +375,7 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ } case mime.MimeTypeAV1: - if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM { + if sfuutils.IsSimulcastMode(videoLayerMode) { if f.vls != nil { f.vls = videolayerselector.NewSimulcastFromOther(f.vls) } else { diff --git a/pkg/sfu/utils/helpers.go b/pkg/sfu/utils/helpers.go index d237d511a..bfd40194d 100644 --- a/pkg/sfu/utils/helpers.go +++ b/pkg/sfu/utils/helpers.go @@ -18,10 +18,12 @@ import ( "errors" "fmt" - "github.com/livekit/livekit-server/pkg/sfu/mime" "github.com/pion/interceptor" "github.com/pion/rtp" "github.com/pion/webrtc/v4" + + "github.com/livekit/livekit-server/pkg/sfu/mime" + "github.com/livekit/protocol/livekit" ) // Do a fuzzy find for a codec in the list of codecs @@ -89,3 +91,7 @@ func ValidateRTPPacket(pkt *rtp.Packet, expectedPayloadType uint8, expectedSSRC return nil } + +func IsSimulcastMode(m livekit.VideoLayer_Mode) bool { + return m == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM || m == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR +}