diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index e4492bcc8..4451d0a4c 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/redis/go-redis/v9" + "go.uber.org/atomic" "google.golang.org/protobuf/proto" "github.com/livekit/protocol/auth" @@ -40,6 +41,37 @@ type MessageSink interface { ConnectionID() livekit.ConnectionID } +// ---------- + +type NullMessageSink struct { + connID livekit.ConnectionID + isClosed atomic.Bool +} + +func NewNullMessageSink(connID livekit.ConnectionID) *NullMessageSink { + return &NullMessageSink{ + connID: connID, + } +} + +func (n *NullMessageSink) WriteMessage(_msg proto.Message) error { + return nil +} + +func (n *NullMessageSink) IsClosed() bool { + return n.isClosed.Load() +} + +func (n *NullMessageSink) Close() { + n.isClosed.Store(true) +} + +func (n *NullMessageSink) ConnectionID() livekit.ConnectionID { + return n.connID +} + +// ------------------------------------------------ + //counterfeiter:generate . MessageSource type MessageSource interface { // ReadChan exposes a one way channel to make it easier to use with select @@ -49,6 +81,41 @@ type MessageSource interface { ConnectionID() livekit.ConnectionID } +// ---------- + +type NullMessageSource struct { + connID livekit.ConnectionID + msgChan chan proto.Message + isClosed atomic.Bool +} + +func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource { + return &NullMessageSource{ + connID: connID, + msgChan: make(chan proto.Message, 0), + } +} + +func (n *NullMessageSource) ReadChan() <-chan proto.Message { + return n.msgChan +} + +func (n *NullMessageSource) IsClosed() bool { + return n.isClosed.Load() +} + +func (n *NullMessageSource) Close() { + if !n.isClosed.Swap(true) { + close(n.msgChan) + } +} + +func (n *NullMessageSource) ConnectionID() livekit.ConnectionID { + return n.connID +} + +// ------------------------------------------------ + type ParticipantInit struct { Identity livekit.ParticipantIdentity Name livekit.ParticipantName diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index 4141f6aff..d66fc0076 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -19,27 +19,27 @@ import ( ) var ( - ErrRoomClosed = errors.New("room has already closed") - ErrPermissionDenied = errors.New("no permissions to access the room") - ErrMaxParticipantsExceeded = errors.New("room has exceeded its max participants") - ErrLimitExceeded = errors.New("node has exceeded its configured limit") - ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room") - ErrDataChannelUnavailable = errors.New("data channel is not available") - ErrDataChannelBufferFull = errors.New("data channel buffer is full") - ErrTransportFailure = errors.New("transport failure") - ErrEmptyIdentity = errors.New("participant identity cannot be empty") - ErrEmptyParticipantID = errors.New("participant ID cannot be empty") - ErrMissingGrants = errors.New("VideoGrant is missing") - ErrInternalError = errors.New("internal error") - ErrNameExceedsLimits = errors.New("name length exceeds limits") - ErrMetadataExceedsLimits = errors.New("metadata size exceeds limits") - ErrAttributesExceedsLimits = errors.New("attributes size exceeds limits") + ErrRoomClosed = errors.New("room has already closed") + ErrParticipantSessionClosed = errors.New("participant session is already closed") + ErrPermissionDenied = errors.New("no permissions to access the room") + ErrMaxParticipantsExceeded = errors.New("room has exceeded its max participants") + ErrLimitExceeded = errors.New("node has exceeded its configured limit") + ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room") + ErrDataChannelUnavailable = errors.New("data channel is not available") + ErrDataChannelBufferFull = errors.New("data channel buffer is full") + ErrTransportFailure = errors.New("transport failure") + ErrEmptyIdentity = errors.New("participant identity cannot be empty") + ErrEmptyParticipantID = errors.New("participant ID cannot be empty") + ErrMissingGrants = errors.New("VideoGrant is missing") + ErrInternalError = errors.New("internal error") + ErrNameExceedsLimits = errors.New("name length exceeds limits") + ErrMetadataExceedsLimits = errors.New("metadata size exceeds limits") + ErrAttributesExceedsLimits = errors.New("attributes size exceeds limits") // Track subscription related ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track") ErrNoSubscribePermission = errors.New("participant is not given permission to subscribe to tracks") ErrTrackNotFound = errors.New("track cannot be found") - ErrTrackNotAttached = errors.New("track is not yet attached") ErrTrackNotBound = errors.New("track not bound") ErrSubscriptionLimitExceeded = errors.New("participant has exceeded its subscription limit") diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index ecde486fd..ff0d2f75d 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -288,12 +288,12 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * // if the attributes match. This prevents SDP from bloating // because of dormant transceivers building up. // - sender, transceiver, err = sub.AddTrackToSubscriber(downTrack, addTrackParams) + sender, transceiver, err = sub.AddTrackLocal(downTrack, addTrackParams) if err != nil { return nil, err } } else { - sender, transceiver, err = sub.AddTransceiverFromTrackToSubscriber(downTrack, addTrackParams) + sender, transceiver, err = sub.AddTransceiverFromTrackLocal(downTrack, addTrackParams) if err != nil { return nil, err } @@ -305,6 +305,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * sub.UncacheDownTrack(transceiver) // negotiation isn't required if we've replaced track + // ONE-SHOT-SIGNALLING-MODE: this should not be needed, but that mode information is not available here, + // but it is not detrimental to set this, needs clean up when participants modes are separated out better. subTrack.SetNeedsNegotiation(!replacedTrack) subTrack.SetRTPSender(sender) // it is possible that subscribed track is closed before subscription manager sets @@ -316,7 +318,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * // the `OnClose` callback. So, set it here to handle cases of early close. subTrack.OnClose(func(isExpectedToResume bool) { if !isExpectedToResume { - if err := sub.RemoveTrackFromSubscriber(sender); err != nil { + if err := sub.RemoveTrackLocal(sender); err != nil { t.params.Logger.Warnw("could not remove track from peer connection", err) } } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ca4c5da35..d7ef6062d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -160,6 +160,7 @@ type ParticipantParams struct { MetricConfig metric.MetricConfig UseSendSideBWEInterceptor bool UseSendSideBWE bool + UseOneShotSignallingMode bool } type ParticipantImpl struct { @@ -815,17 +816,105 @@ func (p *ParticipantImpl) HandleSignalSourceClose() { } } +func (p *ParticipantImpl) synthesizeAddTrackRequests(offer webrtc.SessionDescription) error { + parsed, err := offer.Unmarshal() + if err != nil { + return err + } + + for _, m := range parsed.MediaDescriptions { + if !strings.EqualFold(m.MediaName.Media, "audio") { + // ONE-SHOT-SIGNALLING-MODE-TODO: support video + continue + } + + trackID := "" + + msid, ok := m.Attribute(sdp.AttrKeyMsid) + if ok { + if split := strings.Split(msid, " "); len(split) == 2 { + trackID = split[1] + } + } + + if trackID == "" { + attr, ok := m.Attribute(sdp.AttrKeySSRC) + if ok { + split := strings.Split(attr, " ") + if len(split) == 3 && strings.HasPrefix(split[1], "msid:") { + trackID = split[2] + } + } + } + + if trackID == "" { + trackID = guid.New(utils.TrackPrefix) + } + + req := &livekit.AddTrackRequest{ + Cid: trackID, + Name: "synthesized-microphone", + Source: livekit.TrackSource_MICROPHONE, + Type: livekit.TrackType_AUDIO, + DisableDtx: true, + Stereo: false, + Stream: "camera", + } + p.AddTrack(req) + } + return nil +} + // HandleOffer an offer from remote participant, used when clients make the initial connection -func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) { +func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) error { p.pubLogger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER, "offer", offer) + + if p.params.UseOneShotSignallingMode { + if err := p.synthesizeAddTrackRequests(offer); err != nil { + return err + } + } + shouldPend := false if p.MigrateState() == types.MigrateStateInit { shouldPend = true } offer = p.setCodecPreferencesForPublisher(offer) + err := p.TransportManager.HandleOffer(offer, shouldPend) + if p.params.UseOneShotSignallingMode { + p.updateState(livekit.ParticipantInfo_ACTIVE) + } + return err +} - p.TransportManager.HandleOffer(offer, shouldPend) +func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error { + if p.IsClosed() || p.IsDisconnected() { + return nil + } + + answer = p.configurePublisherAnswer(answer) + p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer) + return p.writeMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_Answer{ + Answer: ToProtoSessionDescription(answer), + }, + }) +} + +func (p *ParticipantImpl) GetAnswer() (webrtc.SessionDescription, error) { + if p.IsClosed() || p.IsDisconnected() { + return webrtc.SessionDescription{}, ErrParticipantSessionClosed + } + + answer, err := p.TransportManager.GetAnswer() + if err != nil { + return answer, err + } + + answer = p.configurePublisherAnswer(answer) + p.pubLogger.Debugw("returning answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer) + return answer, nil } // HandleAnswer handles a client answer response, with subscriber PC, server initiates the @@ -844,20 +933,6 @@ func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) { p.TransportManager.HandleAnswer(answer) } -func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error { - if p.IsClosed() || p.IsDisconnected() { - return nil - } - - answer = p.configurePublisherAnswer(answer) - p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer) - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Answer{ - Answer: ToProtoSessionDescription(answer), - }, - }) -} - func (p *ParticipantImpl) handleMigrateTracks() { // muted track won't send rtp packet, so it is required to add mediatrack manually. // But, synthesising track publish for unmuted tracks keeps a consistent path. @@ -1018,6 +1093,10 @@ func (p *ParticipantImpl) CloseReason() types.ParticipantCloseReason { // Negotiate subscriber SDP with client, if force is true, will cancel pending // negotiate task and negotiate immediately func (p *ParticipantImpl) Negotiate(force bool) { + if p.params.UseOneShotSignallingMode { + return + } + if p.MigrateState() != types.MigrateStateInit { p.TransportManager.NegotiateSubscriber(force) } @@ -1065,6 +1144,10 @@ func (p *ParticipantImpl) setupMigrationTimerLocked() { } func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool { + if p.params.UseOneShotSignallingMode { + return false + } + allTransportConnected := p.TransportManager.HasSubscriberEverConnected() if p.IsPublisher() { allTransportConnected = allTransportConnected && p.TransportManager.HasPublisherEverConnected() @@ -1139,6 +1222,10 @@ func (p *ParticipantImpl) MigrateState() types.MigrateState { // ICERestart restarts subscriber ICE connections func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig) { + if p.params.UseOneShotSignallingMode { + return + } + p.clearDisconnectTimer() p.clearMigrationTimer() @@ -1265,6 +1352,16 @@ func (p *ParticipantImpl) CanSubscribeMetrics() bool { return p.grants.Load().Video.GetCanSubscribeMetrics() } +func (p *ParticipantImpl) Verify() bool { + state := p.State() + isActive := state != livekit.ParticipantInfo_JOINING && state != livekit.ParticipantInfo_JOINED + if p.params.UseOneShotSignallingMode { + isActive = isActive && p.TransportManager.HasPublisherEverConnected() + } + + return isActive +} + func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) { if !p.IsReady() { // we have not sent a JoinResponse yet. metadata would be covered in JoinResponse @@ -1428,7 +1525,7 @@ func (p *ParticipantImpl) setupTransportManager() error { var pth transport.Handler = PublisherTransportHandler{ath} var sth transport.Handler = SubscriberTransportHandler{ath} - subscriberAsPrimary := p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe() + subscriberAsPrimary := p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe() && !p.params.UseOneShotSignallingMode if subscriberAsPrimary { sth = PrimaryTransportHandler{sth, p} } else { @@ -1462,6 +1559,7 @@ func (p *ParticipantImpl) setupTransportManager() error { DataChannelStats: p.dataChannelStats, UseSendSideBWEInterceptor: p.params.UseSendSideBWEInterceptor, UseSendSideBWE: p.params.UseSendSideBWE, + UseOneShotSignallingMode: p.params.UseOneShotSignallingMode, } if p.params.SyncStreams && p.params.PlayoutDelay.GetEnabled() && p.params.ClientInfo.isFirefox() { // we will disable playout delay for Firefox if the user is expecting @@ -1517,15 +1615,16 @@ func (p *ParticipantImpl) setupUpTrackManager() { func (p *ParticipantImpl) setupSubscriptionManager() { p.SubscriptionManager = NewSubscriptionManager(SubscriptionManagerParams{ - Participant: p, - Logger: p.subLogger.WithoutSampler(), - TrackResolver: p.params.TrackResolver, - Telemetry: p.params.Telemetry, - OnTrackSubscribed: p.onTrackSubscribed, - OnTrackUnsubscribed: p.onTrackUnsubscribed, - OnSubscriptionError: p.onSubscriptionError, - SubscriptionLimitVideo: p.params.SubscriptionLimitVideo, - SubscriptionLimitAudio: p.params.SubscriptionLimitAudio, + Participant: p, + Logger: p.subLogger.WithoutSampler(), + TrackResolver: p.params.TrackResolver, + Telemetry: p.params.Telemetry, + OnTrackSubscribed: p.onTrackSubscribed, + OnTrackUnsubscribed: p.onTrackUnsubscribed, + OnSubscriptionError: p.onSubscriptionError, + SubscriptionLimitVideo: p.params.SubscriptionLimitVideo, + SubscriptionLimitAudio: p.params.SubscriptionLimitAudio, + UseOneShotSignallingMode: p.params.UseOneShotSignallingMode, }) } @@ -1832,6 +1931,12 @@ func (p *ParticipantImpl) setupDisconnectTimer() { } func (p *ParticipantImpl) onAnyTransportFailed() { + if p.params.UseOneShotSignallingMode { + // as there is no way to notify participant, close the participant on transport failure + _ = p.Close(false, types.ParticipantCloseReasonPeerConnectionDisconnected, false) + return + } + p.sendLeaveRequest( types.ParticipantCloseReasonPeerConnectionDisconnected, true, // isExpectedToResume diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index f94532bb5..8bcb02935 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -539,8 +539,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me } time.AfterFunc(time.Minute, func() { - state := participant.State() - if state == livekit.ParticipantInfo_JOINING || state == livekit.ParticipantInfo_JOINED { + if !participant.Verify() { r.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonJoinTimeout) } }) diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 33585b4f1..2f0f25038 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -61,6 +61,8 @@ type SubscriptionManagerParams struct { Telemetry telemetry.TelemetryService SubscriptionLimitVideo, SubscriptionLimitAudio int32 + + UseOneShotSignallingMode bool } // SubscriptionManager manages a participant's subscriptions @@ -138,6 +140,11 @@ func (m *SubscriptionManager) isClosed() bool { } func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) { + if m.params.UseOneShotSignallingMode { + m.subscribeSynchronous(trackID) + return + } + sub, desireChanged := m.setDesired(trackID, true) if sub == nil { sLogger := m.params.Logger.WithValues( @@ -160,6 +167,15 @@ func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) { } func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) { + if m.params.UseOneShotSignallingMode { + // ONE-SHOT-SIGNALLING-MODE-TODO + // 1. Remove from peer connection + // 2. Analytics events for `TrackUnsubscribed` and `RTPStats`. + // Note that these are sent only if track was bound. + // So, maybe one shot mode also should maintain subscribed tracks? + return + } + sub, desireChanged := m.setDesired(trackID, false) if sub == nil || !desireChanged { return @@ -341,12 +357,11 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) { s.recordAttempt(false) switch err { - case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrTrackNotAttached, ErrSubscriptionLimitExceeded: + case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrSubscriptionLimitExceeded: // these are errors that are outside of our control, so we'll keep trying // - ErrNoTrackPermission: publisher did not grant subscriber permission, may change any moment // - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment // - ErrNoReceiver: Track is in the process of closing (another local track published to the same instance) - // - ErrTrackNotAttached: Remote Track that is not attached, but may be attached later // - ErrNotOpen: Track is closing or already closed // - ErrSubscriptionLimitExceeded: the participant have reached the limit of subscriptions, wait for the other subscription to be unsubscribed // We'll still log an event to reflect this in telemetry since it's been too long @@ -524,7 +539,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { subTrack, err := track.AddSubscriber(m.params.Participant) if err != nil && !errors.Is(err, errAlreadySubscribed) { // ignore error(s): already subscribed - if !utils.ErrorIsOneOf(err, ErrTrackNotAttached, ErrNoReceiver) { + if !utils.ErrorIsOneOf(err, ErrNoReceiver) { // as track resolution could take some time, not logging errors due to waiting for track resolution m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID) } @@ -596,6 +611,46 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error { return nil } +func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) error { + m.params.Logger.Debugw("executing subscribe synchronous", "trackID", trackID) + + if !m.params.Participant.CanSubscribe() { + return ErrNoSubscribePermission + } + + res := m.params.TrackResolver(m.params.Participant.Identity(), trackID) + m.params.Logger.Debugw("resolved track", "trackID", trackID, " result", res) + + track := res.Track + if track == nil { + return ErrTrackNotFound + } + + subTrack, err := track.AddSubscriber(m.params.Participant) + if err != nil && !errors.Is(err, errAlreadySubscribed) { + return err + } + if err == errAlreadySubscribed { + m.params.Logger.Debugw( + "already subscribed to track", + "trackID", trackID, + "subscribedAudioCount", m.subscribedAudioCount.Load(), + "subscribedVideoCount", m.subscribedVideoCount.Load(), + ) + } + if err == nil && subTrack != nil { // subTrack could be nil if already subscribed + m.params.Logger.Debugw( + "subscribed to track", + "trackID", trackID, + "subscribedAudioCount", m.subscribedAudioCount.Load(), + "subscribedVideoCount", m.subscribedVideoCount.Load(), + ) + } + // ONE-SHOT-SIGNALLING-MODE-TODO + // 1. Analytics events for `TrackSubscribed` + return nil +} + func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error { s.logger.Debugw("executing unsubscribe") @@ -707,7 +762,7 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i "kind", subTrack.MediaTrack().Kind(), ) - if err := m.params.Participant.RemoveTrackFromSubscriber(sender); err != nil { + if err := m.params.Participant.RemoveTrackLocal(sender); err != nil { if _, ok := err.(*rtcerr.InvalidStateError); !ok { // most of these are safe to ignore, since the track state might have already // been set to Inactive diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 803ebdfe2..97d13a1a7 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -86,6 +86,8 @@ var ( ErrNoTransceiver = errors.New("no transceiver") ErrNoSender = errors.New("no sender") ErrMidNotFound = errors.New("mid not found") + ErrNotSynchronousPeerConnectionMode = errors.New("not using synchronous peer connection mode") + ErrNoRemoteDescription = errors.New("no remote description") ) // ------------------------------------------------------------------------- @@ -259,6 +261,7 @@ type TransportParams struct { DataChannelMaxBufferedAmount uint64 UseSendSideBWEInterceptor bool UseSendSideBWE bool + UseOneShotSignallingMode bool } func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { @@ -492,10 +495,12 @@ func (t *PCTransport) createPeerConnection() error { } t.pc = pc - t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange) + if !t.params.UseOneShotSignallingMode { + // one shot signalling mode gathers all candidates and sends in answer + t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange) + t.pc.OnICECandidate(t.onICECandidateTrickle) + } t.pc.OnICEConnectionStateChange(t.onICEConnectionStateChange) - t.pc.OnICECandidate(t.onICECandidateTrickle) - t.pc.OnConnectionStateChange(t.onPeerConnectionStateChange) t.pc.OnDataChannel(t.onDataChannel) @@ -1018,11 +1023,45 @@ func (t *PCTransport) clearConnTimer() { } } -func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) { +func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) error { + if t.params.UseOneShotSignallingMode { + err := t.pc.SetRemoteDescription(sd) + if err != nil { + t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err) + } + return err + } + t.postEvent(event{ signal: signalRemoteDescriptionReceived, data: &sd, }) + return nil +} + +func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, error) { + if !t.params.UseOneShotSignallingMode { + return webrtc.SessionDescription{}, ErrNotSynchronousPeerConnectionMode + } + + prd := t.pc.PendingRemoteDescription() + if prd == nil || prd.Type != webrtc.SDPTypeOffer { + return webrtc.SessionDescription{}, ErrNoRemoteDescription + } + + answer, err := t.pc.CreateAnswer(nil) + if err != nil { + return webrtc.SessionDescription{}, err + } + + if err = t.pc.SetLocalDescription(answer); err != nil { + return webrtc.SessionDescription{}, err + } + + // wait for gathering to complete to include all candidates in the answer + <-webrtc.GatheringCompletePromise(t.pc) + + return *t.pc.CurrentLocalDescription(), nil } func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) { diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 661cd2894..ca34d3a1a 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -105,6 +105,7 @@ type TransportManagerParams struct { DataChannelStats *telemetry.BytesTrackStats UseSendSideBWEInterceptor bool UseSendSideBWE bool + UseOneShotSignallingMode bool } type TransportManager struct { @@ -146,19 +147,20 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER) publisher, err := NewPCTransport(TransportParams{ - ParticipantID: params.SID, - ParticipantIdentity: params.Identity, - ProtocolVersion: params.ProtocolVersion, - Config: params.Config, - Twcc: params.Twcc, - DirectionConfig: params.Config.Publisher, - CongestionControlConfig: params.CongestionControlConfig, - EnabledCodecs: params.EnabledPublishCodecs, - Logger: lgr, - SimTracks: params.SimTracks, - ClientInfo: params.ClientInfo, - Transport: livekit.SignalTarget_PUBLISHER, - Handler: TransportManagerPublisherTransportHandler{TransportManagerTransportHandler{params.PublisherHandler, t, lgr}}, + ParticipantID: params.SID, + ParticipantIdentity: params.Identity, + ProtocolVersion: params.ProtocolVersion, + Config: params.Config, + Twcc: params.Twcc, + DirectionConfig: params.Config.Publisher, + CongestionControlConfig: params.CongestionControlConfig, + EnabledCodecs: params.EnabledPublishCodecs, + Logger: lgr, + SimTracks: params.SimTracks, + ClientInfo: params.ClientInfo, + Transport: livekit.SignalTarget_PUBLISHER, + Handler: TransportManagerPublisherTransportHandler{TransportManagerTransportHandler{params.PublisherHandler, t, lgr}}, + UseOneShotSignallingMode: params.UseOneShotSignallingMode, }) if err != nil { return nil, err @@ -232,16 +234,34 @@ func (t *TransportManager) HasSubscriberEverConnected() bool { return t.subscriber.HasEverConnected() } -func (t *TransportManager) AddTrackToSubscriber(trackLocal webrtc.TrackLocal, params types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { - return t.subscriber.AddTrack(trackLocal, params) +func (t *TransportManager) AddTrackLocal( + trackLocal webrtc.TrackLocal, + params types.AddTrackParams, +) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { + if t.params.UseOneShotSignallingMode { + return t.publisher.AddTrack(trackLocal, params) + } else { + return t.subscriber.AddTrack(trackLocal, params) + } } -func (t *TransportManager) AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { - return t.subscriber.AddTransceiverFromTrack(trackLocal, params) +func (t *TransportManager) AddTransceiverFromTrackLocal( + trackLocal webrtc.TrackLocal, + params types.AddTrackParams, +) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { + if t.params.UseOneShotSignallingMode { + return t.publisher.AddTransceiverFromTrack(trackLocal, params) + } else { + return t.subscriber.AddTransceiverFromTrack(trackLocal, params) + } } -func (t *TransportManager) RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error { - return t.subscriber.RemoveTrack(sender) +func (t *TransportManager) RemoveTrackLocal(sender *webrtc.RTPSender) error { + if t.params.UseOneShotSignallingMode { + return t.publisher.RemoveTrack(sender) + } else { + return t.subscriber.RemoveTrack(sender) + } } func (t *TransportManager) WriteSubscriberRTCP(pkts []rtcp.Packet) error { @@ -373,17 +393,25 @@ func (t *TransportManager) LastPublisherOffer() webrtc.SessionDescription { return webrtc.SessionDescription{} } -func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, shouldPend bool) { +func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, shouldPend bool) error { t.lock.Lock() if shouldPend { t.pendingOfferPublisher = &offer t.lock.Unlock() - return + return nil } t.lock.Unlock() t.lastPublisherOffer.Store(offer) - t.publisher.HandleRemoteDescription(offer) + return t.publisher.HandleRemoteDescription(offer) +} + +func (t *TransportManager) GetAnswer() (webrtc.SessionDescription, error) { + answer, err := t.publisher.GetAnswer() + if err == nil { + t.lastPublisherAnswer.Store(answer) + } + return answer, err } func (t *TransportManager) ProcessPendingPublisherOffer() { @@ -536,7 +564,7 @@ func (t *TransportManager) getTransport(isPrimary bool) *PCTransport { } func (t *TransportManager) handleConnectionFailed(isShortLived bool) { - if !t.params.AllowTCPFallback { + if !t.params.AllowTCPFallback || t.params.UseOneShotSignallingMode { return } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 4cc86e6b4..cfea9dafc 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -362,16 +362,17 @@ type LocalParticipant interface { // PeerConnection AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) - HandleOffer(sdp webrtc.SessionDescription) + HandleOffer(sdp webrtc.SessionDescription) error + GetAnswer() (webrtc.SessionDescription, error) AddTrack(req *livekit.AddTrackRequest) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo HandleAnswer(sdp webrtc.SessionDescription) Negotiate(force bool) ICERestart(iceConfig *livekit.ICEConfig) - AddTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) - AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) - RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error + AddTrackLocal(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) + AddTransceiverFromTrackLocal(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) + RemoveTrackLocal(sender *webrtc.RTPSender) error WriteSubscriberRTCP(pkts []rtcp.Packet) error @@ -380,6 +381,7 @@ type LocalParticipant interface { UnsubscribeFromTrack(trackID livekit.TrackID) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) GetSubscribedTracks() []SubscribedTrack + Verify() bool VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) // WaitUntilSubscribed waits until all subscriptions have been settled, or if the timeout // has been reached. If the timeout expires, it will return an error. diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index fb2679941..201dd9a49 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -30,34 +30,34 @@ type FakeLocalParticipant struct { addTrackArgsForCall []struct { arg1 *livekit.AddTrackRequest } - AddTrackToSubscriberStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) - addTrackToSubscriberMutex sync.RWMutex - addTrackToSubscriberArgsForCall []struct { + AddTrackLocalStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) + addTrackLocalMutex sync.RWMutex + addTrackLocalArgsForCall []struct { arg1 webrtc.TrackLocal arg2 types.AddTrackParams } - addTrackToSubscriberReturns struct { + addTrackLocalReturns struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error } - addTrackToSubscriberReturnsOnCall map[int]struct { + addTrackLocalReturnsOnCall map[int]struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error } - AddTransceiverFromTrackToSubscriberStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) - addTransceiverFromTrackToSubscriberMutex sync.RWMutex - addTransceiverFromTrackToSubscriberArgsForCall []struct { + AddTransceiverFromTrackLocalStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) + addTransceiverFromTrackLocalMutex sync.RWMutex + addTransceiverFromTrackLocalArgsForCall []struct { arg1 webrtc.TrackLocal arg2 types.AddTrackParams } - addTransceiverFromTrackToSubscriberReturns struct { + addTransceiverFromTrackLocalReturns struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error } - addTransceiverFromTrackToSubscriberReturnsOnCall map[int]struct { + addTransceiverFromTrackLocalReturnsOnCall map[int]struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error @@ -211,6 +211,18 @@ type FakeLocalParticipant struct { getAdaptiveStreamReturnsOnCall map[int]struct { result1 bool } + GetAnswerStub func() (webrtc.SessionDescription, error) + getAnswerMutex sync.RWMutex + getAnswerArgsForCall []struct { + } + getAnswerReturns struct { + result1 webrtc.SessionDescription + result2 error + } + getAnswerReturnsOnCall map[int]struct { + result1 webrtc.SessionDescription + result2 error + } GetAudioLevelStub func() (float64, bool) getAudioLevelMutex sync.RWMutex getAudioLevelArgsForCall []struct { @@ -425,11 +437,17 @@ type FakeLocalParticipant struct { handleMetricsReturnsOnCall map[int]struct { result1 error } - HandleOfferStub func(webrtc.SessionDescription) + HandleOfferStub func(webrtc.SessionDescription) error handleOfferMutex sync.RWMutex handleOfferArgsForCall []struct { arg1 webrtc.SessionDescription } + handleOfferReturns struct { + result1 error + } + handleOfferReturnsOnCall map[int]struct { + result1 error + } HandleReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport) handleReceiverReportMutex sync.RWMutex handleReceiverReportArgsForCall []struct { @@ -733,15 +751,15 @@ type FakeLocalParticipant struct { arg2 bool arg3 bool } - RemoveTrackFromSubscriberStub func(*webrtc.RTPSender) error - removeTrackFromSubscriberMutex sync.RWMutex - removeTrackFromSubscriberArgsForCall []struct { + RemoveTrackLocalStub func(*webrtc.RTPSender) error + removeTrackLocalMutex sync.RWMutex + removeTrackLocalArgsForCall []struct { arg1 *webrtc.RTPSender } - removeTrackFromSubscriberReturns struct { + removeTrackLocalReturns struct { result1 error } - removeTrackFromSubscriberReturnsOnCall map[int]struct { + removeTrackLocalReturnsOnCall map[int]struct { result1 error } SendConnectionQualityUpdateStub func(*livekit.ConnectionQualityUpdate) error @@ -1098,6 +1116,16 @@ type FakeLocalParticipant struct { updateVideoTrackReturnsOnCall map[int]struct { result1 error } + VerifyStub func() bool + verifyMutex sync.RWMutex + verifyArgsForCall []struct { + } + verifyReturns struct { + result1 bool + } + verifyReturnsOnCall map[int]struct { + result1 bool + } VerifySubscribeParticipantInfoStub func(livekit.ParticipantID, uint32) verifySubscribeParticipantInfoMutex sync.RWMutex verifySubscribeParticipantInfoArgsForCall []struct { @@ -1205,17 +1233,17 @@ func (fake *FakeLocalParticipant) AddTrackArgsForCall(i int) *livekit.AddTrackRe return argsForCall.arg1 } -func (fake *FakeLocalParticipant) AddTrackToSubscriber(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { - fake.addTrackToSubscriberMutex.Lock() - ret, specificReturn := fake.addTrackToSubscriberReturnsOnCall[len(fake.addTrackToSubscriberArgsForCall)] - fake.addTrackToSubscriberArgsForCall = append(fake.addTrackToSubscriberArgsForCall, struct { +func (fake *FakeLocalParticipant) AddTrackLocal(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { + fake.addTrackLocalMutex.Lock() + ret, specificReturn := fake.addTrackLocalReturnsOnCall[len(fake.addTrackLocalArgsForCall)] + fake.addTrackLocalArgsForCall = append(fake.addTrackLocalArgsForCall, struct { arg1 webrtc.TrackLocal arg2 types.AddTrackParams }{arg1, arg2}) - stub := fake.AddTrackToSubscriberStub - fakeReturns := fake.addTrackToSubscriberReturns - fake.recordInvocation("AddTrackToSubscriber", []interface{}{arg1, arg2}) - fake.addTrackToSubscriberMutex.Unlock() + stub := fake.AddTrackLocalStub + fakeReturns := fake.addTrackLocalReturns + fake.recordInvocation("AddTrackLocal", []interface{}{arg1, arg2}) + fake.addTrackLocalMutex.Unlock() if stub != nil { return stub(arg1, arg2) } @@ -1225,65 +1253,65 @@ func (fake *FakeLocalParticipant) AddTrackToSubscriber(arg1 webrtc.TrackLocal, a return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } -func (fake *FakeLocalParticipant) AddTrackToSubscriberCallCount() int { - fake.addTrackToSubscriberMutex.RLock() - defer fake.addTrackToSubscriberMutex.RUnlock() - return len(fake.addTrackToSubscriberArgsForCall) +func (fake *FakeLocalParticipant) AddTrackLocalCallCount() int { + fake.addTrackLocalMutex.RLock() + defer fake.addTrackLocalMutex.RUnlock() + return len(fake.addTrackLocalArgsForCall) } -func (fake *FakeLocalParticipant) AddTrackToSubscriberCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) { - fake.addTrackToSubscriberMutex.Lock() - defer fake.addTrackToSubscriberMutex.Unlock() - fake.AddTrackToSubscriberStub = stub +func (fake *FakeLocalParticipant) AddTrackLocalCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) { + fake.addTrackLocalMutex.Lock() + defer fake.addTrackLocalMutex.Unlock() + fake.AddTrackLocalStub = stub } -func (fake *FakeLocalParticipant) AddTrackToSubscriberArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) { - fake.addTrackToSubscriberMutex.RLock() - defer fake.addTrackToSubscriberMutex.RUnlock() - argsForCall := fake.addTrackToSubscriberArgsForCall[i] +func (fake *FakeLocalParticipant) AddTrackLocalArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) { + fake.addTrackLocalMutex.RLock() + defer fake.addTrackLocalMutex.RUnlock() + argsForCall := fake.addTrackLocalArgsForCall[i] return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeLocalParticipant) AddTrackToSubscriberReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { - fake.addTrackToSubscriberMutex.Lock() - defer fake.addTrackToSubscriberMutex.Unlock() - fake.AddTrackToSubscriberStub = nil - fake.addTrackToSubscriberReturns = struct { +func (fake *FakeLocalParticipant) AddTrackLocalReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { + fake.addTrackLocalMutex.Lock() + defer fake.addTrackLocalMutex.Unlock() + fake.AddTrackLocalStub = nil + fake.addTrackLocalReturns = struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error }{result1, result2, result3} } -func (fake *FakeLocalParticipant) AddTrackToSubscriberReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { - fake.addTrackToSubscriberMutex.Lock() - defer fake.addTrackToSubscriberMutex.Unlock() - fake.AddTrackToSubscriberStub = nil - if fake.addTrackToSubscriberReturnsOnCall == nil { - fake.addTrackToSubscriberReturnsOnCall = make(map[int]struct { +func (fake *FakeLocalParticipant) AddTrackLocalReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { + fake.addTrackLocalMutex.Lock() + defer fake.addTrackLocalMutex.Unlock() + fake.AddTrackLocalStub = nil + if fake.addTrackLocalReturnsOnCall == nil { + fake.addTrackLocalReturnsOnCall = make(map[int]struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error }) } - fake.addTrackToSubscriberReturnsOnCall[i] = struct { + fake.addTrackLocalReturnsOnCall[i] = struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error }{result1, result2, result3} } -func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriber(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { - fake.addTransceiverFromTrackToSubscriberMutex.Lock() - ret, specificReturn := fake.addTransceiverFromTrackToSubscriberReturnsOnCall[len(fake.addTransceiverFromTrackToSubscriberArgsForCall)] - fake.addTransceiverFromTrackToSubscriberArgsForCall = append(fake.addTransceiverFromTrackToSubscriberArgsForCall, struct { +func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocal(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { + fake.addTransceiverFromTrackLocalMutex.Lock() + ret, specificReturn := fake.addTransceiverFromTrackLocalReturnsOnCall[len(fake.addTransceiverFromTrackLocalArgsForCall)] + fake.addTransceiverFromTrackLocalArgsForCall = append(fake.addTransceiverFromTrackLocalArgsForCall, struct { arg1 webrtc.TrackLocal arg2 types.AddTrackParams }{arg1, arg2}) - stub := fake.AddTransceiverFromTrackToSubscriberStub - fakeReturns := fake.addTransceiverFromTrackToSubscriberReturns - fake.recordInvocation("AddTransceiverFromTrackToSubscriber", []interface{}{arg1, arg2}) - fake.addTransceiverFromTrackToSubscriberMutex.Unlock() + stub := fake.AddTransceiverFromTrackLocalStub + fakeReturns := fake.addTransceiverFromTrackLocalReturns + fake.recordInvocation("AddTransceiverFromTrackLocal", []interface{}{arg1, arg2}) + fake.addTransceiverFromTrackLocalMutex.Unlock() if stub != nil { return stub(arg1, arg2) } @@ -1293,48 +1321,48 @@ func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriber(arg1 webrt return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } -func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberCallCount() int { - fake.addTransceiverFromTrackToSubscriberMutex.RLock() - defer fake.addTransceiverFromTrackToSubscriberMutex.RUnlock() - return len(fake.addTransceiverFromTrackToSubscriberArgsForCall) +func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalCallCount() int { + fake.addTransceiverFromTrackLocalMutex.RLock() + defer fake.addTransceiverFromTrackLocalMutex.RUnlock() + return len(fake.addTransceiverFromTrackLocalArgsForCall) } -func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) { - fake.addTransceiverFromTrackToSubscriberMutex.Lock() - defer fake.addTransceiverFromTrackToSubscriberMutex.Unlock() - fake.AddTransceiverFromTrackToSubscriberStub = stub +func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) { + fake.addTransceiverFromTrackLocalMutex.Lock() + defer fake.addTransceiverFromTrackLocalMutex.Unlock() + fake.AddTransceiverFromTrackLocalStub = stub } -func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) { - fake.addTransceiverFromTrackToSubscriberMutex.RLock() - defer fake.addTransceiverFromTrackToSubscriberMutex.RUnlock() - argsForCall := fake.addTransceiverFromTrackToSubscriberArgsForCall[i] +func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) { + fake.addTransceiverFromTrackLocalMutex.RLock() + defer fake.addTransceiverFromTrackLocalMutex.RUnlock() + argsForCall := fake.addTransceiverFromTrackLocalArgsForCall[i] return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { - fake.addTransceiverFromTrackToSubscriberMutex.Lock() - defer fake.addTransceiverFromTrackToSubscriberMutex.Unlock() - fake.AddTransceiverFromTrackToSubscriberStub = nil - fake.addTransceiverFromTrackToSubscriberReturns = struct { +func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { + fake.addTransceiverFromTrackLocalMutex.Lock() + defer fake.addTransceiverFromTrackLocalMutex.Unlock() + fake.AddTransceiverFromTrackLocalStub = nil + fake.addTransceiverFromTrackLocalReturns = struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error }{result1, result2, result3} } -func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { - fake.addTransceiverFromTrackToSubscriberMutex.Lock() - defer fake.addTransceiverFromTrackToSubscriberMutex.Unlock() - fake.AddTransceiverFromTrackToSubscriberStub = nil - if fake.addTransceiverFromTrackToSubscriberReturnsOnCall == nil { - fake.addTransceiverFromTrackToSubscriberReturnsOnCall = make(map[int]struct { +func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) { + fake.addTransceiverFromTrackLocalMutex.Lock() + defer fake.addTransceiverFromTrackLocalMutex.Unlock() + fake.AddTransceiverFromTrackLocalStub = nil + if fake.addTransceiverFromTrackLocalReturnsOnCall == nil { + fake.addTransceiverFromTrackLocalReturnsOnCall = make(map[int]struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error }) } - fake.addTransceiverFromTrackToSubscriberReturnsOnCall[i] = struct { + fake.addTransceiverFromTrackLocalReturnsOnCall[i] = struct { result1 *webrtc.RTPSender result2 *webrtc.RTPTransceiver result3 error @@ -2124,6 +2152,62 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1 }{result1} } +func (fake *FakeLocalParticipant) GetAnswer() (webrtc.SessionDescription, error) { + fake.getAnswerMutex.Lock() + ret, specificReturn := fake.getAnswerReturnsOnCall[len(fake.getAnswerArgsForCall)] + fake.getAnswerArgsForCall = append(fake.getAnswerArgsForCall, struct { + }{}) + stub := fake.GetAnswerStub + fakeReturns := fake.getAnswerReturns + fake.recordInvocation("GetAnswer", []interface{}{}) + fake.getAnswerMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeLocalParticipant) GetAnswerCallCount() int { + fake.getAnswerMutex.RLock() + defer fake.getAnswerMutex.RUnlock() + return len(fake.getAnswerArgsForCall) +} + +func (fake *FakeLocalParticipant) GetAnswerCalls(stub func() (webrtc.SessionDescription, error)) { + fake.getAnswerMutex.Lock() + defer fake.getAnswerMutex.Unlock() + fake.GetAnswerStub = stub +} + +func (fake *FakeLocalParticipant) GetAnswerReturns(result1 webrtc.SessionDescription, result2 error) { + fake.getAnswerMutex.Lock() + defer fake.getAnswerMutex.Unlock() + fake.GetAnswerStub = nil + fake.getAnswerReturns = struct { + result1 webrtc.SessionDescription + result2 error + }{result1, result2} +} + +func (fake *FakeLocalParticipant) GetAnswerReturnsOnCall(i int, result1 webrtc.SessionDescription, result2 error) { + fake.getAnswerMutex.Lock() + defer fake.getAnswerMutex.Unlock() + fake.GetAnswerStub = nil + if fake.getAnswerReturnsOnCall == nil { + fake.getAnswerReturnsOnCall = make(map[int]struct { + result1 webrtc.SessionDescription + result2 error + }) + } + fake.getAnswerReturnsOnCall[i] = struct { + result1 webrtc.SessionDescription + result2 error + }{result1, result2} +} + func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) { fake.getAudioLevelMutex.Lock() ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] @@ -3255,17 +3339,23 @@ func (fake *FakeLocalParticipant) HandleMetricsReturnsOnCall(i int, result1 erro }{result1} } -func (fake *FakeLocalParticipant) HandleOffer(arg1 webrtc.SessionDescription) { +func (fake *FakeLocalParticipant) HandleOffer(arg1 webrtc.SessionDescription) error { fake.handleOfferMutex.Lock() + ret, specificReturn := fake.handleOfferReturnsOnCall[len(fake.handleOfferArgsForCall)] fake.handleOfferArgsForCall = append(fake.handleOfferArgsForCall, struct { arg1 webrtc.SessionDescription }{arg1}) stub := fake.HandleOfferStub + fakeReturns := fake.handleOfferReturns fake.recordInvocation("HandleOffer", []interface{}{arg1}) fake.handleOfferMutex.Unlock() if stub != nil { - fake.HandleOfferStub(arg1) + return stub(arg1) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeLocalParticipant) HandleOfferCallCount() int { @@ -3274,7 +3364,7 @@ func (fake *FakeLocalParticipant) HandleOfferCallCount() int { return len(fake.handleOfferArgsForCall) } -func (fake *FakeLocalParticipant) HandleOfferCalls(stub func(webrtc.SessionDescription)) { +func (fake *FakeLocalParticipant) HandleOfferCalls(stub func(webrtc.SessionDescription) error) { fake.handleOfferMutex.Lock() defer fake.handleOfferMutex.Unlock() fake.HandleOfferStub = stub @@ -3287,6 +3377,29 @@ func (fake *FakeLocalParticipant) HandleOfferArgsForCall(i int) webrtc.SessionDe return argsForCall.arg1 } +func (fake *FakeLocalParticipant) HandleOfferReturns(result1 error) { + fake.handleOfferMutex.Lock() + defer fake.handleOfferMutex.Unlock() + fake.HandleOfferStub = nil + fake.handleOfferReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleOfferReturnsOnCall(i int, result1 error) { + fake.handleOfferMutex.Lock() + defer fake.handleOfferMutex.Unlock() + fake.HandleOfferStub = nil + if fake.handleOfferReturnsOnCall == nil { + fake.handleOfferReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleOfferReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) HandleReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) { fake.handleReceiverReportMutex.Lock() fake.handleReceiverReportArgsForCall = append(fake.handleReceiverReportArgsForCall, struct { @@ -4977,16 +5090,16 @@ func (fake *FakeLocalParticipant) RemovePublishedTrackArgsForCall(i int) (types. return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeLocalParticipant) RemoveTrackFromSubscriber(arg1 *webrtc.RTPSender) error { - fake.removeTrackFromSubscriberMutex.Lock() - ret, specificReturn := fake.removeTrackFromSubscriberReturnsOnCall[len(fake.removeTrackFromSubscriberArgsForCall)] - fake.removeTrackFromSubscriberArgsForCall = append(fake.removeTrackFromSubscriberArgsForCall, struct { +func (fake *FakeLocalParticipant) RemoveTrackLocal(arg1 *webrtc.RTPSender) error { + fake.removeTrackLocalMutex.Lock() + ret, specificReturn := fake.removeTrackLocalReturnsOnCall[len(fake.removeTrackLocalArgsForCall)] + fake.removeTrackLocalArgsForCall = append(fake.removeTrackLocalArgsForCall, struct { arg1 *webrtc.RTPSender }{arg1}) - stub := fake.RemoveTrackFromSubscriberStub - fakeReturns := fake.removeTrackFromSubscriberReturns - fake.recordInvocation("RemoveTrackFromSubscriber", []interface{}{arg1}) - fake.removeTrackFromSubscriberMutex.Unlock() + stub := fake.RemoveTrackLocalStub + fakeReturns := fake.removeTrackLocalReturns + fake.recordInvocation("RemoveTrackLocal", []interface{}{arg1}) + fake.removeTrackLocalMutex.Unlock() if stub != nil { return stub(arg1) } @@ -4996,44 +5109,44 @@ func (fake *FakeLocalParticipant) RemoveTrackFromSubscriber(arg1 *webrtc.RTPSend return fakeReturns.result1 } -func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberCallCount() int { - fake.removeTrackFromSubscriberMutex.RLock() - defer fake.removeTrackFromSubscriberMutex.RUnlock() - return len(fake.removeTrackFromSubscriberArgsForCall) +func (fake *FakeLocalParticipant) RemoveTrackLocalCallCount() int { + fake.removeTrackLocalMutex.RLock() + defer fake.removeTrackLocalMutex.RUnlock() + return len(fake.removeTrackLocalArgsForCall) } -func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberCalls(stub func(*webrtc.RTPSender) error) { - fake.removeTrackFromSubscriberMutex.Lock() - defer fake.removeTrackFromSubscriberMutex.Unlock() - fake.RemoveTrackFromSubscriberStub = stub +func (fake *FakeLocalParticipant) RemoveTrackLocalCalls(stub func(*webrtc.RTPSender) error) { + fake.removeTrackLocalMutex.Lock() + defer fake.removeTrackLocalMutex.Unlock() + fake.RemoveTrackLocalStub = stub } -func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberArgsForCall(i int) *webrtc.RTPSender { - fake.removeTrackFromSubscriberMutex.RLock() - defer fake.removeTrackFromSubscriberMutex.RUnlock() - argsForCall := fake.removeTrackFromSubscriberArgsForCall[i] +func (fake *FakeLocalParticipant) RemoveTrackLocalArgsForCall(i int) *webrtc.RTPSender { + fake.removeTrackLocalMutex.RLock() + defer fake.removeTrackLocalMutex.RUnlock() + argsForCall := fake.removeTrackLocalArgsForCall[i] return argsForCall.arg1 } -func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberReturns(result1 error) { - fake.removeTrackFromSubscriberMutex.Lock() - defer fake.removeTrackFromSubscriberMutex.Unlock() - fake.RemoveTrackFromSubscriberStub = nil - fake.removeTrackFromSubscriberReturns = struct { +func (fake *FakeLocalParticipant) RemoveTrackLocalReturns(result1 error) { + fake.removeTrackLocalMutex.Lock() + defer fake.removeTrackLocalMutex.Unlock() + fake.RemoveTrackLocalStub = nil + fake.removeTrackLocalReturns = struct { result1 error }{result1} } -func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberReturnsOnCall(i int, result1 error) { - fake.removeTrackFromSubscriberMutex.Lock() - defer fake.removeTrackFromSubscriberMutex.Unlock() - fake.RemoveTrackFromSubscriberStub = nil - if fake.removeTrackFromSubscriberReturnsOnCall == nil { - fake.removeTrackFromSubscriberReturnsOnCall = make(map[int]struct { +func (fake *FakeLocalParticipant) RemoveTrackLocalReturnsOnCall(i int, result1 error) { + fake.removeTrackLocalMutex.Lock() + defer fake.removeTrackLocalMutex.Unlock() + fake.RemoveTrackLocalStub = nil + if fake.removeTrackLocalReturnsOnCall == nil { + fake.removeTrackLocalReturnsOnCall = make(map[int]struct { result1 error }) } - fake.removeTrackFromSubscriberReturnsOnCall[i] = struct { + fake.removeTrackLocalReturnsOnCall[i] = struct { result1 error }{result1} } @@ -6997,6 +7110,59 @@ func (fake *FakeLocalParticipant) UpdateVideoTrackReturnsOnCall(i int, result1 e }{result1} } +func (fake *FakeLocalParticipant) Verify() bool { + fake.verifyMutex.Lock() + ret, specificReturn := fake.verifyReturnsOnCall[len(fake.verifyArgsForCall)] + fake.verifyArgsForCall = append(fake.verifyArgsForCall, struct { + }{}) + stub := fake.VerifyStub + fakeReturns := fake.verifyReturns + fake.recordInvocation("Verify", []interface{}{}) + fake.verifyMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) VerifyCallCount() int { + fake.verifyMutex.RLock() + defer fake.verifyMutex.RUnlock() + return len(fake.verifyArgsForCall) +} + +func (fake *FakeLocalParticipant) VerifyCalls(stub func() bool) { + fake.verifyMutex.Lock() + defer fake.verifyMutex.Unlock() + fake.VerifyStub = stub +} + +func (fake *FakeLocalParticipant) VerifyReturns(result1 bool) { + fake.verifyMutex.Lock() + defer fake.verifyMutex.Unlock() + fake.VerifyStub = nil + fake.verifyReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) VerifyReturnsOnCall(i int, result1 bool) { + fake.verifyMutex.Lock() + defer fake.verifyMutex.Unlock() + fake.VerifyStub = nil + if fake.verifyReturnsOnCall == nil { + fake.verifyReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.verifyReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) VerifySubscribeParticipantInfo(arg1 livekit.ParticipantID, arg2 uint32) { fake.verifySubscribeParticipantInfoMutex.Lock() fake.verifySubscribeParticipantInfoArgsForCall = append(fake.verifySubscribeParticipantInfoArgsForCall, struct { @@ -7217,10 +7383,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.addICECandidateMutex.RUnlock() fake.addTrackMutex.RLock() defer fake.addTrackMutex.RUnlock() - fake.addTrackToSubscriberMutex.RLock() - defer fake.addTrackToSubscriberMutex.RUnlock() - fake.addTransceiverFromTrackToSubscriberMutex.RLock() - defer fake.addTransceiverFromTrackToSubscriberMutex.RUnlock() + fake.addTrackLocalMutex.RLock() + defer fake.addTrackLocalMutex.RUnlock() + fake.addTransceiverFromTrackLocalMutex.RLock() + defer fake.addTransceiverFromTrackLocalMutex.RUnlock() fake.cacheDownTrackMutex.RLock() defer fake.cacheDownTrackMutex.RUnlock() fake.canPublishMutex.RLock() @@ -7251,6 +7417,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.disconnectedMutex.RUnlock() fake.getAdaptiveStreamMutex.RLock() defer fake.getAdaptiveStreamMutex.RUnlock() + fake.getAnswerMutex.RLock() + defer fake.getAnswerMutex.RUnlock() fake.getAudioLevelMutex.RLock() defer fake.getAudioLevelMutex.RUnlock() fake.getBufferFactoryMutex.RLock() @@ -7373,8 +7541,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.protocolVersionMutex.RUnlock() fake.removePublishedTrackMutex.RLock() defer fake.removePublishedTrackMutex.RUnlock() - fake.removeTrackFromSubscriberMutex.RLock() - defer fake.removeTrackFromSubscriberMutex.RUnlock() + fake.removeTrackLocalMutex.RLock() + defer fake.removeTrackLocalMutex.RUnlock() fake.sendConnectionQualityUpdateMutex.RLock() defer fake.sendConnectionQualityUpdateMutex.RUnlock() fake.sendDataPacketMutex.RLock() @@ -7457,6 +7625,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.updateSubscriptionPermissionMutex.RUnlock() fake.updateVideoTrackMutex.RLock() defer fake.updateVideoTrackMutex.RUnlock() + fake.verifyMutex.RLock() + defer fake.verifyMutex.RUnlock() fake.verifySubscribeParticipantInfoMutex.RLock() defer fake.verifySubscribeParticipantInfoMutex.RUnlock() fake.versionMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index ca102f29d..dcc59a592 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -281,6 +281,7 @@ func (r *RoomManager) StartSession( pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink, + useOneShotSignallingMode bool, ) error { sessionStartTime := time.Now() @@ -489,6 +490,7 @@ func (r *RoomManager) StartSession( SyncStreams: roomInternal.GetSyncStreams(), ForwardStats: r.forwardStats, MetricConfig: r.config.Metric, + UseOneShotSignallingMode: useOneShotSignallingMode, }) if err != nil { return err diff --git a/pkg/service/servicefakes/fake_room_allocator.go b/pkg/service/servicefakes/fake_room_allocator.go index 9a60a47ee..8cc11f637 100644 --- a/pkg/service/servicefakes/fake_room_allocator.go +++ b/pkg/service/servicefakes/fake_room_allocator.go @@ -10,6 +10,17 @@ import ( ) type FakeRoomAllocator struct { + AutoCreateEnabledStub func(context.Context) bool + autoCreateEnabledMutex sync.RWMutex + autoCreateEnabledArgsForCall []struct { + arg1 context.Context + } + autoCreateEnabledReturns struct { + result1 bool + } + autoCreateEnabledReturnsOnCall map[int]struct { + result1 bool + } CreateRoomStub func(context.Context, *livekit.CreateRoomRequest, bool) (*livekit.Room, *livekit.RoomInternal, bool, error) createRoomMutex sync.RWMutex createRoomArgsForCall []struct { @@ -29,17 +40,6 @@ type FakeRoomAllocator struct { result3 bool result4 error } - AutoCreateEnabledStub func(context.Context) bool - roomAutoCreateEnabledMutex sync.RWMutex - roomAutoCreateEnabledArgsForCall []struct { - arg1 context.Context - } - roomAutoCreateEnabledReturns struct { - result1 bool - } - roomAutoCreateEnabledReturnsOnCall map[int]struct { - result1 bool - } SelectRoomNodeStub func(context.Context, livekit.RoomName, livekit.NodeID) error selectRoomNodeMutex sync.RWMutex selectRoomNodeArgsForCall []struct { @@ -69,6 +69,67 @@ type FakeRoomAllocator struct { invocationsMutex sync.RWMutex } +func (fake *FakeRoomAllocator) AutoCreateEnabled(arg1 context.Context) bool { + fake.autoCreateEnabledMutex.Lock() + ret, specificReturn := fake.autoCreateEnabledReturnsOnCall[len(fake.autoCreateEnabledArgsForCall)] + fake.autoCreateEnabledArgsForCall = append(fake.autoCreateEnabledArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.AutoCreateEnabledStub + fakeReturns := fake.autoCreateEnabledReturns + fake.recordInvocation("AutoCreateEnabled", []interface{}{arg1}) + fake.autoCreateEnabledMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRoomAllocator) AutoCreateEnabledCallCount() int { + fake.autoCreateEnabledMutex.RLock() + defer fake.autoCreateEnabledMutex.RUnlock() + return len(fake.autoCreateEnabledArgsForCall) +} + +func (fake *FakeRoomAllocator) AutoCreateEnabledCalls(stub func(context.Context) bool) { + fake.autoCreateEnabledMutex.Lock() + defer fake.autoCreateEnabledMutex.Unlock() + fake.AutoCreateEnabledStub = stub +} + +func (fake *FakeRoomAllocator) AutoCreateEnabledArgsForCall(i int) context.Context { + fake.autoCreateEnabledMutex.RLock() + defer fake.autoCreateEnabledMutex.RUnlock() + argsForCall := fake.autoCreateEnabledArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRoomAllocator) AutoCreateEnabledReturns(result1 bool) { + fake.autoCreateEnabledMutex.Lock() + defer fake.autoCreateEnabledMutex.Unlock() + fake.AutoCreateEnabledStub = nil + fake.autoCreateEnabledReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeRoomAllocator) AutoCreateEnabledReturnsOnCall(i int, result1 bool) { + fake.autoCreateEnabledMutex.Lock() + defer fake.autoCreateEnabledMutex.Unlock() + fake.AutoCreateEnabledStub = nil + if fake.autoCreateEnabledReturnsOnCall == nil { + fake.autoCreateEnabledReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.autoCreateEnabledReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest, arg3 bool) (*livekit.Room, *livekit.RoomInternal, bool, error) { fake.createRoomMutex.Lock() ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)] @@ -141,67 +202,6 @@ func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.R }{result1, result2, result3, result4} } -func (fake *FakeRoomAllocator) AutoCreateEnabled(arg1 context.Context) bool { - fake.roomAutoCreateEnabledMutex.Lock() - ret, specificReturn := fake.roomAutoCreateEnabledReturnsOnCall[len(fake.roomAutoCreateEnabledArgsForCall)] - fake.roomAutoCreateEnabledArgsForCall = append(fake.roomAutoCreateEnabledArgsForCall, struct { - arg1 context.Context - }{arg1}) - stub := fake.AutoCreateEnabledStub - fakeReturns := fake.roomAutoCreateEnabledReturns - fake.recordInvocation("AutoCreateEnabled", []interface{}{arg1}) - fake.roomAutoCreateEnabledMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRoomAllocator) AutoCreateEnabledCallCount() int { - fake.roomAutoCreateEnabledMutex.RLock() - defer fake.roomAutoCreateEnabledMutex.RUnlock() - return len(fake.roomAutoCreateEnabledArgsForCall) -} - -func (fake *FakeRoomAllocator) AutoCreateEnabledCalls(stub func(context.Context) bool) { - fake.roomAutoCreateEnabledMutex.Lock() - defer fake.roomAutoCreateEnabledMutex.Unlock() - fake.AutoCreateEnabledStub = stub -} - -func (fake *FakeRoomAllocator) AutoCreateEnabledArgsForCall(i int) context.Context { - fake.roomAutoCreateEnabledMutex.RLock() - defer fake.roomAutoCreateEnabledMutex.RUnlock() - argsForCall := fake.roomAutoCreateEnabledArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeRoomAllocator) AutoCreateEnabledReturns(result1 bool) { - fake.roomAutoCreateEnabledMutex.Lock() - defer fake.roomAutoCreateEnabledMutex.Unlock() - fake.AutoCreateEnabledStub = nil - fake.roomAutoCreateEnabledReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeRoomAllocator) AutoCreateEnabledReturnsOnCall(i int, result1 bool) { - fake.roomAutoCreateEnabledMutex.Lock() - defer fake.roomAutoCreateEnabledMutex.Unlock() - fake.AutoCreateEnabledStub = nil - if fake.roomAutoCreateEnabledReturnsOnCall == nil { - fake.roomAutoCreateEnabledReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.roomAutoCreateEnabledReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - func (fake *FakeRoomAllocator) SelectRoomNode(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.NodeID) error { fake.selectRoomNodeMutex.Lock() ret, specificReturn := fake.selectRoomNodeReturnsOnCall[len(fake.selectRoomNodeArgsForCall)] @@ -330,10 +330,10 @@ func (fake *FakeRoomAllocator) ValidateCreateRoomReturnsOnCall(i int, result1 er func (fake *FakeRoomAllocator) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.autoCreateEnabledMutex.RLock() + defer fake.autoCreateEnabledMutex.RUnlock() fake.createRoomMutex.RLock() defer fake.createRoomMutex.RUnlock() - fake.roomAutoCreateEnabledMutex.RLock() - defer fake.roomAutoCreateEnabledMutex.RUnlock() fake.selectRoomNodeMutex.RLock() defer fake.selectRoomNodeMutex.RUnlock() fake.validateCreateRoomMutex.RLock() diff --git a/pkg/service/signal.go b/pkg/service/signal.go index b636aa018..75866fae3 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -111,7 +111,7 @@ func (s *defaultSessionHandler) HandleSession( return err } - return s.roomManager.StartSession(ctx, pi, requestSource, responseSink) + return s.roomManager.StartSession(ctx, pi, requestSource, responseSink, false) } func (s *SignalServer) Start() error {