diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 817438a74..368de678a 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -199,24 +199,20 @@ type TransportParams struct { ParticipantID livekit.ParticipantID ParticipantIdentity livekit.ParticipantIdentity ProtocolVersion types.ProtocolVersion - Target livekit.SignalTarget Config *WebRTCConfig + DirectionConfig DirectionConfig CongestionControlConfig config.CongestionControlConfig Telemetry telemetry.TelemetryService EnabledCodecs []*livekit.Codec Logger logger.Logger SimTracks map[uint32]SimulcastTrackInfo ClientInfo ClientInfo + IsOfferer bool + IsSendSide bool } func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { - var directionConfig DirectionConfig - if params.Target == livekit.SignalTarget_PUBLISHER { - directionConfig = params.Config.Publisher - } else { - directionConfig = params.Config.Subscriber - } - + directionConfig := params.DirectionConfig // enable nack if audio red is not support if !isCodecEnabled(params.EnabledCodecs, webrtc.RTPCodecCapability{MimeType: sfu.MimeTypeAudioRed}) || !params.ClientInfo.SupportsAudioRED() { directionConfig.RTCPFeedback.Audio = append(directionConfig.RTCPFeedback.Audio, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBNACK}) @@ -260,7 +256,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat } ir := &interceptor.Registry{} - if params.Target == livekit.SignalTarget_SUBSCRIBER { + if params.IsSendSide { isSendSideBWE := false for _, ext := range directionConfig.RTPHeaderExtension.Video { if ext == sdp.TransportCCURI { @@ -322,7 +318,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { negotiationPending: make(map[livekit.ParticipantID]bool), eventCh: make(chan event, 50), } - if params.Target == livekit.SignalTarget_SUBSCRIBER { + if params.IsSendSide { t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{ Config: params.CongestionControlConfig, Logger: params.Logger, @@ -1219,7 +1215,7 @@ func (t *PCTransport) handleEvent(e *event) error { } func (t *PCTransport) handleICEGatheringComplete(e *event) error { - if t.params.Target == livekit.SignalTarget_SUBSCRIBER { + if t.params.IsOfferer { return t.handleICEGatheringCompleteOfferer() } else { return t.handleICEGatheringCompleteAnswerer() @@ -1241,10 +1237,15 @@ func (t *PCTransport) handleICEGatheringCompleteAnswerer() error { return nil } - t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering") - err := t.setRemoteDescription(*t.pendingRestartIceOffer) + offer := *t.pendingRestartIceOffer t.pendingRestartIceOffer = nil - return err + + t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering") + if err := t.setRemoteDescription(offer); err != nil { + return err + } + + return t.createAndSendAnswer() } func (t *PCTransport) localDescriptionSent() error { diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index 1bfcfc1af..3de12a353 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -19,8 +19,8 @@ func TestMissingAnswerDuringICERestart(t *testing.T) { params := TransportParams{ ParticipantID: "id", ParticipantIdentity: "identity", - Target: livekit.SignalTarget_SUBSCRIBER, Config: &WebRTCConfig{}, + IsOfferer: true, } transportA, err := NewPCTransport(params) require.NoError(t, err) @@ -28,7 +28,7 @@ func TestMissingAnswerDuringICERestart(t *testing.T) { require.NoError(t, err) paramsB := params - paramsB.Target = livekit.SignalTarget_PUBLISHER + paramsB.IsOfferer = false transportB, err := NewPCTransport(paramsB) require.NoError(t, err) @@ -69,8 +69,8 @@ func TestNegotiationTiming(t *testing.T) { params := TransportParams{ ParticipantID: "id", ParticipantIdentity: "identity", - Target: livekit.SignalTarget_SUBSCRIBER, Config: &WebRTCConfig{}, + IsOfferer: true, } transportA, err := NewPCTransport(params) require.NoError(t, err) @@ -78,7 +78,7 @@ func TestNegotiationTiming(t *testing.T) { require.NoError(t, err) paramsB := params - paramsB.Target = livekit.SignalTarget_PUBLISHER + paramsB.IsOfferer = false transportB, err := NewPCTransport(params) require.NoError(t, err) @@ -147,8 +147,8 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) { params := TransportParams{ ParticipantID: "id", ParticipantIdentity: "identity", - Target: livekit.SignalTarget_SUBSCRIBER, Config: &WebRTCConfig{}, + IsOfferer: true, } transportA, err := NewPCTransport(params) require.NoError(t, err) @@ -156,7 +156,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) { require.NoError(t, err) paramsB := params - paramsB.Target = livekit.SignalTarget_PUBLISHER + paramsB.IsOfferer = false transportB, err := NewPCTransport(paramsB) require.NoError(t, err) @@ -215,8 +215,8 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) { params := TransportParams{ ParticipantID: "id", ParticipantIdentity: "identity", - Target: livekit.SignalTarget_SUBSCRIBER, Config: &WebRTCConfig{}, + IsOfferer: true, } transportA, err := NewPCTransport(params) require.NoError(t, err) @@ -224,7 +224,7 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) { require.NoError(t, err) paramsB := params - paramsB.Target = livekit.SignalTarget_PUBLISHER + paramsB.IsOfferer = false transportB, err := NewPCTransport(paramsB) require.NoError(t, err) @@ -288,8 +288,8 @@ func TestNegotiationFailed(t *testing.T) { params := TransportParams{ ParticipantID: "id", ParticipantIdentity: "identity", - Target: livekit.SignalTarget_SUBSCRIBER, Config: &WebRTCConfig{}, + IsOfferer: true, } transportA, err := NewPCTransport(params) require.NoError(t, err) @@ -319,7 +319,6 @@ func TestFilteringCandidates(t *testing.T) { params := TransportParams{ ParticipantID: "id", ParticipantIdentity: "identity", - Target: livekit.SignalTarget_PUBLISHER, Config: &WebRTCConfig{}, EnabledCodecs: []*livekit.Codec{ {Mime: webrtc.MimeTypeOpus}, diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 47bc07149..b1fa37dc4 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -80,8 +80,8 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro ParticipantID: params.SID, ParticipantIdentity: params.Identity, ProtocolVersion: params.ProtocolVersion, - Target: livekit.SignalTarget_PUBLISHER, Config: params.Config, + DirectionConfig: params.Config.Publisher, CongestionControlConfig: params.CongestionControlConfig, Telemetry: params.Telemetry, EnabledCodecs: enabledCodecs, @@ -111,13 +111,15 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro ParticipantID: params.SID, ParticipantIdentity: params.Identity, ProtocolVersion: params.ProtocolVersion, - Target: livekit.SignalTarget_SUBSCRIBER, Config: params.Config, + DirectionConfig: params.Config.Subscriber, CongestionControlConfig: params.CongestionControlConfig, Telemetry: params.Telemetry, EnabledCodecs: enabledCodecs, Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER), ClientInfo: params.ClientInfo, + IsOfferer: true, + IsSendSide: true, }) if err != nil { return nil, err diff --git a/test/client/client.go b/test/client/client.go index f3bc175e9..e000f7710 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -141,17 +141,19 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { // Same applies for subscriber transport also // c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{ - Target: livekit.SignalTarget_SUBSCRIBER, - Config: &conf, - EnabledCodecs: codecs, + Config: &conf, + DirectionConfig: conf.Subscriber, + EnabledCodecs: codecs, + IsOfferer: true, + IsSendSide: true, }) if err != nil { return nil, err } c.subscriber, err = rtc.NewPCTransport(rtc.TransportParams{ - Target: livekit.SignalTarget_PUBLISHER, - Config: &conf, - EnabledCodecs: codecs, + Config: &conf, + DirectionConfig: conf.Publisher, + EnabledCodecs: codecs, }) if err != nil { return nil, err