Replace Target with params to indicate direction (#955)

* Replace Target with params to indicate direction

* Add missed send answer call
This commit is contained in:
Raja Subramanian
2022-08-25 08:33:06 +05:30
committed by GitHub
parent 5223c8292e
commit 06a46d5de0
4 changed files with 36 additions and 32 deletions

View File

@@ -199,24 +199,20 @@ type TransportParams struct {
ParticipantID livekit.ParticipantID
ParticipantIdentity livekit.ParticipantIdentity
ProtocolVersion types.ProtocolVersion
Target livekit.SignalTarget
Config *WebRTCConfig
DirectionConfig DirectionConfig
CongestionControlConfig config.CongestionControlConfig
Telemetry telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
ClientInfo ClientInfo
IsOfferer bool
IsSendSide bool
}
func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
var directionConfig DirectionConfig
if params.Target == livekit.SignalTarget_PUBLISHER {
directionConfig = params.Config.Publisher
} else {
directionConfig = params.Config.Subscriber
}
directionConfig := params.DirectionConfig
// enable nack if audio red is not support
if !isCodecEnabled(params.EnabledCodecs, webrtc.RTPCodecCapability{MimeType: sfu.MimeTypeAudioRed}) || !params.ClientInfo.SupportsAudioRED() {
directionConfig.RTCPFeedback.Audio = append(directionConfig.RTCPFeedback.Audio, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBNACK})
@@ -260,7 +256,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat
}
ir := &interceptor.Registry{}
if params.Target == livekit.SignalTarget_SUBSCRIBER {
if params.IsSendSide {
isSendSideBWE := false
for _, ext := range directionConfig.RTPHeaderExtension.Video {
if ext == sdp.TransportCCURI {
@@ -322,7 +318,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
negotiationPending: make(map[livekit.ParticipantID]bool),
eventCh: make(chan event, 50),
}
if params.Target == livekit.SignalTarget_SUBSCRIBER {
if params.IsSendSide {
t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{
Config: params.CongestionControlConfig,
Logger: params.Logger,
@@ -1219,7 +1215,7 @@ func (t *PCTransport) handleEvent(e *event) error {
}
func (t *PCTransport) handleICEGatheringComplete(e *event) error {
if t.params.Target == livekit.SignalTarget_SUBSCRIBER {
if t.params.IsOfferer {
return t.handleICEGatheringCompleteOfferer()
} else {
return t.handleICEGatheringCompleteAnswerer()
@@ -1241,10 +1237,15 @@ func (t *PCTransport) handleICEGatheringCompleteAnswerer() error {
return nil
}
t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering")
err := t.setRemoteDescription(*t.pendingRestartIceOffer)
offer := *t.pendingRestartIceOffer
t.pendingRestartIceOffer = nil
return err
t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering")
if err := t.setRemoteDescription(offer); err != nil {
return err
}
return t.createAndSendAnswer()
}
func (t *PCTransport) localDescriptionSent() error {

View File

@@ -19,8 +19,8 @@ func TestMissingAnswerDuringICERestart(t *testing.T) {
params := TransportParams{
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
IsOfferer: true,
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
@@ -28,7 +28,7 @@ func TestMissingAnswerDuringICERestart(t *testing.T) {
require.NoError(t, err)
paramsB := params
paramsB.Target = livekit.SignalTarget_PUBLISHER
paramsB.IsOfferer = false
transportB, err := NewPCTransport(paramsB)
require.NoError(t, err)
@@ -69,8 +69,8 @@ func TestNegotiationTiming(t *testing.T) {
params := TransportParams{
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
IsOfferer: true,
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
@@ -78,7 +78,7 @@ func TestNegotiationTiming(t *testing.T) {
require.NoError(t, err)
paramsB := params
paramsB.Target = livekit.SignalTarget_PUBLISHER
paramsB.IsOfferer = false
transportB, err := NewPCTransport(params)
require.NoError(t, err)
@@ -147,8 +147,8 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) {
params := TransportParams{
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
IsOfferer: true,
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
@@ -156,7 +156,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) {
require.NoError(t, err)
paramsB := params
paramsB.Target = livekit.SignalTarget_PUBLISHER
paramsB.IsOfferer = false
transportB, err := NewPCTransport(paramsB)
require.NoError(t, err)
@@ -215,8 +215,8 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) {
params := TransportParams{
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
IsOfferer: true,
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
@@ -224,7 +224,7 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) {
require.NoError(t, err)
paramsB := params
paramsB.Target = livekit.SignalTarget_PUBLISHER
paramsB.IsOfferer = false
transportB, err := NewPCTransport(paramsB)
require.NoError(t, err)
@@ -288,8 +288,8 @@ func TestNegotiationFailed(t *testing.T) {
params := TransportParams{
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &WebRTCConfig{},
IsOfferer: true,
}
transportA, err := NewPCTransport(params)
require.NoError(t, err)
@@ -319,7 +319,6 @@ func TestFilteringCandidates(t *testing.T) {
params := TransportParams{
ParticipantID: "id",
ParticipantIdentity: "identity",
Target: livekit.SignalTarget_PUBLISHER,
Config: &WebRTCConfig{},
EnabledCodecs: []*livekit.Codec{
{Mime: webrtc.MimeTypeOpus},

View File

@@ -80,8 +80,8 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
ParticipantID: params.SID,
ParticipantIdentity: params.Identity,
ProtocolVersion: params.ProtocolVersion,
Target: livekit.SignalTarget_PUBLISHER,
Config: params.Config,
DirectionConfig: params.Config.Publisher,
CongestionControlConfig: params.CongestionControlConfig,
Telemetry: params.Telemetry,
EnabledCodecs: enabledCodecs,
@@ -111,13 +111,15 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
ParticipantID: params.SID,
ParticipantIdentity: params.Identity,
ProtocolVersion: params.ProtocolVersion,
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
DirectionConfig: params.Config.Subscriber,
CongestionControlConfig: params.CongestionControlConfig,
Telemetry: params.Telemetry,
EnabledCodecs: enabledCodecs,
Logger: LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER),
ClientInfo: params.ClientInfo,
IsOfferer: true,
IsSendSide: true,
})
if err != nil {
return nil, err

View File

@@ -141,17 +141,19 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
// Same applies for subscriber transport also
//
c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{
Target: livekit.SignalTarget_SUBSCRIBER,
Config: &conf,
EnabledCodecs: codecs,
Config: &conf,
DirectionConfig: conf.Subscriber,
EnabledCodecs: codecs,
IsOfferer: true,
IsSendSide: true,
})
if err != nil {
return nil, err
}
c.subscriber, err = rtc.NewPCTransport(rtc.TransportParams{
Target: livekit.SignalTarget_PUBLISHER,
Config: &conf,
EnabledCodecs: codecs,
Config: &conf,
DirectionConfig: conf.Publisher,
EnabledCodecs: codecs,
})
if err != nil {
return nil, err