One shot signalling mode (#3188)

* WIP

* comment

* Verify method on LocalParticipant

* cleanup

* clean up

* pass in one-shot-mode to StartSession

* null message source and sink

* feedback and also remove check in ParticipantImpl for one-shot-mode-filtering as a null sink can be used for that
This commit is contained in:
Raja Subramanian
2024-11-21 09:33:28 +05:30
committed by GitHub
parent 73fbc6b8bb
commit 9f25603213
13 changed files with 747 additions and 278 deletions
+67
View File
@@ -19,6 +19,7 @@ import (
"encoding/json"
"github.com/redis/go-redis/v9"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/auth"
@@ -40,6 +41,37 @@ type MessageSink interface {
ConnectionID() livekit.ConnectionID
}
// ----------
type NullMessageSink struct {
connID livekit.ConnectionID
isClosed atomic.Bool
}
func NewNullMessageSink(connID livekit.ConnectionID) *NullMessageSink {
return &NullMessageSink{
connID: connID,
}
}
func (n *NullMessageSink) WriteMessage(_msg proto.Message) error {
return nil
}
func (n *NullMessageSink) IsClosed() bool {
return n.isClosed.Load()
}
func (n *NullMessageSink) Close() {
n.isClosed.Store(true)
}
func (n *NullMessageSink) ConnectionID() livekit.ConnectionID {
return n.connID
}
// ------------------------------------------------
//counterfeiter:generate . MessageSource
type MessageSource interface {
// ReadChan exposes a one way channel to make it easier to use with select
@@ -49,6 +81,41 @@ type MessageSource interface {
ConnectionID() livekit.ConnectionID
}
// ----------
type NullMessageSource struct {
connID livekit.ConnectionID
msgChan chan proto.Message
isClosed atomic.Bool
}
func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource {
return &NullMessageSource{
connID: connID,
msgChan: make(chan proto.Message, 0),
}
}
func (n *NullMessageSource) ReadChan() <-chan proto.Message {
return n.msgChan
}
func (n *NullMessageSource) IsClosed() bool {
return n.isClosed.Load()
}
func (n *NullMessageSource) Close() {
if !n.isClosed.Swap(true) {
close(n.msgChan)
}
}
func (n *NullMessageSource) ConnectionID() livekit.ConnectionID {
return n.connID
}
// ------------------------------------------------
type ParticipantInit struct {
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
+16 -16
View File
@@ -19,27 +19,27 @@ import (
)
var (
ErrRoomClosed = errors.New("room has already closed")
ErrPermissionDenied = errors.New("no permissions to access the room")
ErrMaxParticipantsExceeded = errors.New("room has exceeded its max participants")
ErrLimitExceeded = errors.New("node has exceeded its configured limit")
ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room")
ErrDataChannelUnavailable = errors.New("data channel is not available")
ErrDataChannelBufferFull = errors.New("data channel buffer is full")
ErrTransportFailure = errors.New("transport failure")
ErrEmptyIdentity = errors.New("participant identity cannot be empty")
ErrEmptyParticipantID = errors.New("participant ID cannot be empty")
ErrMissingGrants = errors.New("VideoGrant is missing")
ErrInternalError = errors.New("internal error")
ErrNameExceedsLimits = errors.New("name length exceeds limits")
ErrMetadataExceedsLimits = errors.New("metadata size exceeds limits")
ErrAttributesExceedsLimits = errors.New("attributes size exceeds limits")
ErrRoomClosed = errors.New("room has already closed")
ErrParticipantSessionClosed = errors.New("participant session is already closed")
ErrPermissionDenied = errors.New("no permissions to access the room")
ErrMaxParticipantsExceeded = errors.New("room has exceeded its max participants")
ErrLimitExceeded = errors.New("node has exceeded its configured limit")
ErrAlreadyJoined = errors.New("a participant with the same identity is already in the room")
ErrDataChannelUnavailable = errors.New("data channel is not available")
ErrDataChannelBufferFull = errors.New("data channel buffer is full")
ErrTransportFailure = errors.New("transport failure")
ErrEmptyIdentity = errors.New("participant identity cannot be empty")
ErrEmptyParticipantID = errors.New("participant ID cannot be empty")
ErrMissingGrants = errors.New("VideoGrant is missing")
ErrInternalError = errors.New("internal error")
ErrNameExceedsLimits = errors.New("name length exceeds limits")
ErrMetadataExceedsLimits = errors.New("metadata size exceeds limits")
ErrAttributesExceedsLimits = errors.New("attributes size exceeds limits")
// Track subscription related
ErrNoTrackPermission = errors.New("participant is not allowed to subscribe to this track")
ErrNoSubscribePermission = errors.New("participant is not given permission to subscribe to tracks")
ErrTrackNotFound = errors.New("track cannot be found")
ErrTrackNotAttached = errors.New("track is not yet attached")
ErrTrackNotBound = errors.New("track not bound")
ErrSubscriptionLimitExceeded = errors.New("participant has exceeded its subscription limit")
+5 -3
View File
@@ -288,12 +288,12 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
// if the attributes match. This prevents SDP from bloating
// because of dormant transceivers building up.
//
sender, transceiver, err = sub.AddTrackToSubscriber(downTrack, addTrackParams)
sender, transceiver, err = sub.AddTrackLocal(downTrack, addTrackParams)
if err != nil {
return nil, err
}
} else {
sender, transceiver, err = sub.AddTransceiverFromTrackToSubscriber(downTrack, addTrackParams)
sender, transceiver, err = sub.AddTransceiverFromTrackLocal(downTrack, addTrackParams)
if err != nil {
return nil, err
}
@@ -305,6 +305,8 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
sub.UncacheDownTrack(transceiver)
// negotiation isn't required if we've replaced track
// ONE-SHOT-SIGNALLING-MODE: this should not be needed, but that mode information is not available here,
// but it is not detrimental to set this, needs clean up when participants modes are separated out better.
subTrack.SetNeedsNegotiation(!replacedTrack)
subTrack.SetRTPSender(sender)
// it is possible that subscribed track is closed before subscription manager sets
@@ -316,7 +318,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
// the `OnClose` callback. So, set it here to handle cases of early close.
subTrack.OnClose(func(isExpectedToResume bool) {
if !isExpectedToResume {
if err := sub.RemoveTrackFromSubscriber(sender); err != nil {
if err := sub.RemoveTrackLocal(sender); err != nil {
t.params.Logger.Warnw("could not remove track from peer connection", err)
}
}
+131 -26
View File
@@ -160,6 +160,7 @@ type ParticipantParams struct {
MetricConfig metric.MetricConfig
UseSendSideBWEInterceptor bool
UseSendSideBWE bool
UseOneShotSignallingMode bool
}
type ParticipantImpl struct {
@@ -815,17 +816,105 @@ func (p *ParticipantImpl) HandleSignalSourceClose() {
}
}
func (p *ParticipantImpl) synthesizeAddTrackRequests(offer webrtc.SessionDescription) error {
parsed, err := offer.Unmarshal()
if err != nil {
return err
}
for _, m := range parsed.MediaDescriptions {
if !strings.EqualFold(m.MediaName.Media, "audio") {
// ONE-SHOT-SIGNALLING-MODE-TODO: support video
continue
}
trackID := ""
msid, ok := m.Attribute(sdp.AttrKeyMsid)
if ok {
if split := strings.Split(msid, " "); len(split) == 2 {
trackID = split[1]
}
}
if trackID == "" {
attr, ok := m.Attribute(sdp.AttrKeySSRC)
if ok {
split := strings.Split(attr, " ")
if len(split) == 3 && strings.HasPrefix(split[1], "msid:") {
trackID = split[2]
}
}
}
if trackID == "" {
trackID = guid.New(utils.TrackPrefix)
}
req := &livekit.AddTrackRequest{
Cid: trackID,
Name: "synthesized-microphone",
Source: livekit.TrackSource_MICROPHONE,
Type: livekit.TrackType_AUDIO,
DisableDtx: true,
Stereo: false,
Stream: "camera",
}
p.AddTrack(req)
}
return nil
}
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) {
func (p *ParticipantImpl) HandleOffer(offer webrtc.SessionDescription) error {
p.pubLogger.Debugw("received offer", "transport", livekit.SignalTarget_PUBLISHER, "offer", offer)
if p.params.UseOneShotSignallingMode {
if err := p.synthesizeAddTrackRequests(offer); err != nil {
return err
}
}
shouldPend := false
if p.MigrateState() == types.MigrateStateInit {
shouldPend = true
}
offer = p.setCodecPreferencesForPublisher(offer)
err := p.TransportManager.HandleOffer(offer, shouldPend)
if p.params.UseOneShotSignallingMode {
p.updateState(livekit.ParticipantInfo_ACTIVE)
}
return err
}
p.TransportManager.HandleOffer(offer, shouldPend)
func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error {
if p.IsClosed() || p.IsDisconnected() {
return nil
}
answer = p.configurePublisherAnswer(answer)
p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer)
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
},
})
}
func (p *ParticipantImpl) GetAnswer() (webrtc.SessionDescription, error) {
if p.IsClosed() || p.IsDisconnected() {
return webrtc.SessionDescription{}, ErrParticipantSessionClosed
}
answer, err := p.TransportManager.GetAnswer()
if err != nil {
return answer, err
}
answer = p.configurePublisherAnswer(answer)
p.pubLogger.Debugw("returning answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer)
return answer, nil
}
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
@@ -844,20 +933,6 @@ func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) {
p.TransportManager.HandleAnswer(answer)
}
func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription) error {
if p.IsClosed() || p.IsDisconnected() {
return nil
}
answer = p.configurePublisherAnswer(answer)
p.pubLogger.Debugw("sending answer", "transport", livekit.SignalTarget_PUBLISHER, "answer", answer)
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
},
})
}
func (p *ParticipantImpl) handleMigrateTracks() {
// muted track won't send rtp packet, so it is required to add mediatrack manually.
// But, synthesising track publish for unmuted tracks keeps a consistent path.
@@ -1018,6 +1093,10 @@ func (p *ParticipantImpl) CloseReason() types.ParticipantCloseReason {
// Negotiate subscriber SDP with client, if force is true, will cancel pending
// negotiate task and negotiate immediately
func (p *ParticipantImpl) Negotiate(force bool) {
if p.params.UseOneShotSignallingMode {
return
}
if p.MigrateState() != types.MigrateStateInit {
p.TransportManager.NegotiateSubscriber(force)
}
@@ -1065,6 +1144,10 @@ func (p *ParticipantImpl) setupMigrationTimerLocked() {
}
func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
if p.params.UseOneShotSignallingMode {
return false
}
allTransportConnected := p.TransportManager.HasSubscriberEverConnected()
if p.IsPublisher() {
allTransportConnected = allTransportConnected && p.TransportManager.HasPublisherEverConnected()
@@ -1139,6 +1222,10 @@ func (p *ParticipantImpl) MigrateState() types.MigrateState {
// ICERestart restarts subscriber ICE connections
func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig) {
if p.params.UseOneShotSignallingMode {
return
}
p.clearDisconnectTimer()
p.clearMigrationTimer()
@@ -1265,6 +1352,16 @@ func (p *ParticipantImpl) CanSubscribeMetrics() bool {
return p.grants.Load().Video.GetCanSubscribeMetrics()
}
func (p *ParticipantImpl) Verify() bool {
state := p.State()
isActive := state != livekit.ParticipantInfo_JOINING && state != livekit.ParticipantInfo_JOINED
if p.params.UseOneShotSignallingMode {
isActive = isActive && p.TransportManager.HasPublisherEverConnected()
}
return isActive
}
func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) {
if !p.IsReady() {
// we have not sent a JoinResponse yet. metadata would be covered in JoinResponse
@@ -1428,7 +1525,7 @@ func (p *ParticipantImpl) setupTransportManager() error {
var pth transport.Handler = PublisherTransportHandler{ath}
var sth transport.Handler = SubscriberTransportHandler{ath}
subscriberAsPrimary := p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe()
subscriberAsPrimary := p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe() && !p.params.UseOneShotSignallingMode
if subscriberAsPrimary {
sth = PrimaryTransportHandler{sth, p}
} else {
@@ -1462,6 +1559,7 @@ func (p *ParticipantImpl) setupTransportManager() error {
DataChannelStats: p.dataChannelStats,
UseSendSideBWEInterceptor: p.params.UseSendSideBWEInterceptor,
UseSendSideBWE: p.params.UseSendSideBWE,
UseOneShotSignallingMode: p.params.UseOneShotSignallingMode,
}
if p.params.SyncStreams && p.params.PlayoutDelay.GetEnabled() && p.params.ClientInfo.isFirefox() {
// we will disable playout delay for Firefox if the user is expecting
@@ -1517,15 +1615,16 @@ func (p *ParticipantImpl) setupUpTrackManager() {
func (p *ParticipantImpl) setupSubscriptionManager() {
p.SubscriptionManager = NewSubscriptionManager(SubscriptionManagerParams{
Participant: p,
Logger: p.subLogger.WithoutSampler(),
TrackResolver: p.params.TrackResolver,
Telemetry: p.params.Telemetry,
OnTrackSubscribed: p.onTrackSubscribed,
OnTrackUnsubscribed: p.onTrackUnsubscribed,
OnSubscriptionError: p.onSubscriptionError,
SubscriptionLimitVideo: p.params.SubscriptionLimitVideo,
SubscriptionLimitAudio: p.params.SubscriptionLimitAudio,
Participant: p,
Logger: p.subLogger.WithoutSampler(),
TrackResolver: p.params.TrackResolver,
Telemetry: p.params.Telemetry,
OnTrackSubscribed: p.onTrackSubscribed,
OnTrackUnsubscribed: p.onTrackUnsubscribed,
OnSubscriptionError: p.onSubscriptionError,
SubscriptionLimitVideo: p.params.SubscriptionLimitVideo,
SubscriptionLimitAudio: p.params.SubscriptionLimitAudio,
UseOneShotSignallingMode: p.params.UseOneShotSignallingMode,
})
}
@@ -1832,6 +1931,12 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
}
func (p *ParticipantImpl) onAnyTransportFailed() {
if p.params.UseOneShotSignallingMode {
// as there is no way to notify participant, close the participant on transport failure
_ = p.Close(false, types.ParticipantCloseReasonPeerConnectionDisconnected, false)
return
}
p.sendLeaveRequest(
types.ParticipantCloseReasonPeerConnectionDisconnected,
true, // isExpectedToResume
+1 -2
View File
@@ -539,8 +539,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
}
time.AfterFunc(time.Minute, func() {
state := participant.State()
if state == livekit.ParticipantInfo_JOINING || state == livekit.ParticipantInfo_JOINED {
if !participant.Verify() {
r.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonJoinTimeout)
}
})
+59 -4
View File
@@ -61,6 +61,8 @@ type SubscriptionManagerParams struct {
Telemetry telemetry.TelemetryService
SubscriptionLimitVideo, SubscriptionLimitAudio int32
UseOneShotSignallingMode bool
}
// SubscriptionManager manages a participant's subscriptions
@@ -138,6 +140,11 @@ func (m *SubscriptionManager) isClosed() bool {
}
func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) {
if m.params.UseOneShotSignallingMode {
m.subscribeSynchronous(trackID)
return
}
sub, desireChanged := m.setDesired(trackID, true)
if sub == nil {
sLogger := m.params.Logger.WithValues(
@@ -160,6 +167,15 @@ func (m *SubscriptionManager) SubscribeToTrack(trackID livekit.TrackID) {
}
func (m *SubscriptionManager) UnsubscribeFromTrack(trackID livekit.TrackID) {
if m.params.UseOneShotSignallingMode {
// ONE-SHOT-SIGNALLING-MODE-TODO
// 1. Remove from peer connection
// 2. Analytics events for `TrackUnsubscribed` and `RTPStats`.
// Note that these are sent only if track was bound.
// So, maybe one shot mode also should maintain subscribed tracks?
return
}
sub, desireChanged := m.setDesired(trackID, false)
if sub == nil || !desireChanged {
return
@@ -341,12 +357,11 @@ func (m *SubscriptionManager) reconcileSubscription(s *trackSubscription) {
s.recordAttempt(false)
switch err {
case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrTrackNotAttached, ErrSubscriptionLimitExceeded:
case ErrNoTrackPermission, ErrNoSubscribePermission, ErrNoReceiver, ErrNotOpen, ErrSubscriptionLimitExceeded:
// these are errors that are outside of our control, so we'll keep trying
// - ErrNoTrackPermission: publisher did not grant subscriber permission, may change any moment
// - ErrNoSubscribePermission: participant was not granted canSubscribe, may change any moment
// - ErrNoReceiver: Track is in the process of closing (another local track published to the same instance)
// - ErrTrackNotAttached: Remote Track that is not attached, but may be attached later
// - ErrNotOpen: Track is closing or already closed
// - ErrSubscriptionLimitExceeded: the participant have reached the limit of subscriptions, wait for the other subscription to be unsubscribed
// We'll still log an event to reflect this in telemetry since it's been too long
@@ -524,7 +539,7 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
subTrack, err := track.AddSubscriber(m.params.Participant)
if err != nil && !errors.Is(err, errAlreadySubscribed) {
// ignore error(s): already subscribed
if !utils.ErrorIsOneOf(err, ErrTrackNotAttached, ErrNoReceiver) {
if !utils.ErrorIsOneOf(err, ErrNoReceiver) {
// as track resolution could take some time, not logging errors due to waiting for track resolution
m.params.Logger.Warnw("add subscriber failed", err, "trackID", trackID)
}
@@ -596,6 +611,46 @@ func (m *SubscriptionManager) subscribe(s *trackSubscription) error {
return nil
}
func (m *SubscriptionManager) subscribeSynchronous(trackID livekit.TrackID) error {
m.params.Logger.Debugw("executing subscribe synchronous", "trackID", trackID)
if !m.params.Participant.CanSubscribe() {
return ErrNoSubscribePermission
}
res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
m.params.Logger.Debugw("resolved track", "trackID", trackID, " result", res)
track := res.Track
if track == nil {
return ErrTrackNotFound
}
subTrack, err := track.AddSubscriber(m.params.Participant)
if err != nil && !errors.Is(err, errAlreadySubscribed) {
return err
}
if err == errAlreadySubscribed {
m.params.Logger.Debugw(
"already subscribed to track",
"trackID", trackID,
"subscribedAudioCount", m.subscribedAudioCount.Load(),
"subscribedVideoCount", m.subscribedVideoCount.Load(),
)
}
if err == nil && subTrack != nil { // subTrack could be nil if already subscribed
m.params.Logger.Debugw(
"subscribed to track",
"trackID", trackID,
"subscribedAudioCount", m.subscribedAudioCount.Load(),
"subscribedVideoCount", m.subscribedVideoCount.Load(),
)
}
// ONE-SHOT-SIGNALLING-MODE-TODO
// 1. Analytics events for `TrackSubscribed`
return nil
}
func (m *SubscriptionManager) unsubscribe(s *trackSubscription) error {
s.logger.Debugw("executing unsubscribe")
@@ -707,7 +762,7 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i
"kind", subTrack.MediaTrack().Kind(),
)
if err := m.params.Participant.RemoveTrackFromSubscriber(sender); err != nil {
if err := m.params.Participant.RemoveTrackLocal(sender); err != nil {
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
// most of these are safe to ignore, since the track state might have already
// been set to Inactive
+43 -4
View File
@@ -86,6 +86,8 @@ var (
ErrNoTransceiver = errors.New("no transceiver")
ErrNoSender = errors.New("no sender")
ErrMidNotFound = errors.New("mid not found")
ErrNotSynchronousPeerConnectionMode = errors.New("not using synchronous peer connection mode")
ErrNoRemoteDescription = errors.New("no remote description")
)
// -------------------------------------------------------------------------
@@ -259,6 +261,7 @@ type TransportParams struct {
DataChannelMaxBufferedAmount uint64
UseSendSideBWEInterceptor bool
UseSendSideBWE bool
UseOneShotSignallingMode bool
}
func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
@@ -492,10 +495,12 @@ func (t *PCTransport) createPeerConnection() error {
}
t.pc = pc
t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange)
if !t.params.UseOneShotSignallingMode {
// one shot signalling mode gathers all candidates and sends in answer
t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange)
t.pc.OnICECandidate(t.onICECandidateTrickle)
}
t.pc.OnICEConnectionStateChange(t.onICEConnectionStateChange)
t.pc.OnICECandidate(t.onICECandidateTrickle)
t.pc.OnConnectionStateChange(t.onPeerConnectionStateChange)
t.pc.OnDataChannel(t.onDataChannel)
@@ -1018,11 +1023,45 @@ func (t *PCTransport) clearConnTimer() {
}
}
func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) {
func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) error {
if t.params.UseOneShotSignallingMode {
err := t.pc.SetRemoteDescription(sd)
if err != nil {
t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err)
}
return err
}
t.postEvent(event{
signal: signalRemoteDescriptionReceived,
data: &sd,
})
return nil
}
func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, error) {
if !t.params.UseOneShotSignallingMode {
return webrtc.SessionDescription{}, ErrNotSynchronousPeerConnectionMode
}
prd := t.pc.PendingRemoteDescription()
if prd == nil || prd.Type != webrtc.SDPTypeOffer {
return webrtc.SessionDescription{}, ErrNoRemoteDescription
}
answer, err := t.pc.CreateAnswer(nil)
if err != nil {
return webrtc.SessionDescription{}, err
}
if err = t.pc.SetLocalDescription(answer); err != nil {
return webrtc.SessionDescription{}, err
}
// wait for gathering to complete to include all candidates in the answer
<-webrtc.GatheringCompletePromise(t.pc)
return *t.pc.CurrentLocalDescription(), nil
}
func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) {
+51 -23
View File
@@ -105,6 +105,7 @@ type TransportManagerParams struct {
DataChannelStats *telemetry.BytesTrackStats
UseSendSideBWEInterceptor bool
UseSendSideBWE bool
UseOneShotSignallingMode bool
}
type TransportManager struct {
@@ -146,19 +147,20 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER)
publisher, err := NewPCTransport(TransportParams{
ParticipantID: params.SID,
ParticipantIdentity: params.Identity,
ProtocolVersion: params.ProtocolVersion,
Config: params.Config,
Twcc: params.Twcc,
DirectionConfig: params.Config.Publisher,
CongestionControlConfig: params.CongestionControlConfig,
EnabledCodecs: params.EnabledPublishCodecs,
Logger: lgr,
SimTracks: params.SimTracks,
ClientInfo: params.ClientInfo,
Transport: livekit.SignalTarget_PUBLISHER,
Handler: TransportManagerPublisherTransportHandler{TransportManagerTransportHandler{params.PublisherHandler, t, lgr}},
ParticipantID: params.SID,
ParticipantIdentity: params.Identity,
ProtocolVersion: params.ProtocolVersion,
Config: params.Config,
Twcc: params.Twcc,
DirectionConfig: params.Config.Publisher,
CongestionControlConfig: params.CongestionControlConfig,
EnabledCodecs: params.EnabledPublishCodecs,
Logger: lgr,
SimTracks: params.SimTracks,
ClientInfo: params.ClientInfo,
Transport: livekit.SignalTarget_PUBLISHER,
Handler: TransportManagerPublisherTransportHandler{TransportManagerTransportHandler{params.PublisherHandler, t, lgr}},
UseOneShotSignallingMode: params.UseOneShotSignallingMode,
})
if err != nil {
return nil, err
@@ -232,16 +234,34 @@ func (t *TransportManager) HasSubscriberEverConnected() bool {
return t.subscriber.HasEverConnected()
}
func (t *TransportManager) AddTrackToSubscriber(trackLocal webrtc.TrackLocal, params types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
return t.subscriber.AddTrack(trackLocal, params)
func (t *TransportManager) AddTrackLocal(
trackLocal webrtc.TrackLocal,
params types.AddTrackParams,
) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
if t.params.UseOneShotSignallingMode {
return t.publisher.AddTrack(trackLocal, params)
} else {
return t.subscriber.AddTrack(trackLocal, params)
}
}
func (t *TransportManager) AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
return t.subscriber.AddTransceiverFromTrack(trackLocal, params)
func (t *TransportManager) AddTransceiverFromTrackLocal(
trackLocal webrtc.TrackLocal,
params types.AddTrackParams,
) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
if t.params.UseOneShotSignallingMode {
return t.publisher.AddTransceiverFromTrack(trackLocal, params)
} else {
return t.subscriber.AddTransceiverFromTrack(trackLocal, params)
}
}
func (t *TransportManager) RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error {
return t.subscriber.RemoveTrack(sender)
func (t *TransportManager) RemoveTrackLocal(sender *webrtc.RTPSender) error {
if t.params.UseOneShotSignallingMode {
return t.publisher.RemoveTrack(sender)
} else {
return t.subscriber.RemoveTrack(sender)
}
}
func (t *TransportManager) WriteSubscriberRTCP(pkts []rtcp.Packet) error {
@@ -373,17 +393,25 @@ func (t *TransportManager) LastPublisherOffer() webrtc.SessionDescription {
return webrtc.SessionDescription{}
}
func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, shouldPend bool) {
func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, shouldPend bool) error {
t.lock.Lock()
if shouldPend {
t.pendingOfferPublisher = &offer
t.lock.Unlock()
return
return nil
}
t.lock.Unlock()
t.lastPublisherOffer.Store(offer)
t.publisher.HandleRemoteDescription(offer)
return t.publisher.HandleRemoteDescription(offer)
}
func (t *TransportManager) GetAnswer() (webrtc.SessionDescription, error) {
answer, err := t.publisher.GetAnswer()
if err == nil {
t.lastPublisherAnswer.Store(answer)
}
return answer, err
}
func (t *TransportManager) ProcessPendingPublisherOffer() {
@@ -536,7 +564,7 @@ func (t *TransportManager) getTransport(isPrimary bool) *PCTransport {
}
func (t *TransportManager) handleConnectionFailed(isShortLived bool) {
if !t.params.AllowTCPFallback {
if !t.params.AllowTCPFallback || t.params.UseOneShotSignallingMode {
return
}
+6 -4
View File
@@ -362,16 +362,17 @@ type LocalParticipant interface {
// PeerConnection
AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget)
HandleOffer(sdp webrtc.SessionDescription)
HandleOffer(sdp webrtc.SessionDescription) error
GetAnswer() (webrtc.SessionDescription, error)
AddTrack(req *livekit.AddTrackRequest)
SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) *livekit.TrackInfo
HandleAnswer(sdp webrtc.SessionDescription)
Negotiate(force bool)
ICERestart(iceConfig *livekit.ICEConfig)
AddTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error
AddTrackLocal(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
AddTransceiverFromTrackLocal(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
RemoveTrackLocal(sender *webrtc.RTPSender) error
WriteSubscriberRTCP(pkts []rtcp.Packet) error
@@ -380,6 +381,7 @@ type LocalParticipant interface {
UnsubscribeFromTrack(trackID livekit.TrackID)
UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings)
GetSubscribedTracks() []SubscribedTrack
Verify() bool
VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32)
// WaitUntilSubscribed waits until all subscriptions have been settled, or if the timeout
// has been reached. If the timeout expires, it will return an error.
+291 -121
View File
@@ -30,34 +30,34 @@ type FakeLocalParticipant struct {
addTrackArgsForCall []struct {
arg1 *livekit.AddTrackRequest
}
AddTrackToSubscriberStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
addTrackToSubscriberMutex sync.RWMutex
addTrackToSubscriberArgsForCall []struct {
AddTrackLocalStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
addTrackLocalMutex sync.RWMutex
addTrackLocalArgsForCall []struct {
arg1 webrtc.TrackLocal
arg2 types.AddTrackParams
}
addTrackToSubscriberReturns struct {
addTrackLocalReturns struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
}
addTrackToSubscriberReturnsOnCall map[int]struct {
addTrackLocalReturnsOnCall map[int]struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
}
AddTransceiverFromTrackToSubscriberStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
addTransceiverFromTrackToSubscriberMutex sync.RWMutex
addTransceiverFromTrackToSubscriberArgsForCall []struct {
AddTransceiverFromTrackLocalStub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
addTransceiverFromTrackLocalMutex sync.RWMutex
addTransceiverFromTrackLocalArgsForCall []struct {
arg1 webrtc.TrackLocal
arg2 types.AddTrackParams
}
addTransceiverFromTrackToSubscriberReturns struct {
addTransceiverFromTrackLocalReturns struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
}
addTransceiverFromTrackToSubscriberReturnsOnCall map[int]struct {
addTransceiverFromTrackLocalReturnsOnCall map[int]struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
@@ -211,6 +211,18 @@ type FakeLocalParticipant struct {
getAdaptiveStreamReturnsOnCall map[int]struct {
result1 bool
}
GetAnswerStub func() (webrtc.SessionDescription, error)
getAnswerMutex sync.RWMutex
getAnswerArgsForCall []struct {
}
getAnswerReturns struct {
result1 webrtc.SessionDescription
result2 error
}
getAnswerReturnsOnCall map[int]struct {
result1 webrtc.SessionDescription
result2 error
}
GetAudioLevelStub func() (float64, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
@@ -425,11 +437,17 @@ type FakeLocalParticipant struct {
handleMetricsReturnsOnCall map[int]struct {
result1 error
}
HandleOfferStub func(webrtc.SessionDescription)
HandleOfferStub func(webrtc.SessionDescription) error
handleOfferMutex sync.RWMutex
handleOfferArgsForCall []struct {
arg1 webrtc.SessionDescription
}
handleOfferReturns struct {
result1 error
}
handleOfferReturnsOnCall map[int]struct {
result1 error
}
HandleReceiverReportStub func(*sfu.DownTrack, *rtcp.ReceiverReport)
handleReceiverReportMutex sync.RWMutex
handleReceiverReportArgsForCall []struct {
@@ -733,15 +751,15 @@ type FakeLocalParticipant struct {
arg2 bool
arg3 bool
}
RemoveTrackFromSubscriberStub func(*webrtc.RTPSender) error
removeTrackFromSubscriberMutex sync.RWMutex
removeTrackFromSubscriberArgsForCall []struct {
RemoveTrackLocalStub func(*webrtc.RTPSender) error
removeTrackLocalMutex sync.RWMutex
removeTrackLocalArgsForCall []struct {
arg1 *webrtc.RTPSender
}
removeTrackFromSubscriberReturns struct {
removeTrackLocalReturns struct {
result1 error
}
removeTrackFromSubscriberReturnsOnCall map[int]struct {
removeTrackLocalReturnsOnCall map[int]struct {
result1 error
}
SendConnectionQualityUpdateStub func(*livekit.ConnectionQualityUpdate) error
@@ -1098,6 +1116,16 @@ type FakeLocalParticipant struct {
updateVideoTrackReturnsOnCall map[int]struct {
result1 error
}
VerifyStub func() bool
verifyMutex sync.RWMutex
verifyArgsForCall []struct {
}
verifyReturns struct {
result1 bool
}
verifyReturnsOnCall map[int]struct {
result1 bool
}
VerifySubscribeParticipantInfoStub func(livekit.ParticipantID, uint32)
verifySubscribeParticipantInfoMutex sync.RWMutex
verifySubscribeParticipantInfoArgsForCall []struct {
@@ -1205,17 +1233,17 @@ func (fake *FakeLocalParticipant) AddTrackArgsForCall(i int) *livekit.AddTrackRe
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) AddTrackToSubscriber(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
fake.addTrackToSubscriberMutex.Lock()
ret, specificReturn := fake.addTrackToSubscriberReturnsOnCall[len(fake.addTrackToSubscriberArgsForCall)]
fake.addTrackToSubscriberArgsForCall = append(fake.addTrackToSubscriberArgsForCall, struct {
func (fake *FakeLocalParticipant) AddTrackLocal(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
fake.addTrackLocalMutex.Lock()
ret, specificReturn := fake.addTrackLocalReturnsOnCall[len(fake.addTrackLocalArgsForCall)]
fake.addTrackLocalArgsForCall = append(fake.addTrackLocalArgsForCall, struct {
arg1 webrtc.TrackLocal
arg2 types.AddTrackParams
}{arg1, arg2})
stub := fake.AddTrackToSubscriberStub
fakeReturns := fake.addTrackToSubscriberReturns
fake.recordInvocation("AddTrackToSubscriber", []interface{}{arg1, arg2})
fake.addTrackToSubscriberMutex.Unlock()
stub := fake.AddTrackLocalStub
fakeReturns := fake.addTrackLocalReturns
fake.recordInvocation("AddTrackLocal", []interface{}{arg1, arg2})
fake.addTrackLocalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
@@ -1225,65 +1253,65 @@ func (fake *FakeLocalParticipant) AddTrackToSubscriber(arg1 webrtc.TrackLocal, a
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeLocalParticipant) AddTrackToSubscriberCallCount() int {
fake.addTrackToSubscriberMutex.RLock()
defer fake.addTrackToSubscriberMutex.RUnlock()
return len(fake.addTrackToSubscriberArgsForCall)
func (fake *FakeLocalParticipant) AddTrackLocalCallCount() int {
fake.addTrackLocalMutex.RLock()
defer fake.addTrackLocalMutex.RUnlock()
return len(fake.addTrackLocalArgsForCall)
}
func (fake *FakeLocalParticipant) AddTrackToSubscriberCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) {
fake.addTrackToSubscriberMutex.Lock()
defer fake.addTrackToSubscriberMutex.Unlock()
fake.AddTrackToSubscriberStub = stub
func (fake *FakeLocalParticipant) AddTrackLocalCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) {
fake.addTrackLocalMutex.Lock()
defer fake.addTrackLocalMutex.Unlock()
fake.AddTrackLocalStub = stub
}
func (fake *FakeLocalParticipant) AddTrackToSubscriberArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) {
fake.addTrackToSubscriberMutex.RLock()
defer fake.addTrackToSubscriberMutex.RUnlock()
argsForCall := fake.addTrackToSubscriberArgsForCall[i]
func (fake *FakeLocalParticipant) AddTrackLocalArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) {
fake.addTrackLocalMutex.RLock()
defer fake.addTrackLocalMutex.RUnlock()
argsForCall := fake.addTrackLocalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) AddTrackToSubscriberReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTrackToSubscriberMutex.Lock()
defer fake.addTrackToSubscriberMutex.Unlock()
fake.AddTrackToSubscriberStub = nil
fake.addTrackToSubscriberReturns = struct {
func (fake *FakeLocalParticipant) AddTrackLocalReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTrackLocalMutex.Lock()
defer fake.addTrackLocalMutex.Unlock()
fake.AddTrackLocalStub = nil
fake.addTrackLocalReturns = struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
}{result1, result2, result3}
}
func (fake *FakeLocalParticipant) AddTrackToSubscriberReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTrackToSubscriberMutex.Lock()
defer fake.addTrackToSubscriberMutex.Unlock()
fake.AddTrackToSubscriberStub = nil
if fake.addTrackToSubscriberReturnsOnCall == nil {
fake.addTrackToSubscriberReturnsOnCall = make(map[int]struct {
func (fake *FakeLocalParticipant) AddTrackLocalReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTrackLocalMutex.Lock()
defer fake.addTrackLocalMutex.Unlock()
fake.AddTrackLocalStub = nil
if fake.addTrackLocalReturnsOnCall == nil {
fake.addTrackLocalReturnsOnCall = make(map[int]struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
})
}
fake.addTrackToSubscriberReturnsOnCall[i] = struct {
fake.addTrackLocalReturnsOnCall[i] = struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
}{result1, result2, result3}
}
func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriber(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
fake.addTransceiverFromTrackToSubscriberMutex.Lock()
ret, specificReturn := fake.addTransceiverFromTrackToSubscriberReturnsOnCall[len(fake.addTransceiverFromTrackToSubscriberArgsForCall)]
fake.addTransceiverFromTrackToSubscriberArgsForCall = append(fake.addTransceiverFromTrackToSubscriberArgsForCall, struct {
func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocal(arg1 webrtc.TrackLocal, arg2 types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
fake.addTransceiverFromTrackLocalMutex.Lock()
ret, specificReturn := fake.addTransceiverFromTrackLocalReturnsOnCall[len(fake.addTransceiverFromTrackLocalArgsForCall)]
fake.addTransceiverFromTrackLocalArgsForCall = append(fake.addTransceiverFromTrackLocalArgsForCall, struct {
arg1 webrtc.TrackLocal
arg2 types.AddTrackParams
}{arg1, arg2})
stub := fake.AddTransceiverFromTrackToSubscriberStub
fakeReturns := fake.addTransceiverFromTrackToSubscriberReturns
fake.recordInvocation("AddTransceiverFromTrackToSubscriber", []interface{}{arg1, arg2})
fake.addTransceiverFromTrackToSubscriberMutex.Unlock()
stub := fake.AddTransceiverFromTrackLocalStub
fakeReturns := fake.addTransceiverFromTrackLocalReturns
fake.recordInvocation("AddTransceiverFromTrackLocal", []interface{}{arg1, arg2})
fake.addTransceiverFromTrackLocalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
@@ -1293,48 +1321,48 @@ func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriber(arg1 webrt
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberCallCount() int {
fake.addTransceiverFromTrackToSubscriberMutex.RLock()
defer fake.addTransceiverFromTrackToSubscriberMutex.RUnlock()
return len(fake.addTransceiverFromTrackToSubscriberArgsForCall)
func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalCallCount() int {
fake.addTransceiverFromTrackLocalMutex.RLock()
defer fake.addTransceiverFromTrackLocalMutex.RUnlock()
return len(fake.addTransceiverFromTrackLocalArgsForCall)
}
func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) {
fake.addTransceiverFromTrackToSubscriberMutex.Lock()
defer fake.addTransceiverFromTrackToSubscriberMutex.Unlock()
fake.AddTransceiverFromTrackToSubscriberStub = stub
func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalCalls(stub func(webrtc.TrackLocal, types.AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)) {
fake.addTransceiverFromTrackLocalMutex.Lock()
defer fake.addTransceiverFromTrackLocalMutex.Unlock()
fake.AddTransceiverFromTrackLocalStub = stub
}
func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) {
fake.addTransceiverFromTrackToSubscriberMutex.RLock()
defer fake.addTransceiverFromTrackToSubscriberMutex.RUnlock()
argsForCall := fake.addTransceiverFromTrackToSubscriberArgsForCall[i]
func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalArgsForCall(i int) (webrtc.TrackLocal, types.AddTrackParams) {
fake.addTransceiverFromTrackLocalMutex.RLock()
defer fake.addTransceiverFromTrackLocalMutex.RUnlock()
argsForCall := fake.addTransceiverFromTrackLocalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTransceiverFromTrackToSubscriberMutex.Lock()
defer fake.addTransceiverFromTrackToSubscriberMutex.Unlock()
fake.AddTransceiverFromTrackToSubscriberStub = nil
fake.addTransceiverFromTrackToSubscriberReturns = struct {
func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalReturns(result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTransceiverFromTrackLocalMutex.Lock()
defer fake.addTransceiverFromTrackLocalMutex.Unlock()
fake.AddTransceiverFromTrackLocalStub = nil
fake.addTransceiverFromTrackLocalReturns = struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
}{result1, result2, result3}
}
func (fake *FakeLocalParticipant) AddTransceiverFromTrackToSubscriberReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTransceiverFromTrackToSubscriberMutex.Lock()
defer fake.addTransceiverFromTrackToSubscriberMutex.Unlock()
fake.AddTransceiverFromTrackToSubscriberStub = nil
if fake.addTransceiverFromTrackToSubscriberReturnsOnCall == nil {
fake.addTransceiverFromTrackToSubscriberReturnsOnCall = make(map[int]struct {
func (fake *FakeLocalParticipant) AddTransceiverFromTrackLocalReturnsOnCall(i int, result1 *webrtc.RTPSender, result2 *webrtc.RTPTransceiver, result3 error) {
fake.addTransceiverFromTrackLocalMutex.Lock()
defer fake.addTransceiverFromTrackLocalMutex.Unlock()
fake.AddTransceiverFromTrackLocalStub = nil
if fake.addTransceiverFromTrackLocalReturnsOnCall == nil {
fake.addTransceiverFromTrackLocalReturnsOnCall = make(map[int]struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
})
}
fake.addTransceiverFromTrackToSubscriberReturnsOnCall[i] = struct {
fake.addTransceiverFromTrackLocalReturnsOnCall[i] = struct {
result1 *webrtc.RTPSender
result2 *webrtc.RTPTransceiver
result3 error
@@ -2124,6 +2152,62 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeLocalParticipant) GetAnswer() (webrtc.SessionDescription, error) {
fake.getAnswerMutex.Lock()
ret, specificReturn := fake.getAnswerReturnsOnCall[len(fake.getAnswerArgsForCall)]
fake.getAnswerArgsForCall = append(fake.getAnswerArgsForCall, struct {
}{})
stub := fake.GetAnswerStub
fakeReturns := fake.getAnswerReturns
fake.recordInvocation("GetAnswer", []interface{}{})
fake.getAnswerMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeLocalParticipant) GetAnswerCallCount() int {
fake.getAnswerMutex.RLock()
defer fake.getAnswerMutex.RUnlock()
return len(fake.getAnswerArgsForCall)
}
func (fake *FakeLocalParticipant) GetAnswerCalls(stub func() (webrtc.SessionDescription, error)) {
fake.getAnswerMutex.Lock()
defer fake.getAnswerMutex.Unlock()
fake.GetAnswerStub = stub
}
func (fake *FakeLocalParticipant) GetAnswerReturns(result1 webrtc.SessionDescription, result2 error) {
fake.getAnswerMutex.Lock()
defer fake.getAnswerMutex.Unlock()
fake.GetAnswerStub = nil
fake.getAnswerReturns = struct {
result1 webrtc.SessionDescription
result2 error
}{result1, result2}
}
func (fake *FakeLocalParticipant) GetAnswerReturnsOnCall(i int, result1 webrtc.SessionDescription, result2 error) {
fake.getAnswerMutex.Lock()
defer fake.getAnswerMutex.Unlock()
fake.GetAnswerStub = nil
if fake.getAnswerReturnsOnCall == nil {
fake.getAnswerReturnsOnCall = make(map[int]struct {
result1 webrtc.SessionDescription
result2 error
})
}
fake.getAnswerReturnsOnCall[i] = struct {
result1 webrtc.SessionDescription
result2 error
}{result1, result2}
}
func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
@@ -3255,17 +3339,23 @@ func (fake *FakeLocalParticipant) HandleMetricsReturnsOnCall(i int, result1 erro
}{result1}
}
func (fake *FakeLocalParticipant) HandleOffer(arg1 webrtc.SessionDescription) {
func (fake *FakeLocalParticipant) HandleOffer(arg1 webrtc.SessionDescription) error {
fake.handleOfferMutex.Lock()
ret, specificReturn := fake.handleOfferReturnsOnCall[len(fake.handleOfferArgsForCall)]
fake.handleOfferArgsForCall = append(fake.handleOfferArgsForCall, struct {
arg1 webrtc.SessionDescription
}{arg1})
stub := fake.HandleOfferStub
fakeReturns := fake.handleOfferReturns
fake.recordInvocation("HandleOffer", []interface{}{arg1})
fake.handleOfferMutex.Unlock()
if stub != nil {
fake.HandleOfferStub(arg1)
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) HandleOfferCallCount() int {
@@ -3274,7 +3364,7 @@ func (fake *FakeLocalParticipant) HandleOfferCallCount() int {
return len(fake.handleOfferArgsForCall)
}
func (fake *FakeLocalParticipant) HandleOfferCalls(stub func(webrtc.SessionDescription)) {
func (fake *FakeLocalParticipant) HandleOfferCalls(stub func(webrtc.SessionDescription) error) {
fake.handleOfferMutex.Lock()
defer fake.handleOfferMutex.Unlock()
fake.HandleOfferStub = stub
@@ -3287,6 +3377,29 @@ func (fake *FakeLocalParticipant) HandleOfferArgsForCall(i int) webrtc.SessionDe
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) HandleOfferReturns(result1 error) {
fake.handleOfferMutex.Lock()
defer fake.handleOfferMutex.Unlock()
fake.HandleOfferStub = nil
fake.handleOfferReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleOfferReturnsOnCall(i int, result1 error) {
fake.handleOfferMutex.Lock()
defer fake.handleOfferMutex.Unlock()
fake.HandleOfferStub = nil
if fake.handleOfferReturnsOnCall == nil {
fake.handleOfferReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.handleOfferReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleReceiverReport(arg1 *sfu.DownTrack, arg2 *rtcp.ReceiverReport) {
fake.handleReceiverReportMutex.Lock()
fake.handleReceiverReportArgsForCall = append(fake.handleReceiverReportArgsForCall, struct {
@@ -4977,16 +5090,16 @@ func (fake *FakeLocalParticipant) RemovePublishedTrackArgsForCall(i int) (types.
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) RemoveTrackFromSubscriber(arg1 *webrtc.RTPSender) error {
fake.removeTrackFromSubscriberMutex.Lock()
ret, specificReturn := fake.removeTrackFromSubscriberReturnsOnCall[len(fake.removeTrackFromSubscriberArgsForCall)]
fake.removeTrackFromSubscriberArgsForCall = append(fake.removeTrackFromSubscriberArgsForCall, struct {
func (fake *FakeLocalParticipant) RemoveTrackLocal(arg1 *webrtc.RTPSender) error {
fake.removeTrackLocalMutex.Lock()
ret, specificReturn := fake.removeTrackLocalReturnsOnCall[len(fake.removeTrackLocalArgsForCall)]
fake.removeTrackLocalArgsForCall = append(fake.removeTrackLocalArgsForCall, struct {
arg1 *webrtc.RTPSender
}{arg1})
stub := fake.RemoveTrackFromSubscriberStub
fakeReturns := fake.removeTrackFromSubscriberReturns
fake.recordInvocation("RemoveTrackFromSubscriber", []interface{}{arg1})
fake.removeTrackFromSubscriberMutex.Unlock()
stub := fake.RemoveTrackLocalStub
fakeReturns := fake.removeTrackLocalReturns
fake.recordInvocation("RemoveTrackLocal", []interface{}{arg1})
fake.removeTrackLocalMutex.Unlock()
if stub != nil {
return stub(arg1)
}
@@ -4996,44 +5109,44 @@ func (fake *FakeLocalParticipant) RemoveTrackFromSubscriber(arg1 *webrtc.RTPSend
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberCallCount() int {
fake.removeTrackFromSubscriberMutex.RLock()
defer fake.removeTrackFromSubscriberMutex.RUnlock()
return len(fake.removeTrackFromSubscriberArgsForCall)
func (fake *FakeLocalParticipant) RemoveTrackLocalCallCount() int {
fake.removeTrackLocalMutex.RLock()
defer fake.removeTrackLocalMutex.RUnlock()
return len(fake.removeTrackLocalArgsForCall)
}
func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberCalls(stub func(*webrtc.RTPSender) error) {
fake.removeTrackFromSubscriberMutex.Lock()
defer fake.removeTrackFromSubscriberMutex.Unlock()
fake.RemoveTrackFromSubscriberStub = stub
func (fake *FakeLocalParticipant) RemoveTrackLocalCalls(stub func(*webrtc.RTPSender) error) {
fake.removeTrackLocalMutex.Lock()
defer fake.removeTrackLocalMutex.Unlock()
fake.RemoveTrackLocalStub = stub
}
func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberArgsForCall(i int) *webrtc.RTPSender {
fake.removeTrackFromSubscriberMutex.RLock()
defer fake.removeTrackFromSubscriberMutex.RUnlock()
argsForCall := fake.removeTrackFromSubscriberArgsForCall[i]
func (fake *FakeLocalParticipant) RemoveTrackLocalArgsForCall(i int) *webrtc.RTPSender {
fake.removeTrackLocalMutex.RLock()
defer fake.removeTrackLocalMutex.RUnlock()
argsForCall := fake.removeTrackLocalArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberReturns(result1 error) {
fake.removeTrackFromSubscriberMutex.Lock()
defer fake.removeTrackFromSubscriberMutex.Unlock()
fake.RemoveTrackFromSubscriberStub = nil
fake.removeTrackFromSubscriberReturns = struct {
func (fake *FakeLocalParticipant) RemoveTrackLocalReturns(result1 error) {
fake.removeTrackLocalMutex.Lock()
defer fake.removeTrackLocalMutex.Unlock()
fake.RemoveTrackLocalStub = nil
fake.removeTrackLocalReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) RemoveTrackFromSubscriberReturnsOnCall(i int, result1 error) {
fake.removeTrackFromSubscriberMutex.Lock()
defer fake.removeTrackFromSubscriberMutex.Unlock()
fake.RemoveTrackFromSubscriberStub = nil
if fake.removeTrackFromSubscriberReturnsOnCall == nil {
fake.removeTrackFromSubscriberReturnsOnCall = make(map[int]struct {
func (fake *FakeLocalParticipant) RemoveTrackLocalReturnsOnCall(i int, result1 error) {
fake.removeTrackLocalMutex.Lock()
defer fake.removeTrackLocalMutex.Unlock()
fake.RemoveTrackLocalStub = nil
if fake.removeTrackLocalReturnsOnCall == nil {
fake.removeTrackLocalReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.removeTrackFromSubscriberReturnsOnCall[i] = struct {
fake.removeTrackLocalReturnsOnCall[i] = struct {
result1 error
}{result1}
}
@@ -6997,6 +7110,59 @@ func (fake *FakeLocalParticipant) UpdateVideoTrackReturnsOnCall(i int, result1 e
}{result1}
}
func (fake *FakeLocalParticipant) Verify() bool {
fake.verifyMutex.Lock()
ret, specificReturn := fake.verifyReturnsOnCall[len(fake.verifyArgsForCall)]
fake.verifyArgsForCall = append(fake.verifyArgsForCall, struct {
}{})
stub := fake.VerifyStub
fakeReturns := fake.verifyReturns
fake.recordInvocation("Verify", []interface{}{})
fake.verifyMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) VerifyCallCount() int {
fake.verifyMutex.RLock()
defer fake.verifyMutex.RUnlock()
return len(fake.verifyArgsForCall)
}
func (fake *FakeLocalParticipant) VerifyCalls(stub func() bool) {
fake.verifyMutex.Lock()
defer fake.verifyMutex.Unlock()
fake.VerifyStub = stub
}
func (fake *FakeLocalParticipant) VerifyReturns(result1 bool) {
fake.verifyMutex.Lock()
defer fake.verifyMutex.Unlock()
fake.VerifyStub = nil
fake.verifyReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) VerifyReturnsOnCall(i int, result1 bool) {
fake.verifyMutex.Lock()
defer fake.verifyMutex.Unlock()
fake.VerifyStub = nil
if fake.verifyReturnsOnCall == nil {
fake.verifyReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.verifyReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) VerifySubscribeParticipantInfo(arg1 livekit.ParticipantID, arg2 uint32) {
fake.verifySubscribeParticipantInfoMutex.Lock()
fake.verifySubscribeParticipantInfoArgsForCall = append(fake.verifySubscribeParticipantInfoArgsForCall, struct {
@@ -7217,10 +7383,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.addICECandidateMutex.RUnlock()
fake.addTrackMutex.RLock()
defer fake.addTrackMutex.RUnlock()
fake.addTrackToSubscriberMutex.RLock()
defer fake.addTrackToSubscriberMutex.RUnlock()
fake.addTransceiverFromTrackToSubscriberMutex.RLock()
defer fake.addTransceiverFromTrackToSubscriberMutex.RUnlock()
fake.addTrackLocalMutex.RLock()
defer fake.addTrackLocalMutex.RUnlock()
fake.addTransceiverFromTrackLocalMutex.RLock()
defer fake.addTransceiverFromTrackLocalMutex.RUnlock()
fake.cacheDownTrackMutex.RLock()
defer fake.cacheDownTrackMutex.RUnlock()
fake.canPublishMutex.RLock()
@@ -7251,6 +7417,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.disconnectedMutex.RUnlock()
fake.getAdaptiveStreamMutex.RLock()
defer fake.getAdaptiveStreamMutex.RUnlock()
fake.getAnswerMutex.RLock()
defer fake.getAnswerMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getBufferFactoryMutex.RLock()
@@ -7373,8 +7541,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.protocolVersionMutex.RUnlock()
fake.removePublishedTrackMutex.RLock()
defer fake.removePublishedTrackMutex.RUnlock()
fake.removeTrackFromSubscriberMutex.RLock()
defer fake.removeTrackFromSubscriberMutex.RUnlock()
fake.removeTrackLocalMutex.RLock()
defer fake.removeTrackLocalMutex.RUnlock()
fake.sendConnectionQualityUpdateMutex.RLock()
defer fake.sendConnectionQualityUpdateMutex.RUnlock()
fake.sendDataPacketMutex.RLock()
@@ -7457,6 +7625,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.updateSubscriptionPermissionMutex.RUnlock()
fake.updateVideoTrackMutex.RLock()
defer fake.updateVideoTrackMutex.RUnlock()
fake.verifyMutex.RLock()
defer fake.verifyMutex.RUnlock()
fake.verifySubscribeParticipantInfoMutex.RLock()
defer fake.verifySubscribeParticipantInfoMutex.RUnlock()
fake.versionMutex.RLock()
+2
View File
@@ -281,6 +281,7 @@ func (r *RoomManager) StartSession(
pi routing.ParticipantInit,
requestSource routing.MessageSource,
responseSink routing.MessageSink,
useOneShotSignallingMode bool,
) error {
sessionStartTime := time.Now()
@@ -489,6 +490,7 @@ func (r *RoomManager) StartSession(
SyncStreams: roomInternal.GetSyncStreams(),
ForwardStats: r.forwardStats,
MetricConfig: r.config.Metric,
UseOneShotSignallingMode: useOneShotSignallingMode,
})
if err != nil {
return err
+74 -74
View File
@@ -10,6 +10,17 @@ import (
)
type FakeRoomAllocator struct {
AutoCreateEnabledStub func(context.Context) bool
autoCreateEnabledMutex sync.RWMutex
autoCreateEnabledArgsForCall []struct {
arg1 context.Context
}
autoCreateEnabledReturns struct {
result1 bool
}
autoCreateEnabledReturnsOnCall map[int]struct {
result1 bool
}
CreateRoomStub func(context.Context, *livekit.CreateRoomRequest, bool) (*livekit.Room, *livekit.RoomInternal, bool, error)
createRoomMutex sync.RWMutex
createRoomArgsForCall []struct {
@@ -29,17 +40,6 @@ type FakeRoomAllocator struct {
result3 bool
result4 error
}
AutoCreateEnabledStub func(context.Context) bool
roomAutoCreateEnabledMutex sync.RWMutex
roomAutoCreateEnabledArgsForCall []struct {
arg1 context.Context
}
roomAutoCreateEnabledReturns struct {
result1 bool
}
roomAutoCreateEnabledReturnsOnCall map[int]struct {
result1 bool
}
SelectRoomNodeStub func(context.Context, livekit.RoomName, livekit.NodeID) error
selectRoomNodeMutex sync.RWMutex
selectRoomNodeArgsForCall []struct {
@@ -69,6 +69,67 @@ type FakeRoomAllocator struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeRoomAllocator) AutoCreateEnabled(arg1 context.Context) bool {
fake.autoCreateEnabledMutex.Lock()
ret, specificReturn := fake.autoCreateEnabledReturnsOnCall[len(fake.autoCreateEnabledArgsForCall)]
fake.autoCreateEnabledArgsForCall = append(fake.autoCreateEnabledArgsForCall, struct {
arg1 context.Context
}{arg1})
stub := fake.AutoCreateEnabledStub
fakeReturns := fake.autoCreateEnabledReturns
fake.recordInvocation("AutoCreateEnabled", []interface{}{arg1})
fake.autoCreateEnabledMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoomAllocator) AutoCreateEnabledCallCount() int {
fake.autoCreateEnabledMutex.RLock()
defer fake.autoCreateEnabledMutex.RUnlock()
return len(fake.autoCreateEnabledArgsForCall)
}
func (fake *FakeRoomAllocator) AutoCreateEnabledCalls(stub func(context.Context) bool) {
fake.autoCreateEnabledMutex.Lock()
defer fake.autoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = stub
}
func (fake *FakeRoomAllocator) AutoCreateEnabledArgsForCall(i int) context.Context {
fake.autoCreateEnabledMutex.RLock()
defer fake.autoCreateEnabledMutex.RUnlock()
argsForCall := fake.autoCreateEnabledArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeRoomAllocator) AutoCreateEnabledReturns(result1 bool) {
fake.autoCreateEnabledMutex.Lock()
defer fake.autoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = nil
fake.autoCreateEnabledReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeRoomAllocator) AutoCreateEnabledReturnsOnCall(i int, result1 bool) {
fake.autoCreateEnabledMutex.Lock()
defer fake.autoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = nil
if fake.autoCreateEnabledReturnsOnCall == nil {
fake.autoCreateEnabledReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.autoCreateEnabledReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest, arg3 bool) (*livekit.Room, *livekit.RoomInternal, bool, error) {
fake.createRoomMutex.Lock()
ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)]
@@ -141,67 +202,6 @@ func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.R
}{result1, result2, result3, result4}
}
func (fake *FakeRoomAllocator) AutoCreateEnabled(arg1 context.Context) bool {
fake.roomAutoCreateEnabledMutex.Lock()
ret, specificReturn := fake.roomAutoCreateEnabledReturnsOnCall[len(fake.roomAutoCreateEnabledArgsForCall)]
fake.roomAutoCreateEnabledArgsForCall = append(fake.roomAutoCreateEnabledArgsForCall, struct {
arg1 context.Context
}{arg1})
stub := fake.AutoCreateEnabledStub
fakeReturns := fake.roomAutoCreateEnabledReturns
fake.recordInvocation("AutoCreateEnabled", []interface{}{arg1})
fake.roomAutoCreateEnabledMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoomAllocator) AutoCreateEnabledCallCount() int {
fake.roomAutoCreateEnabledMutex.RLock()
defer fake.roomAutoCreateEnabledMutex.RUnlock()
return len(fake.roomAutoCreateEnabledArgsForCall)
}
func (fake *FakeRoomAllocator) AutoCreateEnabledCalls(stub func(context.Context) bool) {
fake.roomAutoCreateEnabledMutex.Lock()
defer fake.roomAutoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = stub
}
func (fake *FakeRoomAllocator) AutoCreateEnabledArgsForCall(i int) context.Context {
fake.roomAutoCreateEnabledMutex.RLock()
defer fake.roomAutoCreateEnabledMutex.RUnlock()
argsForCall := fake.roomAutoCreateEnabledArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeRoomAllocator) AutoCreateEnabledReturns(result1 bool) {
fake.roomAutoCreateEnabledMutex.Lock()
defer fake.roomAutoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = nil
fake.roomAutoCreateEnabledReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeRoomAllocator) AutoCreateEnabledReturnsOnCall(i int, result1 bool) {
fake.roomAutoCreateEnabledMutex.Lock()
defer fake.roomAutoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = nil
if fake.roomAutoCreateEnabledReturnsOnCall == nil {
fake.roomAutoCreateEnabledReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.roomAutoCreateEnabledReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeRoomAllocator) SelectRoomNode(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.NodeID) error {
fake.selectRoomNodeMutex.Lock()
ret, specificReturn := fake.selectRoomNodeReturnsOnCall[len(fake.selectRoomNodeArgsForCall)]
@@ -330,10 +330,10 @@ func (fake *FakeRoomAllocator) ValidateCreateRoomReturnsOnCall(i int, result1 er
func (fake *FakeRoomAllocator) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.autoCreateEnabledMutex.RLock()
defer fake.autoCreateEnabledMutex.RUnlock()
fake.createRoomMutex.RLock()
defer fake.createRoomMutex.RUnlock()
fake.roomAutoCreateEnabledMutex.RLock()
defer fake.roomAutoCreateEnabledMutex.RUnlock()
fake.selectRoomNodeMutex.RLock()
defer fake.selectRoomNodeMutex.RUnlock()
fake.validateCreateRoomMutex.RLock()
+1 -1
View File
@@ -111,7 +111,7 @@ func (s *defaultSessionHandler) HandleSession(
return err
}
return s.roomManager.StartSession(ctx, pi, requestSource, responseSink)
return s.roomManager.StartSession(ctx, pi, requestSource, responseSink, false)
}
func (s *SignalServer) Start() error {