mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 11:19:52 +00:00
3233 lines
93 KiB
Go
3233 lines
93 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rtc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"maps"
|
|
"math/rand"
|
|
"net"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/dtls/v3/pkg/crypto/elliptic"
|
|
"github.com/pion/ice/v4"
|
|
"github.com/pion/interceptor"
|
|
"github.com/pion/interceptor/pkg/cc"
|
|
"github.com/pion/interceptor/pkg/gcc"
|
|
"github.com/pion/interceptor/pkg/twcc"
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/sdp/v3"
|
|
"github.com/pion/webrtc/v4"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/rtc/transport"
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
|
"github.com/livekit/livekit-server/pkg/sfu/bwe"
|
|
"github.com/livekit/livekit-server/pkg/sfu/bwe/remotebwe"
|
|
"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
|
|
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
|
|
sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor"
|
|
"github.com/livekit/livekit-server/pkg/sfu/pacer"
|
|
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
|
|
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
|
|
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
"github.com/livekit/livekit-server/pkg/utils"
|
|
lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor"
|
|
lktwcc "github.com/livekit/mediatransportutil/pkg/twcc"
|
|
"github.com/livekit/protocol/codecs/mime"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/logger/pionlogger"
|
|
lksdp "github.com/livekit/protocol/sdp"
|
|
"github.com/livekit/protocol/utils/mono"
|
|
)
|
|
|
|
const (
	// reserved data channel labels negotiated with clients
	LossyDataChannel     = "_lossy"
	ReliableDataChannel  = "_reliable"
	DataTrackDataChannel = "_data_track"

	// debounce windows used to coalesce negotiation attempts
	fastNegotiationFrequency = 10 * time.Millisecond
	negotiationFrequency     = 150 * time.Millisecond
	// how long to wait for the remote answer before declaring negotiation failed
	negotiationFailedTimeout = 15 * time.Second

	dtlsRetransmissionInterval = 100 * time.Millisecond
	dtlsHandshakeTimeout       = time.Minute

	iceDisconnectedTimeout = 10 * time.Second                          // compatible for ice-lite with firefox client
	iceFailedTimeout       = 5 * time.Second                           // time between disconnected and failed
	iceFailedTimeoutTotal  = iceFailedTimeout + iceDisconnectedTimeout // total time between connecting and failure
	iceKeepaliveInterval   = 2 * time.Second                           // pion's default

	// bounds for the signaling-RTT-scaled TCP ICE connect timeout (see setICEStartedAt)
	minTcpICEConnectTimeout = 5 * time.Second
	maxTcpICEConnectTimeout = 12 * time.Second // js-sdk has a default 15s timeout for first connection, let server detect failure earlier before that

	minConnectTimeoutAfterICE = 10 * time.Second
	maxConnectTimeoutAfterICE = 20 * time.Second // max duration for waiting pc to connect after ICE is connected

	// connections that fail within this window are treated as "short" connections
	shortConnectionThreshold = 90 * time.Second

	dataChannelBufferSize             = 65535
	lossyDataChannelMinBufferedAmount = 8 * 1024
)
|
|
|
|
// Sentinel errors surfaced by PCTransport operations.
var (
	ErrNoICETransport                    = errors.New("no ICE transport")
	ErrIceRestartWithoutLocalSDP         = errors.New("ICE restart without local SDP settled")
	ErrIceRestartOnClosedPeerConnection  = errors.New("ICE restart on closed peer connection")
	ErrNoTransceiver                     = errors.New("no transceiver")
	ErrNoSender                          = errors.New("no sender")
	ErrMidNotFound                       = errors.New("mid not found")
	ErrNotSynchronousLocalCandidatesMode = errors.New("not using synchronous local candidates mode")
	ErrNoRemoteDescription               = errors.New("no remote description")
	ErrNoLocalDescription                = errors.New("no local description")
	ErrInvalidSDPFragment                = errors.New("invalid sdp fragment")
	ErrNoBundleMid                       = errors.New("could not get bundle mid")
	ErrMidMismatch                       = errors.New("media mid does not match bundle mid")
	ErrICECredentialMismatch             = errors.New("ice credential mismatch")
)
|
|
|
|
// -------------------------------------------------------------------------
|
|
|
|
// signal identifies the kind of event processed on the PCTransport
// event queue.
type signal int

const (
	signalICEGatheringComplete signal = iota
	signalLocalICECandidate
	signalRemoteICECandidate
	signalSendOffer
	signalRemoteDescriptionReceived
	signalICERestart
)

// String returns a human-readable name for the signal, or its numeric
// value for unrecognized signals.
func (s signal) String() string {
	switch s {
	case signalICEGatheringComplete:
		return "ICE_GATHERING_COMPLETE"
	case signalLocalICECandidate:
		return "LOCAL_ICE_CANDIDATE"
	case signalRemoteICECandidate:
		return "REMOTE_ICE_CANDIDATE"
	case signalSendOffer:
		return "SEND_OFFER"
	case signalRemoteDescriptionReceived:
		return "REMOTE_DESCRIPTION_RECEIVED"
	case signalICERestart:
		return "ICE_RESTART"
	default:
		// strconv.Itoa avoids fmt's boxing/reflection for the rare unknown value
		return strconv.Itoa(int(s))
	}
}
|
|
|
|
// -------------------------------------------------------
|
|
|
|
type event struct {
|
|
*PCTransport
|
|
signal signal
|
|
data any
|
|
}
|
|
|
|
func (e event) String() string {
|
|
return fmt.Sprintf("PCTransport:Event{signal: %s, data: %+v}", e.signal, e.data)
|
|
}
|
|
|
|
// -------------------------------------------------------
|
|
|
|
type wrappedICECandidatePairLogger struct {
|
|
pair *webrtc.ICECandidatePair
|
|
}
|
|
|
|
func (w wrappedICECandidatePairLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
|
|
if w.pair == nil {
|
|
return nil
|
|
}
|
|
|
|
if w.pair.Local != nil {
|
|
e.AddString("localProtocol", w.pair.Local.Protocol.String())
|
|
e.AddString("localCandidateType", w.pair.Local.Typ.String())
|
|
e.AddString("localAddress", w.pair.Local.Address)
|
|
e.AddUint16("localPort", w.pair.Local.Port)
|
|
}
|
|
if w.pair.Remote != nil {
|
|
e.AddString("remoteProtocol", w.pair.Remote.Protocol.String())
|
|
e.AddString("remoteCandidateType", w.pair.Remote.Typ.String())
|
|
e.AddString("remoteAddress", MaybeTruncateIP(w.pair.Remote.Address))
|
|
e.AddUint16("remotePort", w.pair.Remote.Port)
|
|
if w.pair.Remote.RelatedAddress != "" {
|
|
e.AddString("relatedAddress", MaybeTruncateIP(w.pair.Remote.RelatedAddress))
|
|
e.AddUint16("relatedPort", w.pair.Remote.RelatedPort)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// -------------------------------------------------------------------
|
|
|
|
// trackDescription records the mid and RTP sender a track had in a
// previous offer SDP (see PCTransport.previousTrackDescription).
type trackDescription struct {
	mid    string
	sender *webrtc.RTPSender
}
|
|
|
|
// PCTransport is a wrapper around PeerConnection, with some helper methods
type PCTransport struct {
	params       TransportParams
	pc           *webrtc.PeerConnection
	iceTransport *webrtc.ICETransport
	me           *webrtc.MediaEngine

	// guards the connection timing, timer and data channel state below
	lock sync.RWMutex

	firstOfferReceived      bool
	firstOfferNoDataChannel bool
	reliableDC              *datachannel.DataChannelWriter[*webrtc.DataChannel]
	reliableDCOpened        bool
	lossyDC                 *datachannel.DataChannelWriter[*webrtc.DataChannel]
	lossyDCOpened           bool
	dataTrackDC             *datachannel.DataChannelWriter[*webrtc.DataChannel]
	unlabeledDataChannels   []*datachannel.DataChannelWriter[*webrtc.DataChannel]

	// connection milestones; zero value means "not reached yet"
	iceStartedAt               time.Time
	iceConnectedAt             time.Time
	firstConnectedAt           time.Time
	connectedAt                time.Time
	tcpICETimer                *time.Timer
	connectAfterICETimer       *time.Timer // timer to wait for pc to connect after ice connected
	resetShortConnOnICERestart atomic.Bool
	signalingRTT               atomic.Uint32 // milliseconds

	debouncedNegotiate *sfuutils.Debouncer
	debouncePending    bool
	lastNegotiate      time.Time

	onNegotiationStateChanged func(state transport.NegotiationState)

	rtxInfoExtractorFactory *sfuinterceptor.RTXInfoExtractorFactory

	// stream allocator for subscriber PC
	streamAllocator *streamallocator.StreamAllocator

	// only for subscriber PC
	bwe   bwe.BWE
	pacer pacer.Pacer

	// transceivers (senders) waiting for SetRemoteDescription (offer) to happen before
	// SetCodecPreferences can be invoked on them.
	// Pion adapts codecs/payload types from remote description.
	// If SetCodecPreferences are done before the remote description is processed,
	// it is possible that the transceiver gets payload types from media engine.
	// Subsequently if the peer sends an offer with different payload type for the
	// same codec, there could be two payload types for the same codec and the wrong
	// one could be used in the forwarding path. So, wait for `SetRemoteDescription`
	// to happen so that remote side payload types are adapted.
	sendersPendingConfigMu sync.Mutex
	sendersPendingConfig   []configureSenderParams

	previousAnswer *webrtc.SessionDescription
	// track id -> description map in previous offer sdp
	previousTrackDescription map[string]*trackDescription
	canReuseTransceiver      bool

	preferTCP atomic.Bool
	isClosed  atomic.Bool

	// used to check for offer/answer pairing,
	// i. e. every offer should have an answer before another offer can be sent
	localOfferId   atomic.Uint32
	remoteAnswerId atomic.Uint32

	remoteOfferId atomic.Uint32
	localAnswerId atomic.Uint32

	// serialized event loop for signalling/ICE events
	eventsQueue *utils.TypedOpsQueue[event]

	connectionDetails      *types.ICEConnectionDetails
	selectedPair           atomic.Pointer[webrtc.ICECandidatePair]
	// candidate pair stats snapshotted shortly before a possible ICE failure
	// (the ICE agent clears its checklist on failure)
	mayFailedICEStats      []iceCandidatePairStats
	mayFailedICEStatsTimer *time.Timer

	numOutstandingAudios uint32
	numRequestSentAudios uint32
	numOutstandingVideos uint32
	numRequestSentVideos uint32

	// the following should be accessed only in event processing go routine
	cacheLocalCandidates      bool
	cachedLocalCandidates     []*webrtc.ICECandidate
	pendingRemoteCandidates   []*webrtc.ICECandidateInit
	restartAfterGathering     bool
	restartAtNextOffer        bool
	negotiationState          transport.NegotiationState
	negotiateCounter          atomic.Int32
	signalStateCheckTimer     *time.Timer
	currentOfferIceCredential string // ice user:pwd, for publish side ice restart checking
	pendingRestartIceOffer    *webrtc.SessionDescription
}
|
|
|
|
// TransportParams configures a PCTransport.
type TransportParams struct {
	Handler                 transport.Handler
	ProtocolVersion         types.ProtocolVersion
	Config                  *WebRTCConfig
	Twcc                    *lktwcc.Responder
	DirectionConfig         DirectionConfig
	CongestionControlConfig config.CongestionControlConfig
	EnabledCodecs           []*livekit.Codec
	Logger                  logger.Logger
	Transport               livekit.SignalTarget
	SimTracks               map[uint32]sfuinterceptor.SimulcastTrackInfo
	ClientInfo              ClientInfo
	// IsOfferer: this side initiates offers (affects media engine setup and interceptors)
	IsOfferer bool
	// IsSendSide: enables congestion control (BWE/pacer/stream allocator) wiring
	IsSendSide               bool
	AllowPlayoutDelay        bool
	UseOneShotSignallingMode bool
	FireOnTrackBySdp         bool

	DataChannelMaxBufferedAmount  uint64
	DatachannelSlowThreshold      int
	DatachannelLossyTargetLatency time.Duration

	// for development test
	DatachannelMaxReceiverBufferSize int

	EnableDataTracks bool
}
|
|
|
|
// newPeerConnection builds a pion PeerConnection with the media engine,
// setting engine and interceptor registry required by this transport.
// It returns the peer connection, its media engine and the RTX info
// extractor factory (registered in the interceptor chain) so callers can
// feed it stream info later. onBandwidthEstimator, if non-nil, is invoked
// when the gcc send-side BWE interceptor creates an estimator.
func newPeerConnection(
	params TransportParams,
	onBandwidthEstimator func(estimator cc.BandwidthEstimator),
) (*webrtc.PeerConnection, *webrtc.MediaEngine, *sfuinterceptor.RTXInfoExtractorFactory, error) {
	directionConfig := params.DirectionConfig
	if params.AllowPlayoutDelay {
		directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, pd.PlayoutDelayURI)
	}

	// Some of the browser clients do not handle H.264 High Profile in signalling properly.
	// They still decode if the actual stream is H.264 High Profile, but do not handle it well in signalling.
	// So, disable H.264 High Profile for SUBSCRIBER peer connection to ensure it is not offered.
	me, err := createMediaEngine(params.EnabledCodecs, directionConfig, params.IsOfferer)
	if err != nil {
		return nil, nil, nil, err
	}

	se := params.Config.SettingEngine
	se.DisableMediaEngineCopy(true)
	// simulcast layer disable/enable signalled via signalling channel,
	// so disable rid pause in SDP
	se.SetIgnoreRidPauseForRecv(true)

	// Change elliptic curve to improve connectivity
	// https://github.com/pion/dtls/pull/474
	se.SetDTLSEllipticCurves(elliptic.X25519, elliptic.P384, elliptic.P256)

	// Disable close by dtls to avoid peerconnection close too early in migration
	// https://github.com/pion/webrtc/pull/2961
	se.DisableCloseByDTLS(true)

	se.DetachDataChannels()
	if params.DatachannelSlowThreshold > 0 {
		se.EnableDataChannelBlockWrite(true)
	}
	if params.DatachannelMaxReceiverBufferSize > 0 {
		se.SetSCTPMaxReceiveBufferSize(uint32(params.DatachannelMaxReceiverBufferSize))
	}
	if params.FireOnTrackBySdp {
		se.SetFireOnTrackBeforeFirstRTP(true)
	}

	if params.ClientInfo.SupportsSctpZeroChecksum() {
		se.EnableSCTPZeroChecksum(true)
	}

	//
	// Disable SRTP replay protection (https://datatracker.ietf.org/doc/html/rfc3711#page-15).
	// Needed due to lack of RTX stream support in Pion.
	//
	// When clients probe for bandwidth, there are several possible approaches
	//   1. Use padding packet (Chrome uses this)
	//   2. Use an older packet (Firefox uses this)
	// Typically, these are sent over the RTX stream and hence SRTP replay protection will not
	// trigger. As Pion does not support RTX, when firefox uses older packet for probing, they
	// trigger the replay protection.
	//
	// That results in two issues
	//   - Firefox bandwidth probing is not successful
	//   - Pion runs out of read buffer capacity - this potentially looks like a Pion issue
	//
	// NOTE: It is not required to disable RTCP replay protection, but doing it to be symmetric.
	//
	se.DisableSRTPReplayProtection(true)
	se.DisableSRTCPReplayProtection(true)
	if !params.ProtocolVersion.SupportsICELite() || !params.ClientInfo.SupportsPrflxOverRelay() {
		// if client don't support prflx over relay which is only Firefox, disable ICE Lite to ensure that
		// aggressive nomination is handled properly. Firefox does aggressive nomination even if peer is
		// ICE Lite (see comment as to historical reasons: https://github.com/pion/ice/pull/739#issuecomment-2452245066).
		// pion/ice (as of v2.3.37) will accept all use-candidate switches when in ICE Lite mode.
		// That combined with aggressive nomination from Firefox could potentially lead to the two ends
		// ending up with different candidates.
		// As Firefox does not support migration, ICE Lite can be disabled.
		se.SetLite(false)
	}
	se.SetDTLSRetransmissionInterval(dtlsRetransmissionInterval)
	se.SetDTLSConnectContextMaker(func() (context.Context, func()) {
		return context.WithTimeout(context.Background(), dtlsHandshakeTimeout)
	})
	se.SetICETimeouts(iceDisconnectedTimeout, iceFailedTimeout, iceKeepaliveInterval)

	// if client don't support prflx over relay, we should not expose private address to it, use single external ip as host candidate
	if !params.ClientInfo.SupportsPrflxOverRelay() && len(params.Config.NAT1To1IPs) > 0 {
		var nat1to1Ips []string
		var includeIps []string
		for _, mapping := range params.Config.NAT1To1IPs {
			if ips := strings.Split(mapping, "/"); len(ips) == 2 {
				if ips[0] != ips[1] {
					nat1to1Ips = append(nat1to1Ips, mapping)
					includeIps = append(includeIps, ips[1])
				}
			}
		}
		if len(nat1to1Ips) > 0 {
			params.Logger.Infow("client doesn't support prflx over relay, use external ip only as host candidate", "ips", nat1to1Ips)
			se.SetNAT1To1IPs(nat1to1Ips, webrtc.ICECandidateTypeHost)
			se.SetIPFilter(func(ip net.IP) bool {
				// only filter IPv4; IPv6 candidates pass through
				if ip.To4() == nil {
					return true
				}
				ipstr := ip.String()
				return slices.Contains(includeIps, ipstr)
			})
		}
	}

	lf := pionlogger.NewLoggerFactory(params.Logger)
	if lf != nil {
		se.LoggerFactory = lf
	}

	ir := &interceptor.Registry{}
	if params.IsSendSide {
		if params.CongestionControlConfig.UseSendSideBWEInterceptor && !params.CongestionControlConfig.UseSendSideBWE {
			params.Logger.Infow("using send side BWE - interceptor")
			gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
				return gcc.NewSendSideBWE(
					gcc.SendSideBWEInitialBitrate(1*1000*1000),
					gcc.SendSideBWEPacer(gcc.NewNoOpPacer()),
				)
			})
			// best effort: interceptor setup errors are silently skipped
			if err == nil {
				gf.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
					if onBandwidthEstimator != nil {
						onBandwidthEstimator(estimator)
					}
				})
				ir.Add(gf)

				tf, err := twcc.NewHeaderExtensionInterceptor()
				if err == nil {
					ir.Add(tf)
				}
			}
		}
	}
	if !params.IsOfferer {
		// sfu only use interceptor to send XR but don't read response from it (use buffer instead),
		// so use a empty callback here
		ir.Add(lkinterceptor.NewRTTFromXRFactory(func(rtt uint32) {}))
	}
	if len(params.SimTracks) > 0 {
		f, err := sfuinterceptor.NewUnhandleSimulcastInterceptorFactory(sfuinterceptor.UnhandleSimulcastTracks(params.Logger, params.SimTracks))
		if err != nil {
			params.Logger.Warnw("NewUnhandleSimulcastInterceptorFactory failed", err)
		} else {
			ir.Add(f)
		}
	}

	// wires the TWCC responder into the buffer of any video stream whose
	// codec negotiated transport-cc feedback (RTX streams always qualify)
	setTWCCForVideo := func(info *interceptor.StreamInfo) {
		if !mime.IsMimeTypeStringVideo(info.MimeType) {
			return
		}
		// rtx stream don't have rtcp feedback, always set twcc for rtx stream
		twccFb := mime.GetMimeTypeCodec(info.MimeType) == mime.MimeTypeCodecRTX
		if !twccFb {
			for _, fb := range info.RTCPFeedback {
				if fb.Type == webrtc.TypeRTCPFBTransportCC {
					twccFb = true
					break
				}
			}
		}
		if !twccFb {
			return
		}

		twccExtID := sfuutils.GetHeaderExtensionID(info.RTPHeaderExtensions, webrtc.RTPHeaderExtensionCapability{URI: sdp.TransportCCURI})
		if twccExtID != 0 {
			if buffer := params.Config.BufferFactory.GetBuffer(info.SSRC); buffer != nil {
				params.Logger.Debugw(
					"set twcc and ext id",
					"ssrc", info.SSRC,
					"isRTX", mime.GetMimeTypeCodec(info.MimeType) == mime.MimeTypeCodecRTX,
					"twccExtID", twccExtID,
				)
				buffer.SetTWCCAndExtID(params.Twcc, uint8(twccExtID))
			} else {
				params.Logger.Warnw("failed to get buffer for stream", nil, "ssrc", info.SSRC)
			}
		}
	}
	rtxInfoExtractorFactory := sfuinterceptor.NewRTXInfoExtractorFactory(
		setTWCCForVideo,
		func(repair, base uint32, rsid string) {
			params.Logger.Debugw("rtx pair found from extension", "repair", repair, "base", base, "rsid", rsid)
			params.Config.BufferFactory.SetRTXPair(repair, base, rsid)
		},
		params.Logger,
	)
	// put rtx interceptor behind unhandle simulcast interceptor so it can get the correct mid & rid
	ir.Add(rtxInfoExtractorFactory)

	api := webrtc.NewAPI(
		webrtc.WithMediaEngine(me),
		webrtc.WithSettingEngine(se),
		webrtc.WithInterceptorRegistry(ir),
	)
	pc, err := api.NewPeerConnection(params.Config.Configuration)
	return pc, me, rtxInfoExtractorFactory, err
}
|
|
|
|
// NewPCTransport creates a PCTransport: it builds the underlying peer
// connection and, for send-side transports, wires up the bandwidth
// estimator, pacer and stream allocator before starting the event queue.
func NewPCTransport(params TransportParams) (*PCTransport, error) {
	if params.Logger == nil {
		params.Logger = logger.GetLogger()
	}
	t := &PCTransport{
		params:             params,
		debouncedNegotiate: sfuutils.NewDebouncer(negotiationFrequency),
		negotiationState:   transport.NegotiationStateNone,
		eventsQueue: utils.NewTypedOpsQueue[event](utils.OpsQueueParams{
			Name:    "transport",
			MinSize: 64,
			Logger:  params.Logger,
		}),
		previousTrackDescription: make(map[string]*trackDescription),
		canReuseTransceiver:      true,
		connectionDetails:        types.NewICEConnectionDetails(params.Transport, params.Logger),
		lastNegotiate:            time.Now(),
	}
	// start offer ids at a random non-zero value (range [1, 256])
	t.localOfferId.Store(uint32(rand.Intn(1<<8) + 1))

	bwe, err := t.createPeerConnection()
	if err != nil {
		return nil, err
	}

	if params.IsSendSide {
		if params.CongestionControlConfig.UseSendSideBWE {
			params.Logger.Infow("using send side BWE", "pacerBehavior", params.CongestionControlConfig.SendSideBWEPacer)
			t.bwe = sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{
				Config: params.CongestionControlConfig.SendSideBWE,
				Logger: params.Logger,
			})
			switch pacer.PacerBehavior(params.CongestionControlConfig.SendSideBWEPacer) {
			case pacer.PacerBehaviorPassThrough:
				t.pacer = pacer.NewPassThrough(params.Logger, t.bwe)
			case pacer.PacerBehaviorNoQueue:
				t.pacer = pacer.NewNoQueue(params.Logger, t.bwe)
			default:
				// unrecognized behavior falls back to no-queue pacing
				t.pacer = pacer.NewNoQueue(params.Logger, t.bwe)
			}
		} else {
			t.bwe = remotebwe.NewRemoteBWE(remotebwe.RemoteBWEParams{
				Config: params.CongestionControlConfig.RemoteBWE,
				Logger: params.Logger,
			})
			t.pacer = pacer.NewPassThrough(params.Logger, nil)
		}

		t.streamAllocator = streamallocator.NewStreamAllocator(streamallocator.StreamAllocatorParams{
			Config:    params.CongestionControlConfig.StreamAllocator,
			BWE:       t.bwe,
			Pacer:     t.pacer,
			RTTGetter: t.GetRTT,
			Logger:    params.Logger.WithComponent(utils.ComponentCongestionControl),
		}, params.CongestionControlConfig.Enabled, params.CongestionControlConfig.AllowPause)
		t.streamAllocator.OnStreamStateChange(params.Handler.OnStreamStateChange)
		t.streamAllocator.Start()

		// bwe is non-nil only when the gcc interceptor created an estimator
		// (see newPeerConnection / onBandwidthEstimator)
		if bwe != nil {
			t.streamAllocator.SetSendSideBWEInterceptor(bwe)
		}
	}

	t.eventsQueue.Start()

	return t, nil
}
|
|
|
|
// createPeerConnection constructs the underlying pion peer connection,
// registers all state/track/data-channel callbacks and captures the ICE
// transport. It returns the gcc bandwidth estimator if the interceptor
// chain produced one (may be nil).
func (t *PCTransport) createPeerConnection() (cc.BandwidthEstimator, error) {
	var bwe cc.BandwidthEstimator
	pc, me, rtxInfoExtractorFactory, err := newPeerConnection(t.params, func(estimator cc.BandwidthEstimator) {
		bwe = estimator
	})
	if err != nil {
		return bwe, err
	}

	t.pc = pc
	if !t.params.UseOneShotSignallingMode {
		// one shot signalling mode gathers all candidates and sends in answer
		t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange)
		t.pc.OnICECandidate(t.onICECandidateTrickle)
	}
	t.pc.OnICEConnectionStateChange(t.onICEConnectionStateChange)
	t.pc.OnConnectionStateChange(t.onPeerConnectionStateChange)

	t.pc.OnDataChannel(t.onDataChannel)
	t.pc.OnTrack(t.params.Handler.OnTrack)

	t.iceTransport = t.pc.SCTP().Transport().ICETransport()
	if t.iceTransport == nil {
		return bwe, ErrNoICETransport
	}
	t.iceTransport.OnSelectedCandidatePairChange(func(pair *webrtc.ICECandidatePair) {
		t.params.Logger.Debugw("selected ICE candidate pair changed", "pair", wrappedICECandidatePairLogger{pair})
		t.connectionDetails.SetSelectedPair(pair)
		// a previously selected pair means this is a reconnect/pair switch
		existingPair := t.selectedPair.Load()
		if existingPair != nil {
			t.params.Logger.Infow(
				"ice reconnected or switched pair",
				"existingPair", wrappedICECandidatePairLogger{existingPair},
				"newPair", wrappedICECandidatePairLogger{pair})
		}
		t.selectedPair.Store(pair)
	})

	t.me = me

	t.rtxInfoExtractorFactory = rtxInfoExtractorFactory
	return bwe, nil
}
|
|
|
|
// RTPStreamPublished registers the mid/rid of a published SSRC with the
// RTX info extractor factory.
func (t *PCTransport) RTPStreamPublished(ssrc uint32, mid, rid string) {
	t.rtxInfoExtractorFactory.SetStreamInfo(ssrc, mid, rid, "")
}
|
|
|
|
// GetPacer returns the pacer (set only for subscriber/send-side transports;
// may be nil otherwise).
func (t *PCTransport) GetPacer() pacer.Pacer {
	return t.pacer
}
|
|
|
|
// SetSignalingRTT records the signalling round trip time in milliseconds;
// it is used to scale the TCP ICE connect timeout (see setICEStartedAt).
func (t *PCTransport) SetSignalingRTT(rtt uint32) {
	t.signalingRTT.Store(rtt)
}
|
|
|
|
// setICEStartedAt records the time ICE checking started (first call only)
// and arms two timers: one to snapshot candidate pair stats just before a
// potential ICE failure, and - for TCP-preferred connections - a connect
// timeout scaled from the signalling RTT.
func (t *PCTransport) setICEStartedAt(at time.Time) {
	t.lock.Lock()
	if t.iceStartedAt.IsZero() {
		t.iceStartedAt = at

		// checklist of ice agent will be cleared on ice failed, get stats before that
		t.mayFailedICEStatsTimer = time.AfterFunc(iceFailedTimeoutTotal-time.Second, t.logMayFailedICEStats)

		// set failure timer for tcp ice connection based on signaling RTT
		if t.preferTCP.Load() {
			signalingRTT := t.signalingRTT.Load()
			// ignore implausibly large RTTs (>= 1s)
			if signalingRTT < 1000 {
				// 8x RTT, clamped to [minTcpICEConnectTimeout, maxTcpICEConnectTimeout]
				tcpICETimeout := time.Duration(signalingRTT*8) * time.Millisecond
				if tcpICETimeout < minTcpICEConnectTimeout {
					tcpICETimeout = minTcpICEConnectTimeout
				} else if tcpICETimeout > maxTcpICEConnectTimeout {
					tcpICETimeout = maxTcpICEConnectTimeout
				}
				t.params.Logger.Debugw("set TCP ICE connect timer", "timeout", tcpICETimeout, "signalRTT", signalingRTT)
				t.tcpICETimer = time.AfterFunc(tcpICETimeout, func() {
					// still checking when the timer fires => treat as a (short) failure
					if t.pc.ICEConnectionState() == webrtc.ICEConnectionStateChecking {
						t.params.Logger.Infow("TCP ICE connect timeout", "timeout", tcpICETimeout, "signalRTT", signalingRTT)
						t.logMayFailedICEStats()
						t.handleConnectionFailed(true)
					}
				})
			}
		}
	}
	t.lock.Unlock()
}
|
|
|
|
// setICEConnectedAt records the time ICE first connected, arms a timer
// that fails the connection if the peer connection is not fully
// established soon after, and cancels the TCP-ICE / may-fail-stats timers.
func (t *PCTransport) setICEConnectedAt(at time.Time) {
	t.lock.Lock()
	if t.iceConnectedAt.IsZero() {
		//
		// Record initial connection time.
		// This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`.
		//
		t.iceConnectedAt = at

		// set failure timer for dtls handshake
		iceDuration := at.Sub(t.iceStartedAt)
		// 3x the time ICE took, clamped to [minConnectTimeoutAfterICE, maxConnectTimeoutAfterICE]
		connTimeoutAfterICE := min(max(minConnectTimeoutAfterICE, 3*iceDuration), maxConnectTimeoutAfterICE)
		t.params.Logger.Debugw("setting connection timer after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration)
		t.connectAfterICETimer = time.AfterFunc(connTimeoutAfterICE, func() {
			state := t.pc.ConnectionState()
			// if pc is still checking or connected but not fully established after timeout, then fire connection fail
			if state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed && !t.isFullyEstablished() {
				t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration)
				t.handleConnectionFailed(false)
			}
		})

		// clear tcp ice connect timer
		if t.tcpICETimer != nil {
			t.tcpICETimer.Stop()
			t.tcpICETimer = nil
		}
	}

	// ICE connected: the pre-failure stats snapshot is no longer needed
	if t.mayFailedICEStatsTimer != nil {
		t.mayFailedICEStatsTimer.Stop()
		t.mayFailedICEStatsTimer = nil
	}
	t.mayFailedICEStats = nil
	t.lock.Unlock()
}
|
|
|
|
func (t *PCTransport) logMayFailedICEStats() {
|
|
if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
|
|
return
|
|
}
|
|
|
|
var candidatePairStats []webrtc.ICECandidatePairStats
|
|
pairStats := t.pc.GetStats()
|
|
candidateStats := make(map[string]webrtc.ICECandidateStats)
|
|
for _, stat := range pairStats {
|
|
switch stat := stat.(type) {
|
|
case webrtc.ICECandidatePairStats:
|
|
candidatePairStats = append(candidatePairStats, stat)
|
|
case webrtc.ICECandidateStats:
|
|
candidateStats[stat.ID] = stat
|
|
}
|
|
}
|
|
|
|
iceStats := make([]iceCandidatePairStats, 0, len(candidatePairStats))
|
|
for _, pairStat := range candidatePairStats {
|
|
iceStat := iceCandidatePairStats{ICECandidatePairStats: pairStat}
|
|
if local, ok := candidateStats[pairStat.LocalCandidateID]; ok {
|
|
iceStat.local = local
|
|
}
|
|
if remote, ok := candidateStats[pairStat.RemoteCandidateID]; ok {
|
|
remote.IP = MaybeTruncateIP(remote.IP)
|
|
iceStat.remote = remote
|
|
}
|
|
iceStats = append(iceStats, iceStat)
|
|
}
|
|
|
|
t.lock.Lock()
|
|
t.mayFailedICEStats = iceStats
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) resetShortConn() {
|
|
t.params.Logger.Infow("resetting short connection on ICE restart")
|
|
t.lock.Lock()
|
|
t.iceStartedAt = time.Time{}
|
|
t.iceConnectedAt = time.Time{}
|
|
t.connectedAt = time.Time{}
|
|
if t.connectAfterICETimer != nil {
|
|
t.connectAfterICETimer.Stop()
|
|
t.connectAfterICETimer = nil
|
|
}
|
|
if t.tcpICETimer != nil {
|
|
t.tcpICETimer.Stop()
|
|
t.tcpICETimer = nil
|
|
}
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) IsShortConnection(at time.Time) (bool, time.Duration) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
if t.iceConnectedAt.IsZero() {
|
|
return false, 0
|
|
}
|
|
|
|
duration := at.Sub(t.iceConnectedAt)
|
|
return duration < shortConnectionThreshold, duration
|
|
}
|
|
|
|
func (t *PCTransport) setConnectedAt(at time.Time) bool {
|
|
t.lock.Lock()
|
|
t.connectedAt = at
|
|
if !t.firstConnectedAt.IsZero() {
|
|
t.lock.Unlock()
|
|
return false
|
|
}
|
|
|
|
t.firstConnectedAt = at
|
|
prometheus.RecordServiceOperationSuccess("peer_connection")
|
|
t.lock.Unlock()
|
|
return true
|
|
}
|
|
|
|
func (t *PCTransport) onICEGatheringStateChange(state webrtc.ICEGatheringState) {
|
|
t.params.Logger.Debugw("ice gathering state change", "state", state.String())
|
|
if state != webrtc.ICEGatheringStateComplete {
|
|
return
|
|
}
|
|
|
|
t.postEvent(event{
|
|
signal: signalICEGatheringComplete,
|
|
})
|
|
}
|
|
|
|
// onICECandidateTrickle queues a locally gathered ICE candidate for
// processing on the event loop (c is nil at end of gathering, per pion's
// OnICECandidate contract — handled downstream).
func (t *PCTransport) onICECandidateTrickle(c *webrtc.ICECandidate) {
	t.postEvent(event{
		signal: signalLocalICECandidate,
		data:   c,
	})
}
|
|
|
|
func (t *PCTransport) handleConnectionFailed(forceShortConn bool) {
|
|
isShort := forceShortConn
|
|
if !isShort {
|
|
var duration time.Duration
|
|
isShort, duration = t.IsShortConnection(time.Now())
|
|
if isShort {
|
|
t.params.Logger.Debugw("short ICE connection", "pair", wrappedICECandidatePairLogger{t.selectedPair.Load()}, "duration", duration)
|
|
}
|
|
}
|
|
|
|
t.params.Handler.OnFailed(isShort, t.GetICEConnectionInfo())
|
|
}
|
|
|
|
func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState) {
|
|
t.params.Logger.Debugw("ice connection state change", "state", state.String())
|
|
switch state {
|
|
case webrtc.ICEConnectionStateConnected:
|
|
t.setICEConnectedAt(time.Now())
|
|
|
|
case webrtc.ICEConnectionStateChecking:
|
|
t.setICEStartedAt(time.Now())
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionState) {
|
|
t.params.Logger.Debugw("peer connection state change", "state", state.String())
|
|
switch state {
|
|
case webrtc.PeerConnectionStateConnected:
|
|
t.clearConnTimer()
|
|
isInitialConnection := t.setConnectedAt(time.Now())
|
|
if isInitialConnection {
|
|
t.params.Handler.OnInitialConnected()
|
|
|
|
t.maybeNotifyFullyEstablished()
|
|
}
|
|
case webrtc.PeerConnectionStateFailed:
|
|
t.clearConnTimer()
|
|
t.handleConnectionFailed(false)
|
|
}
|
|
}
|
|
|
|
// onDataChannel is invoked when the remote peer creates a data channel.
// Once the channel opens it is detached for raw (non-callback) I/O, a writer
// is recorded on the transport keyed by the channel label, and a reader
// goroutine forwards inbound messages to the handler callback matching the
// label (reliable, lossy, data-track, or unlabeled).
func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
	dc.OnOpen(func() {
		t.params.Logger.Debugw(dc.Label() + " data channel open")
		var kind livekit.DataPacket_Kind
		var isDataTrack bool
		var isUnlabeled bool
		switch dc.Label() {
		case ReliableDataChannel:
			kind = livekit.DataPacket_RELIABLE

		case LossyDataChannel:
			kind = livekit.DataPacket_LOSSY

		case DataTrackDataChannel:
			isDataTrack = true

		default:
			// unknown label: still usable, routed to the unlabeled handler
			t.params.Logger.Infow("unlabeled datachannel added", "label", dc.Label())
			isUnlabeled = true
		}

		// detach to use the raw data channel API (direct read/write)
		rawDC, err := dc.DetachWithDeadline()
		if err != nil {
			t.params.Logger.Errorw("failed to detach data channel", err, "label", dc.Label())
			return
		}

		isHandled := true
		t.lock.Lock()
		switch {
		case isUnlabeled:
			t.unlabeledDataChannels = append(
				t.unlabeledDataChannels,
				datachannel.NewDataChannelWriterReliable(dc, rawDC, t.params.DatachannelSlowThreshold),
			)

		case isDataTrack:
			if !t.params.EnableDataTracks {
				t.params.Logger.Debugw("data tracks not enabled")
				isHandled = false
			} else {
				// replace any previous writer for this label
				if t.dataTrackDC != nil {
					t.dataTrackDC.Close()
				}
				t.dataTrackDC = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, 0, 0)
			}

		case kind == livekit.DataPacket_RELIABLE:
			if t.reliableDC != nil {
				t.reliableDC.Close()
			}
			t.reliableDC = datachannel.NewDataChannelWriterReliable(dc, rawDC, t.params.DatachannelSlowThreshold)
			t.reliableDCOpened = true

		case kind == livekit.DataPacket_LOSSY:
			if t.lossyDC != nil {
				t.lossyDC.Close()
			}
			t.lossyDC = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, t.params.DatachannelLossyTargetLatency, uint64(lossyDataChannelMinBufferedAmount))
			t.lossyDCOpened = true
		}
		t.lock.Unlock()

		if !isHandled {
			rawDC.Close()
			return
		}

		// reader loop: pump inbound messages until the channel closes
		go func() {
			defer rawDC.Close()
			buffer := make([]byte, dataChannelBufferSize)
			for {
				n, _, err := rawDC.ReadDataChannel(buffer)
				if err != nil {
					// EOF / closed-state errors are expected during teardown
					if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
						t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
					}
					return
				}

				switch {
				case isUnlabeled:
					t.params.Handler.OnDataMessageUnlabeled(buffer[:n])

				case isDataTrack:
					t.params.Handler.OnDataTrackMessage(buffer[:n], mono.UnixNano())

				default:
					t.params.Handler.OnDataMessage(kind, buffer[:n])
				}
			}
		}()

		t.maybeNotifyFullyEstablished()
	})
}
|
|
|
|
func (t *PCTransport) maybeNotifyFullyEstablished() {
|
|
if t.isFullyEstablished() {
|
|
t.params.Handler.OnFullyEstablished()
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) isFullyEstablished() bool {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
dataChannelReady := t.params.UseOneShotSignallingMode || t.firstOfferNoDataChannel || (t.reliableDCOpened && t.lossyDCOpened)
|
|
|
|
return dataChannelReady && !t.connectedAt.IsZero()
|
|
}
|
|
|
|
// SetPreferTCP records whether TCP ICE candidates should be preferred for
// this transport (read elsewhere when filtering/ordering candidates).
func (t *PCTransport) SetPreferTCP(preferTCP bool) {
	t.preferTCP.Store(preferTCP)
}
|
|
|
|
func (t *PCTransport) AddICECandidate(candidate webrtc.ICECandidateInit) {
|
|
t.postEvent(event{
|
|
signal: signalRemoteICECandidate,
|
|
data: &candidate,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) queueOrConfigureSender(
|
|
transceiver *webrtc.RTPTransceiver,
|
|
enabledCodecs []*livekit.Codec,
|
|
rtcpFeedbackConfig RTCPFeedbackConfig,
|
|
enableAudioStereo bool,
|
|
enableAudioNACK bool,
|
|
) {
|
|
params := configureSenderParams{
|
|
transceiver,
|
|
enabledCodecs,
|
|
rtcpFeedbackConfig,
|
|
!t.params.IsOfferer,
|
|
enableAudioStereo,
|
|
enableAudioNACK,
|
|
}
|
|
if !t.params.IsOfferer {
|
|
t.sendersPendingConfigMu.Lock()
|
|
t.sendersPendingConfig = append(t.sendersPendingConfig, params)
|
|
t.sendersPendingConfigMu.Unlock()
|
|
return
|
|
}
|
|
|
|
configureSender(params)
|
|
}
|
|
|
|
func (t *PCTransport) processSendersPendingConfig() {
|
|
t.sendersPendingConfigMu.Lock()
|
|
pending := t.sendersPendingConfig
|
|
t.sendersPendingConfig = nil
|
|
t.sendersPendingConfigMu.Unlock()
|
|
|
|
var unprocessed []configureSenderParams
|
|
for _, p := range pending {
|
|
if p.transceiver.Mid() == "" {
|
|
unprocessed = append(unprocessed, p)
|
|
continue
|
|
}
|
|
|
|
configureSender(p)
|
|
}
|
|
|
|
if len(unprocessed) != 0 {
|
|
t.sendersPendingConfigMu.Lock()
|
|
t.sendersPendingConfig = append(t.sendersPendingConfig, unprocessed...)
|
|
t.sendersPendingConfigMu.Unlock()
|
|
}
|
|
}
|
|
|
|
// AddTrack attaches a local track to the peer connection, preferring to reuse
// the transceiver/mid recorded for this track before a migration so the client
// sees a stable mid. Falls back to creating a new transceiver when reuse is
// not possible, and queues/performs sender configuration for the result.
func (t *PCTransport) AddTrack(
	trackLocal webrtc.TrackLocal,
	params types.AddTrackParams,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
) (sender *webrtc.RTPSender, transceiver *webrtc.RTPTransceiver, err error) {
	t.lock.Lock()
	canReuse := t.canReuseTransceiver
	td, ok := t.previousTrackDescription[trackLocal.ID()]
	if ok {
		// consume the recorded description; it applies to this add only
		delete(t.previousTrackDescription, trackLocal.ID())
	}
	t.lock.Unlock()

	// keep track use same mid after migration if possible
	if td != nil && td.sender != nil {
		for _, tr := range t.pc.GetTransceivers() {
			if tr.Mid() == td.mid {
				return td.sender, tr, tr.SetSender(td.sender, trackLocal)
			}
		}
	}

	// if never negotiated with client, can't reuse transceiver for track not subscribed before migration
	if !canReuse {
		return t.AddTransceiverFromTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig)
	}

	sender, err = t.pc.AddTrack(trackLocal)
	if err != nil {
		return
	}

	// locate the transceiver that AddTrack bound the sender to
	for _, tr := range t.pc.GetTransceivers() {
		if tr.Sender() == sender {
			transceiver = tr
			break
		}
	}

	if transceiver == nil {
		err = ErrNoTransceiver
		return
	}

	// audio NACK is enabled only when RED is unavailable or unsupported
	t.queueOrConfigureSender(
		transceiver,
		enabledCodecs,
		rtcpFeedbackConfig,
		params.Stereo,
		!params.Red || !t.params.ClientInfo.SupportsAudioRED(),
	)

	t.adjustNumOutstandingMedia(transceiver)
	return
}
|
|
|
|
func (t *PCTransport) AddTransceiverFromTrack(
|
|
trackLocal webrtc.TrackLocal,
|
|
params types.AddTrackParams,
|
|
enabledCodecs []*livekit.Codec,
|
|
rtcpFeedbackConfig RTCPFeedbackConfig,
|
|
) (sender *webrtc.RTPSender, transceiver *webrtc.RTPTransceiver, err error) {
|
|
transceiver, err = t.pc.AddTransceiverFromTrack(trackLocal)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
sender = transceiver.Sender()
|
|
if sender == nil {
|
|
err = ErrNoSender
|
|
return
|
|
}
|
|
|
|
t.queueOrConfigureSender(
|
|
transceiver,
|
|
enabledCodecs,
|
|
rtcpFeedbackConfig,
|
|
params.Stereo,
|
|
!params.Red || !t.params.ClientInfo.SupportsAudioRED(),
|
|
)
|
|
|
|
t.adjustNumOutstandingMedia(transceiver)
|
|
return
|
|
}
|
|
|
|
// AddTransceiverFromKind adds a transceiver of the given media kind
// (audio/video) with the provided init options; thin passthrough to the
// underlying peer connection.
func (t *PCTransport) AddTransceiverFromKind(
	kind webrtc.RTPCodecType,
	init webrtc.RTPTransceiverInit,
) (*webrtc.RTPTransceiver, error) {
	return t.pc.AddTransceiverFromKind(kind, init)
}
|
|
|
|
// RemoveTrack removes the given sender's track from the peer connection;
// thin passthrough to the underlying peer connection.
func (t *PCTransport) RemoveTrack(sender *webrtc.RTPSender) error {
	return t.pc.RemoveTrack(sender)
}
|
|
|
|
func (t *PCTransport) CurrentLocalDescription() *webrtc.SessionDescription {
|
|
cld := t.pc.CurrentLocalDescription()
|
|
if cld == nil {
|
|
return nil
|
|
}
|
|
|
|
ld := *cld
|
|
return &ld
|
|
}
|
|
|
|
func (t *PCTransport) CurrentRemoteDescription() *webrtc.SessionDescription {
|
|
crd := t.pc.CurrentRemoteDescription()
|
|
if crd == nil {
|
|
return nil
|
|
}
|
|
|
|
rd := *crd
|
|
return &rd
|
|
}
|
|
|
|
func (t *PCTransport) PendingRemoteDescription() *webrtc.SessionDescription {
|
|
prd := t.pc.PendingRemoteDescription()
|
|
if prd == nil {
|
|
return nil
|
|
}
|
|
|
|
rd := *prd
|
|
return &rd
|
|
}
|
|
|
|
func (t *PCTransport) GetMid(rtpReceiver *webrtc.RTPReceiver) string {
|
|
tr := rtpReceiver.RTPTransceiver()
|
|
if tr != nil {
|
|
return tr.Mid()
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func (t *PCTransport) GetRTPTransceiver(mid string) *webrtc.RTPTransceiver {
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Mid() == mid {
|
|
return tr
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) GetRTPReceiver(mid string) *webrtc.RTPReceiver {
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Mid() == mid {
|
|
return tr.Receiver()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) getNumUnmatchedTransceivers() (uint32, uint32) {
|
|
if t.isClosed.Load() || t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
|
|
return 0, 0
|
|
}
|
|
|
|
numAudios := uint32(0)
|
|
numVideos := uint32(0)
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Mid() != "" {
|
|
continue
|
|
}
|
|
|
|
switch tr.Kind() {
|
|
case webrtc.RTPCodecTypeAudio:
|
|
numAudios++
|
|
|
|
case webrtc.RTPCodecTypeVideo:
|
|
numVideos++
|
|
}
|
|
}
|
|
|
|
return numAudios, numVideos
|
|
}
|
|
|
|
// CreateDataChannel creates a locally-initiated data channel with the given
// label, wires a detached writer into the matching transport field on open,
// and starts a reader goroutine that forwards inbound messages to the handler
// callback matching the label. Data-track channels are silently skipped when
// data tracks are disabled.
func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelInit) error {
	if label == DataTrackDataChannel && !t.params.EnableDataTracks {
		t.params.Logger.Debugw("data tracks not enabled")
		return nil
	}

	dc, err := t.pc.CreateDataChannel(label, dci)
	if err != nil {
		return err
	}
	// dcPtr points at the transport field to install the writer into;
	// dcReady points at the corresponding "opened" flag (nil for data tracks)
	var (
		dcPtr **datachannel.DataChannelWriter[*webrtc.DataChannel]
		dcReady *bool
		isDataTrack bool
		isUnlabeled bool
		kind livekit.DataPacket_Kind
	)
	switch dc.Label() {
	default:
		isUnlabeled = true
		t.params.Logger.Infow("unlabeled datachannel added", "label", dc.Label())

	case ReliableDataChannel:
		dcPtr = &t.reliableDC
		dcReady = &t.reliableDCOpened
		kind = livekit.DataPacket_RELIABLE

	case LossyDataChannel:
		dcPtr = &t.lossyDC
		dcReady = &t.lossyDCOpened
		kind = livekit.DataPacket_LOSSY

	case DataTrackDataChannel:
		dcPtr = &t.dataTrackDC
		isDataTrack = true
	}

	dc.OnOpen(func() {
		// detach to use the raw data channel API (direct read/write)
		rawDC, err := dc.DetachWithDeadline()
		if err != nil {
			t.params.Logger.Warnw("failed to detach data channel", err)
			return
		}

		// slow-consumer detection only applies to reliable/unlabeled channels
		var slowThreshold int
		if dc.Label() == ReliableDataChannel || isUnlabeled {
			slowThreshold = t.params.DatachannelSlowThreshold
		}

		t.lock.Lock()
		if isUnlabeled {
			t.unlabeledDataChannels = append(
				t.unlabeledDataChannels,
				datachannel.NewDataChannelWriterReliable(dc, rawDC, slowThreshold),
			)
		} else {
			// replace any previous writer for this label
			if *dcPtr != nil {
				(*dcPtr).Close()
			}
			switch {
			case dcPtr == &t.reliableDC:
				*dcPtr = datachannel.NewDataChannelWriterReliable(dc, rawDC, slowThreshold)
			case dcPtr == &t.lossyDC:
				*dcPtr = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, t.params.DatachannelLossyTargetLatency, uint64(lossyDataChannelMinBufferedAmount))
			case dcPtr == &t.dataTrackDC:
				*dcPtr = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, 0, 0)
			}
			if dcReady != nil {
				*dcReady = true
			}
		}
		t.lock.Unlock()
		t.params.Logger.Debugw(dc.Label() + " data channel open")

		// reader loop: pump inbound messages until the channel closes
		go func() {
			defer rawDC.Close()
			buffer := make([]byte, dataChannelBufferSize)
			for {
				n, _, err := rawDC.ReadDataChannel(buffer)
				if err != nil {
					// EOF / closed-state errors are expected during teardown
					if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
						t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
					}
					return
				}

				switch {
				case isUnlabeled:
					t.params.Handler.OnDataMessageUnlabeled(buffer[:n])

				case isDataTrack:
					t.params.Handler.OnDataTrackMessage(buffer[:n], mono.UnixNano())

				default:
					t.params.Handler.OnDataMessage(kind, buffer[:n])
				}
			}
		}()

		t.maybeNotifyFullyEstablished()
	})

	return nil
}
|
|
|
|
// CreateReadableDataChannel creates a data channel and registers it as an
// unlabeled channel whose inbound messages go to OnDataMessageUnlabeled.
// For testing only.
func (t *PCTransport) CreateReadableDataChannel(label string, dci *webrtc.DataChannelInit) error {
	dc, err := t.pc.CreateDataChannel(label, dci)
	if err != nil {
		return err
	}

	dc.OnOpen(func() {
		t.params.Logger.Debugw(dc.Label() + " data channel open")
		// detach to use the raw data channel API (direct read/write)
		rawDC, err := dc.DetachWithDeadline()
		if err != nil {
			t.params.Logger.Errorw("failed to detach data channel", err, "label", dc.Label())
			return
		}

		t.lock.Lock()
		t.unlabeledDataChannels = append(
			t.unlabeledDataChannels,
			datachannel.NewDataChannelWriterReliable(dc, rawDC, t.params.DatachannelSlowThreshold),
		)
		t.lock.Unlock()

		// reader loop: pump inbound messages until the channel closes
		go func() {
			defer rawDC.Close()
			buffer := make([]byte, dataChannelBufferSize)
			for {
				n, _, err := rawDC.ReadDataChannel(buffer)
				if err != nil {
					// EOF / closed-state errors are expected during teardown
					if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
						t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
					}
					return
				}

				t.params.Handler.OnDataMessageUnlabeled(buffer[:n])
			}
		}()
	})
	return nil
}
|
|
|
|
func (t *PCTransport) CreateDataChannelIfEmpty(dcLabel string, dci *webrtc.DataChannelInit) (label string, id uint16, existing bool, err error) {
|
|
if dcLabel == DataTrackDataChannel && !t.params.EnableDataTracks {
|
|
t.params.Logger.Debugw("data tracks not enabled")
|
|
err = errors.New("data tracks not enabled")
|
|
return
|
|
}
|
|
|
|
t.lock.RLock()
|
|
var dcw *datachannel.DataChannelWriter[*webrtc.DataChannel]
|
|
switch dcLabel {
|
|
case ReliableDataChannel:
|
|
dcw = t.reliableDC
|
|
case LossyDataChannel:
|
|
dcw = t.lossyDC
|
|
case DataTrackDataChannel:
|
|
dcw = t.dataTrackDC
|
|
default:
|
|
t.params.Logger.Warnw("unknown data channel label", nil, "label", label)
|
|
err = errors.New("unknown data channel label")
|
|
}
|
|
t.lock.RUnlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if dcw != nil {
|
|
dc := dcw.BufferedAmountGetter()
|
|
return dc.Label(), *dc.ID(), true, nil
|
|
}
|
|
|
|
dc, err := t.pc.CreateDataChannel(dcLabel, dci)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
t.onDataChannel(dc)
|
|
return dc.Label(), *dc.ID(), false, nil
|
|
}
|
|
|
|
func (t *PCTransport) GetRTT() (float64, bool) {
|
|
scps, ok := t.iceTransport.GetSelectedCandidatePairStats()
|
|
if !ok {
|
|
return 0.0, false
|
|
}
|
|
|
|
return scps.CurrentRoundTripTime, true
|
|
}
|
|
|
|
// IsEstablished reports whether the peer connection has left the New state,
// i.e. connection establishment has at least started.
func (t *PCTransport) IsEstablished() bool {
	return t.pc.ConnectionState() != webrtc.PeerConnectionStateNew
}
|
|
|
|
func (t *PCTransport) HasEverConnected() bool {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return !t.firstConnectedAt.IsZero()
|
|
}
|
|
|
|
// GetICEConnectionInfo returns a snapshot of ICE connection details
// (candidates seen so far) from the connection-details tracker.
func (t *PCTransport) GetICEConnectionInfo() *types.ICEConnectionInfo {
	return t.connectionDetails.GetInfo()
}
|
|
|
|
// GetICEConnectionType returns the connection type (as classified by the
// connection-details tracker) of the current ICE connection.
func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType {
	return t.connectionDetails.GetConnectionType()
}
|
|
|
|
// WriteRTCP sends the given RTCP packets on the peer connection; thin
// passthrough to the underlying peer connection.
func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error {
	return t.pc.WriteRTCP(pkts)
}
|
|
|
|
func (t *PCTransport) SendDataMessage(kind livekit.DataPacket_Kind, data []byte) error {
|
|
convertFromUserPacket := false
|
|
var dc *datachannel.DataChannelWriter[*webrtc.DataChannel]
|
|
t.lock.RLock()
|
|
if t.params.UseOneShotSignallingMode {
|
|
if len(t.unlabeledDataChannels) > 0 {
|
|
// use the first unlabeled to send
|
|
dc = t.unlabeledDataChannels[0]
|
|
}
|
|
convertFromUserPacket = true
|
|
} else {
|
|
if kind == livekit.DataPacket_RELIABLE {
|
|
dc = t.reliableDC
|
|
} else {
|
|
dc = t.lossyDC
|
|
}
|
|
}
|
|
t.lock.RUnlock()
|
|
|
|
if convertFromUserPacket {
|
|
dp := &livekit.DataPacket{}
|
|
if err := proto.Unmarshal(data, dp); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch payload := dp.Value.(type) {
|
|
case *livekit.DataPacket_User:
|
|
return t.sendDataMessage(dc, payload.User.Payload)
|
|
default:
|
|
return errors.New("cannot forward non user data packet")
|
|
}
|
|
}
|
|
|
|
return t.sendDataMessage(dc, data)
|
|
}
|
|
|
|
func (t *PCTransport) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error {
|
|
convertToUserPacket := false
|
|
var dc *datachannel.DataChannelWriter[*webrtc.DataChannel]
|
|
t.lock.RLock()
|
|
if t.params.UseOneShotSignallingMode || useRaw {
|
|
if len(t.unlabeledDataChannels) > 0 {
|
|
// use the first unlabeled to send
|
|
dc = t.unlabeledDataChannels[0]
|
|
}
|
|
} else {
|
|
if t.reliableDC != nil {
|
|
dc = t.reliableDC
|
|
} else if t.lossyDC != nil {
|
|
dc = t.lossyDC
|
|
}
|
|
|
|
convertToUserPacket = true
|
|
}
|
|
t.lock.RUnlock()
|
|
|
|
if convertToUserPacket {
|
|
dpData, err := proto.Marshal(&livekit.DataPacket{
|
|
ParticipantIdentity: string(sender),
|
|
Value: &livekit.DataPacket_User{
|
|
User: &livekit.UserPacket{Payload: data},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return t.sendDataMessage(dc, dpData)
|
|
}
|
|
|
|
return t.sendDataMessage(dc, data)
|
|
}
|
|
|
|
func (t *PCTransport) SendDataTrackMessage(data []byte) error {
|
|
t.lock.RLock()
|
|
dc := t.dataTrackDC
|
|
t.lock.RUnlock()
|
|
|
|
return t.sendDataMessage(dc, data)
|
|
}
|
|
|
|
func (t *PCTransport) sendDataMessage(dc *datachannel.DataChannelWriter[*webrtc.DataChannel], data []byte) error {
|
|
if dc == nil {
|
|
return ErrDataChannelUnavailable
|
|
}
|
|
|
|
if t.pc.ConnectionState() == webrtc.PeerConnectionStateFailed {
|
|
return ErrTransportFailure
|
|
}
|
|
|
|
if t.params.DatachannelSlowThreshold == 0 && t.params.DataChannelMaxBufferedAmount > 0 && dc.BufferedAmountGetter().BufferedAmount() > t.params.DataChannelMaxBufferedAmount {
|
|
return ErrDataChannelBufferFull
|
|
}
|
|
_, err := dc.Write(data)
|
|
return err
|
|
}
|
|
|
|
// Close tears the transport down exactly once: stops the event loop, the
// stream allocator and the pacer, cancels timers, closes all data channels,
// closes the peer connection, then emits final ICE stats.
func (t *PCTransport) Close() {
	// Swap returns the previous value; a second Close is a no-op
	if t.isClosed.Swap(true) {
		return
	}

	// wait for the event queue to fully stop before releasing resources
	<-t.eventsQueue.Stop()
	t.clearSignalStateCheckTimer()

	if t.streamAllocator != nil {
		t.streamAllocator.Stop()
	}

	if t.pacer != nil {
		t.pacer.Stop()
	}

	t.clearConnTimer()

	t.lock.Lock()
	if t.mayFailedICEStatsTimer != nil {
		t.mayFailedICEStatsTimer.Stop()
		t.mayFailedICEStatsTimer = nil
	}

	if t.reliableDC != nil {
		t.reliableDC.Close()
		t.reliableDC = nil
	}

	if t.lossyDC != nil {
		t.lossyDC.Close()
		t.lossyDC = nil
	}

	if t.dataTrackDC != nil {
		t.dataTrackDC.Close()
		t.dataTrackDC = nil
	}

	for _, dc := range t.unlabeledDataChannels {
		dc.Close()
	}
	t.unlabeledDataChannels = nil
	t.lock.Unlock()

	if err := t.pc.Close(); err != nil {
		t.params.Logger.Warnw("unclean close of peer connection", err)
	}

	t.outputAndClearICEStats()
}
|
|
|
|
func (t *PCTransport) clearConnTimer() {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
if t.connectAfterICETimer != nil {
|
|
t.connectAfterICETimer.Stop()
|
|
t.connectAfterICETimer = nil
|
|
}
|
|
|
|
if t.tcpICETimer != nil {
|
|
t.tcpICETimer.Stop()
|
|
t.tcpICETimer = nil
|
|
}
|
|
}
|
|
|
|
// HandleRemoteDescription applies a remote offer/answer. In one-shot
// signalling mode (e.g. WHIP) it is applied synchronously on the peer
// connection; otherwise it is posted to the event loop for serialized
// handling. remoteId is an opaque sequence id used to correlate offers and
// answers for desync detection.
func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription, remoteId uint32) error {
	if t.params.UseOneShotSignallingMode {
		if sd.Type == webrtc.SDPTypeOffer {
			remoteOfferId := t.remoteOfferId.Load()
			// an earlier offer that was never answered indicates signalling desync
			if remoteOfferId != 0 && remoteOfferId != t.localAnswerId.Load() {
				t.params.Logger.Warnw(
					"sdp state: multiple offers without answer", nil,
					"remoteOfferId", remoteOfferId,
					"localAnswerId", t.localAnswerId.Load(),
					"receivedRemoteOfferId", remoteId,
				)
			}
			t.remoteOfferId.Store(remoteId)
		} else {
			if remoteId != 0 && remoteId != t.localOfferId.Load() {
				t.params.Logger.Warnw("sdp state: answer id mismatch", nil, "expected", t.localOfferId.Load(), "got", remoteId)
			}
			t.remoteAnswerId.Store(remoteId)
		}

		// add remote candidates to ICE connection details
		parsed, err := sd.Unmarshal()
		if err == nil {
			addRemoteICECandidates := func(attrs []sdp.Attribute) {
				for _, a := range attrs {
					if a.IsICECandidate() {
						c, err := ice.UnmarshalCandidate(a.Value)
						if err != nil {
							continue
						}
						t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
					}
				}
			}

			addRemoteICECandidates(parsed.Attributes)
			for _, m := range parsed.MediaDescriptions {
				addRemoteICECandidates(m.Attributes)
			}
		}

		err = t.pc.SetRemoteDescription(sd)
		if err != nil {
			t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err)
			return err
		}

		// NOTE(review): if sd.Unmarshal failed above, parsed is nil here —
		// presumably nonSimulcastRTXRepairsFromSDP tolerates a nil SDP; confirm.
		rtxRepairs := nonSimulcastRTXRepairsFromSDP(parsed, t.params.Logger)
		if len(rtxRepairs) > 0 {
			t.params.Logger.Debugw("rtx pairs found from sdp", "ssrcs", rtxRepairs)
			for repair, base := range rtxRepairs {
				t.params.Config.BufferFactory.SetRTXPair(repair, base, "")
			}
		}
		return nil
	}

	// normal mode: serialize handling on the event loop
	t.postEvent(event{
		signal: signalRemoteDescriptionReceived,
		data: remoteDescriptionData{
			sessionDescription: &sd,
			remoteId:           remoteId,
		},
	})
	return nil
}
|
|
|
|
// GetAnswer creates an answer for a pending remote offer (one-shot signalling
// mode only). It blocks until ICE gathering completes so the returned answer
// carries all local candidates, records those candidates in connection
// details, and returns the answer along with the id of the offer it answers.
func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, uint32, error) {
	if !t.params.UseOneShotSignallingMode {
		return webrtc.SessionDescription{}, 0, ErrNotSynchronousLocalCandidatesMode
	}

	prd := t.pc.PendingRemoteDescription()
	if prd == nil || prd.Type != webrtc.SDPTypeOffer {
		return webrtc.SessionDescription{}, 0, ErrNoRemoteDescription
	}

	answer, err := t.pc.CreateAnswer(nil)
	if err != nil {
		return webrtc.SessionDescription{}, 0, err
	}

	if err = t.pc.SetLocalDescription(answer); err != nil {
		return webrtc.SessionDescription{}, 0, err
	}

	// wait for gathering to complete to include all candidates in the answer
	<-webrtc.GatheringCompletePromise(t.pc)

	cld := t.pc.CurrentLocalDescription()

	// add local candidates to ICE connection details
	parsed, err := cld.Unmarshal()
	if err == nil {
		addLocalICECandidates := func(attrs []sdp.Attribute) {
			for _, a := range attrs {
				if a.IsICECandidate() {
					c, err := ice.UnmarshalCandidate(a.Value)
					if err != nil {
						continue
					}
					t.connectionDetails.AddLocalICECandidate(c, false, false)
				}
			}
		}

		addLocalICECandidates(parsed.Attributes)
		for _, m := range parsed.MediaDescriptions {
			addLocalICECandidates(m.Attributes)
		}
	}

	// the answer id mirrors the offer id it responds to
	answerId := t.remoteOfferId.Load()
	t.localAnswerId.Store(answerId)

	return *cld, answerId, nil
}
|
|
|
|
func (t *PCTransport) GetICESessionUfrag() (string, error) {
|
|
cld := t.pc.CurrentLocalDescription()
|
|
if cld == nil {
|
|
return "", ErrNoLocalDescription
|
|
}
|
|
|
|
parsed, err := cld.Unmarshal()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
ufrag, _, err := lksdp.ExtractICECredential(parsed)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return ufrag, nil
|
|
}
|
|
|
|
// HandleICETrickleSDPFragment handles an SDP fragment for ICE trickle in WHIP
// (one-shot signalling mode only). It validates the fragment's mid and ICE
// credentials against the current remote description, then adds the
// fragment's candidates to the peer connection and connection details.
func (t *PCTransport) HandleICETrickleSDPFragment(sdpFragment string) error {
	if !t.params.UseOneShotSignallingMode {
		return ErrNotSynchronousLocalCandidatesMode
	}

	parsedFragment := &lksdp.SDPFragment{}
	if err := parsedFragment.Unmarshal(sdpFragment); err != nil {
		t.params.Logger.Warnw("could not parse SDP fragment", err, "sdpFragment", sdpFragment)
		return ErrInvalidSDPFragment
	}

	crd := t.pc.CurrentRemoteDescription()
	if crd == nil {
		t.params.Logger.Warnw("no remote description", nil)
		return ErrNoRemoteDescription
	}

	parsedRemote, err := crd.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not parse remote description", err, "offer", crd)
		return err
	}

	// check if BUNDLE mid matches the "mid" in the SDP fragment
	bundleMid, found := lksdp.GetBundleMid(parsedRemote)
	if !found {
		return ErrNoBundleMid
	}

	if parsedFragment.Mid() != bundleMid {
		t.params.Logger.Warnw("incorrect mid", nil, "sdpFragment", sdpFragment)
		return ErrMidMismatch
	}

	// the fragment's ICE credentials (when present) must match the session's
	fragmentICEUfrag, fragmentICEPwd, err := parsedFragment.ExtractICECredential()
	if err != nil {
		t.params.Logger.Warnw(
			"could not get ICE crendential from fragment", err,
			"sdpFragment", sdpFragment,
		)
		return ErrInvalidSDPFragment
	}
	remoteICEUfrag, remoteICEPwd, err := lksdp.ExtractICECredential(parsedRemote)
	if err != nil {
		t.params.Logger.Warnw("could not get ICE crendential from remote description", err, "sdpFragment", sdpFragment, "remoteDescription", crd)
		return err
	}
	if fragmentICEUfrag != "" && fragmentICEUfrag != remoteICEUfrag {
		t.params.Logger.Warnw(
			"ice ufrag mismatch", nil,
			"remoteICEUfrag", remoteICEUfrag,
			"fragmentICEUfrag", fragmentICEUfrag,
			"sdpFragment", sdpFragment,
			"remoteDescription", crd,
		)
		return ErrICECredentialMismatch
	}
	if fragmentICEPwd != "" && fragmentICEPwd != remoteICEPwd {
		t.params.Logger.Warnw(
			"ice pwd mismatch", nil,
			"remoteICEPwd", remoteICEPwd,
			"fragmentICEPwd", fragmentICEPwd,
			"sdpFragment", sdpFragment,
			"remoteDescription", crd,
		)
		return ErrICECredentialMismatch
	}

	// add candidates from media description
	for _, ic := range parsedFragment.Candidates() {
		c, err := ice.UnmarshalCandidate(ic)
		if err == nil {
			t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
		}

		candidate := webrtc.ICECandidateInit{
			Candidate: ic,
		}
		if err := t.pc.AddICECandidate(candidate); err != nil {
			t.params.Logger.Warnw("failed to add ICE candidate", err, "candidate", candidate)
		} else {
			t.params.Logger.Debugw("added ICE candidate", "candidate", candidate)
		}
	}
	return nil
}
|
|
|
|
// HandleICERestartSDPFragment handles an SDP fragment for ICE restart in WHIP
// (one-shot signalling mode only). It patches the fragment's new ICE
// credentials/candidates into the current remote description, re-applies it
// as an offer, generates a fully-gathered local answer and returns the
// answer's SDP fragment.
func (t *PCTransport) HandleICERestartSDPFragment(sdpFragment string) (string, error) {
	if !t.params.UseOneShotSignallingMode {
		return "", ErrNotSynchronousLocalCandidatesMode
	}

	parsedFragment := &lksdp.SDPFragment{}
	if err := parsedFragment.Unmarshal(sdpFragment); err != nil {
		t.params.Logger.Warnw("could not parse SDP fragment", err, "sdpFragment", sdpFragment)
		return "", ErrInvalidSDPFragment
	}

	crd := t.pc.CurrentRemoteDescription()
	if crd == nil {
		t.params.Logger.Warnw("no remote description", nil)
		return "", ErrNoRemoteDescription
	}

	parsedRemote, err := crd.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not parse remote description", err, "offer", crd)
		return "", err
	}

	// splice the restart's credentials and candidates into the prior offer
	if err := parsedFragment.PatchICECredentialAndCandidatesIntoSDP(parsedRemote); err != nil {
		t.params.Logger.Warnw("could not patch SDP fragment into remote description", err, "offer", crd, "sdpFragment", sdpFragment)
		return "", err
	}

	bytes, err := parsedRemote.Marshal()
	if err != nil {
		t.params.Logger.Warnw("could not marshal SDP with patched remote", err)
		return "", err
	}
	sd := webrtc.SessionDescription{
		SDP:  string(bytes),
		Type: webrtc.SDPTypeOffer,
	}
	if err := t.pc.SetRemoteDescription(sd); err != nil {
		t.params.Logger.Warnw("could not set remote description", err)
		return "", err
	}

	// clear out connection details on ICE restart and re-populate
	t.connectionDetails.Clear()
	for _, candidate := range parsedFragment.Candidates() {
		c, err := ice.UnmarshalCandidate(candidate)
		if err != nil {
			continue
		}
		t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
	}

	ans, err := t.pc.CreateAnswer(nil)
	if err != nil {
		t.params.Logger.Warnw("could not create answer", err)
		return "", err
	}

	if err = t.pc.SetLocalDescription(ans); err != nil {
		t.params.Logger.Warnw("could not set local description", err)
		return "", err
	}

	// wait for gathering to complete to include all candidates in the answer
	<-webrtc.GatheringCompletePromise(t.pc)

	cld := t.pc.CurrentLocalDescription()

	// add local candidates to ICE connection details
	parsedAnswer, err := cld.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not parse local description", err)
		return "", err
	}

	addLocalICECandidates := func(attrs []sdp.Attribute) {
		for _, a := range attrs {
			if a.IsICECandidate() {
				c, err := ice.UnmarshalCandidate(a.Value)
				if err != nil {
					continue
				}
				t.connectionDetails.AddLocalICECandidate(c, false, false)
			}
		}
	}

	addLocalICECandidates(parsedAnswer.Attributes)
	for _, m := range parsedAnswer.MediaDescriptions {
		addLocalICECandidates(m.Attributes)
	}

	parsedFragmentAnswer, err := lksdp.ExtractSDPFragment(parsedAnswer)
	if err != nil {
		t.params.Logger.Warnw("could not extract SDP fragment", err)
		return "", err
	}

	answerFragment, err := parsedFragmentAnswer.Marshal()
	if err != nil {
		t.params.Logger.Warnw("could not marshal answer SDP fragment", err)
		return "", err
	}

	return answerFragment, nil
}
|
|
|
|
func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) {
|
|
t.lock.Lock()
|
|
t.onNegotiationStateChanged = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnNegotiationStateChanged() func(state transport.NegotiationState) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onNegotiationStateChanged
|
|
}
|
|
|
|
// Negotiate schedules a signalSendOffer event. With force, any pending
// debounced negotiation is cancelled and the offer is posted immediately.
// Otherwise the request is debounced: a short window is used when the last
// negotiation is old (fire quickly), a longer one when negotiations are
// frequent (coalesce bursts).
func (t *PCTransport) Negotiate(force bool) {
	if t.isClosed.Load() {
		return
	}

	var postEvent bool
	t.lock.Lock()
	if force {
		t.debouncedNegotiate.Add(func() {
			// no op to cancel pending negotiation
		})
		t.debouncePending = false
		t.updateLastNegotiateLocked()

		postEvent = true
	} else {
		if !t.debouncePending {
			// pick the debounce window based on how recently we negotiated
			if time.Since(t.lastNegotiate) > negotiationFrequency {
				t.debouncedNegotiate.SetDuration(fastNegotiationFrequency)
			} else {
				t.debouncedNegotiate.SetDuration(negotiationFrequency)
			}

			t.debouncedNegotiate.Add(func() {
				t.lock.Lock()
				t.debouncePending = false
				t.updateLastNegotiateLocked()
				t.lock.Unlock()

				t.postEvent(event{
					signal: signalSendOffer,
				})
			})
			t.debouncePending = true
		}
	}
	t.lock.Unlock()

	// post outside the lock
	if postEvent {
		t.postEvent(event{
			signal: signalSendOffer,
		})
	}
}
|
|
|
|
func (t *PCTransport) updateLastNegotiateLocked() {
|
|
if now := time.Now(); now.After(t.lastNegotiate) {
|
|
t.lastNegotiate = now
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) ICERestart() error {
|
|
if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
|
|
t.params.Logger.Warnw("trying to restart ICE on closed peer connection", nil)
|
|
return ErrIceRestartOnClosedPeerConnection
|
|
}
|
|
|
|
t.postEvent(event{
|
|
signal: signalICERestart,
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// ResetShortConnOnICERestart marks that short-connection tracking should be
// reset on the next ICE restart.
func (t *PCTransport) ResetShortConnOnICERestart() {
	t.resetShortConnOnICERestart.Store(true)
}
|
|
|
|
func (t *PCTransport) AddTrackToStreamAllocator(subTrack types.SubscribedTrack) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
layers := buffer.GetVideoLayersForMimeType(
|
|
subTrack.DownTrack().Mime(),
|
|
subTrack.MediaTrack().ToProto(),
|
|
)
|
|
t.streamAllocator.AddTrack(subTrack.DownTrack(), streamallocator.AddTrackParams{
|
|
Source: subTrack.MediaTrack().Source(),
|
|
IsMultiLayered: len(layers) > 1,
|
|
PublisherID: subTrack.MediaTrack().PublisherID(),
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) RemoveTrackFromStreamAllocator(subTrack types.SubscribedTrack) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
t.streamAllocator.RemoveTrack(subTrack.DownTrack())
|
|
}
|
|
|
|
func (t *PCTransport) SetAllowPauseOfStreamAllocator(allowPause bool) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
t.streamAllocator.SetAllowPause(allowPause)
|
|
}
|
|
|
|
func (t *PCTransport) SetChannelCapacityOfStreamAllocator(channelCapacity int64) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
t.streamAllocator.SetChannelCapacity(channelCapacity)
|
|
}
|
|
|
|
func (t *PCTransport) preparePC(previousAnswer webrtc.SessionDescription) error {
|
|
// sticky data channel to first m-lines, if someday we don't send sdp without media streams to
|
|
// client's subscribe pc after joining, should change this step
|
|
parsed, err := previousAnswer.Unmarshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fp, fpHahs, err := lksdp.ExtractFingerprint(parsed)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
offer, err := t.pc.CreateOffer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := t.pc.SetLocalDescription(offer); err != nil {
|
|
return err
|
|
}
|
|
|
|
//
|
|
// Simulate client side peer connection and set DTLS role from previous answer.
|
|
// Role needs to be set properly (one side needs to be server and the other side
|
|
// needs to be the client) for DTLS connection to form properly. As this is
|
|
// trying to replicate previous setup, read from previous answer and use that role.
|
|
//
|
|
se := webrtc.SettingEngine{}
|
|
_ = se.SetAnsweringDTLSRole(lksdp.ExtractDTLSRole(parsed))
|
|
se.SetIgnoreRidPauseForRecv(true)
|
|
api := webrtc.NewAPI(
|
|
webrtc.WithSettingEngine(se),
|
|
webrtc.WithMediaEngine(t.me),
|
|
)
|
|
pc2, err := api.NewPeerConnection(webrtc.Configuration{
|
|
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer pc2.Close()
|
|
|
|
if err := pc2.SetRemoteDescription(offer); err != nil {
|
|
return err
|
|
}
|
|
ans, err := pc2.CreateAnswer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// replace client's fingerprint into dummy pc's answer, for pion's dtls process, it will
|
|
// keep the fingerprint at first call of SetRemoteDescription, if dummy pc and client pc use
|
|
// different fingerprint, that will cause pion denied dtls data after handshake with client
|
|
// complete (can't pass fingerprint change).
|
|
// in this step, we don't established connection with dummy pc(no candidate swap), just use
|
|
// sdp negotiation to sticky data channel and keep client's fingerprint
|
|
parsedAns, _ := ans.Unmarshal()
|
|
fpLine := fpHahs + " " + fp
|
|
replaceFP := func(attrs []sdp.Attribute, fpLine string) {
|
|
for k := range attrs {
|
|
if attrs[k].Key == "fingerprint" {
|
|
attrs[k].Value = fpLine
|
|
}
|
|
}
|
|
}
|
|
replaceFP(parsedAns.Attributes, fpLine)
|
|
for _, m := range parsedAns.MediaDescriptions {
|
|
replaceFP(m.Attributes, fpLine)
|
|
}
|
|
bytes, err := parsedAns.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ans.SDP = string(bytes)
|
|
|
|
return t.pc.SetRemoteDescription(ans)
|
|
}
|
|
|
|
// initPCWithPreviousAnswer pre-creates send-only transceivers matching the
// media sections of the answer negotiated on the previous node, reusing the
// same mids, so that a migrated client sees a consistent SDP layout.
// Returns the map of mid -> RTPSender so senders can be reused when tracks
// are re-attached after migration.
func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDescription) (map[string]*webrtc.RTPSender, error) {
	senders := make(map[string]*webrtc.RTPSender)
	parsed, err := previousAnswer.Unmarshal()
	if err != nil {
		return senders, err
	}
	for _, m := range parsed.MediaDescriptions {
		var codecType webrtc.RTPCodecType
		switch m.MediaName.Media {
		case "video":
			codecType = webrtc.RTPCodecTypeVideo
		case "audio":
			codecType = webrtc.RTPCodecTypeAudio
		case "application":
			if t.params.IsOfferer {
				// for pion generate unmatched sdp, it always appends data channel to last m-lines,
				// that not consistent with our previous answer that data channel might at middle-line
				// because sdp can negotiate multi times before migration.(it will sticky to the last m-line at first negotiate)
				// so use a dummy pc to negotiate sdp to fixed the datachannel's mid at same position with previous answer
				if err := t.preparePC(previousAnswer); err != nil {
					t.params.Logger.Warnw("prepare pc for migration failed", err)
					return senders, err
				}
			}
			continue
		default:
			continue
		}

		if !t.params.IsOfferer {
			// `sendrecv` or `sendonly` means this transceiver is used for sending

			// Note that a transceiver previously used to send could be `inactive`.
			// Let those transceivers be created when remote description is set.
			_, ok1 := m.Attribute(webrtc.RTPTransceiverDirectionSendrecv.String())
			_, ok2 := m.Attribute(webrtc.RTPTransceiverDirectionSendonly.String())
			if !ok1 && !ok2 {
				continue
			}
		}

		tr, err := t.pc.AddTransceiverFromKind(
			codecType,
			webrtc.RTPTransceiverInit{
				Direction: webrtc.RTPTransceiverDirectionSendonly,
			},
		)
		if err != nil {
			return senders, err
		}
		// reuse the mid from the previous answer so m-line ordering matches
		mid := lksdp.GetMidValue(m)
		if mid == "" {
			return senders, ErrMidNotFound
		}
		tr.SetMid(mid)

		// save mid -> senders for migration reuse
		sender := tr.Sender()
		senders[mid] = sender

		// set transceiver to inactive
		tr.SetSender(sender, nil)
	}
	return senders, nil
}
|
|
|
|
// SetPreviousSdp seeds this transport with the SDP state negotiated on the
// previous node (migration). If the previous "answer" side description is
// missing, migration is impossible and a full reconnect is forced via
// onNegotiationFailed.
func (t *PCTransport) SetPreviousSdp(localDescription, remoteDescription *webrtc.SessionDescription) {
	// when there is no answer, cannot migrate, force a full reconnect
	if (t.params.IsOfferer && remoteDescription == nil) || (!t.params.IsOfferer && localDescription == nil) {
		t.onNegotiationFailed(true, "no previous answer")
		return
	}

	t.lock.Lock()
	var (
		senders   map[string]*webrtc.RTPSender
		err       error
		parseMids bool
	)
	if t.params.IsOfferer {
		// only seed once, before any negotiation has happened on this pc
		if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
			t.previousAnswer = remoteDescription
			senders, err = t.initPCWithPreviousAnswer(*remoteDescription)
			parseMids = true
		}
	} else {
		if t.pc.LocalDescription() == nil {
			senders, err = t.initPCWithPreviousAnswer(*localDescription)
			parseMids = true
		}
	}
	if err != nil {
		t.lock.Unlock()
		t.onNegotiationFailed(true, fmt.Sprintf("initPCWithPreviousAnswer failed, error: %s", err))
		return
	}

	if localDescription != nil && parseMids {
		// in migration case, can't reuse transceiver before negotiating excepted tracks
		// that were subscribed at previous node
		t.canReuseTransceiver = false
		if err := t.parseTrackMid(*localDescription, senders); err != nil {
			// non-fatal: reuse mapping is best-effort
			t.params.Logger.Warnw(
				"parse previous local description failed", err,
				"localDescription", localDescription.SDP,
			)
		}
	}

	if t.params.IsOfferer {
		// disable fast negotiation temporarily after migration to avoid sending offer
		// contains part of subscribed tracks before migration, let the subscribed track
		// resume at the same time.
		t.lastNegotiate = time.Now().Add(iceFailedTimeoutTotal)
	}
	t.lock.Unlock()
}
|
|
|
|
func (t *PCTransport) parseTrackMid(sd webrtc.SessionDescription, senders map[string]*webrtc.RTPSender) error {
|
|
parsed, err := sd.Unmarshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.previousTrackDescription = make(map[string]*trackDescription)
|
|
for _, m := range parsed.MediaDescriptions {
|
|
msid, ok := m.Attribute(sdp.AttrKeyMsid)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if split := strings.Split(msid, " "); len(split) == 2 {
|
|
trackID := split[1]
|
|
mid := lksdp.GetMidValue(m)
|
|
if mid == "" {
|
|
return ErrMidNotFound
|
|
}
|
|
if sender, ok := senders[mid]; ok {
|
|
t.previousTrackDescription[trackID] = &trackDescription{mid, sender}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) postEvent(e event) {
|
|
e.PCTransport = t
|
|
t.eventsQueue.Enqueue(func(e event) {
|
|
var err error
|
|
switch e.signal {
|
|
case signalICEGatheringComplete:
|
|
err = e.handleICEGatheringComplete(e)
|
|
case signalLocalICECandidate:
|
|
err = e.handleLocalICECandidate(e)
|
|
case signalRemoteICECandidate:
|
|
err = e.handleRemoteICECandidate(e)
|
|
case signalSendOffer:
|
|
err = e.handleSendOffer(e)
|
|
case signalRemoteDescriptionReceived:
|
|
err = e.handleRemoteDescriptionReceived(e)
|
|
case signalICERestart:
|
|
err = e.handleICERestart(e)
|
|
}
|
|
if err != nil {
|
|
if !e.isClosed.Load() {
|
|
e.onNegotiationFailed(true, fmt.Sprintf("error handling event. err: %s, event: %s", err, e))
|
|
}
|
|
}
|
|
}, e)
|
|
}
|
|
|
|
func (t *PCTransport) handleICEGatheringComplete(_ event) error {
|
|
if t.params.IsOfferer {
|
|
return t.handleICEGatheringCompleteOfferer()
|
|
} else {
|
|
return t.handleICEGatheringCompleteAnswerer()
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) handleICEGatheringCompleteOfferer() error {
|
|
if !t.restartAfterGathering {
|
|
return nil
|
|
}
|
|
|
|
t.params.Logger.Debugw("restarting ICE after ICE gathering")
|
|
t.restartAfterGathering = false
|
|
return t.doICERestart()
|
|
}
|
|
|
|
func (t *PCTransport) handleICEGatheringCompleteAnswerer() error {
|
|
if t.pendingRestartIceOffer == nil {
|
|
return nil
|
|
}
|
|
|
|
offer := *t.pendingRestartIceOffer
|
|
t.pendingRestartIceOffer = nil
|
|
|
|
t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering")
|
|
if err := t.setRemoteDescription(offer); err != nil {
|
|
return err
|
|
}
|
|
t.params.Handler.OnSetRemoteDescriptionOffer()
|
|
t.processSendersPendingConfig()
|
|
|
|
return t.createAndSendAnswer()
|
|
}
|
|
|
|
func (t *PCTransport) localDescriptionSent() error {
|
|
if !t.cacheLocalCandidates {
|
|
return nil
|
|
}
|
|
|
|
t.cacheLocalCandidates = false
|
|
|
|
cachedLocalCandidates := t.cachedLocalCandidates
|
|
t.cachedLocalCandidates = nil
|
|
|
|
for _, c := range cachedLocalCandidates {
|
|
if err := t.params.Handler.OnICECandidate(c, t.params.Transport); err != nil {
|
|
t.params.Logger.Warnw("failed to send cached ICE candidate", err, "candidate", c)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) clearLocalDescriptionSent() {
|
|
t.cacheLocalCandidates = true
|
|
t.cachedLocalCandidates = nil
|
|
t.connectionDetails.Clear()
|
|
}
|
|
|
|
func (t *PCTransport) handleLocalICECandidate(e event) error {
|
|
c := e.data.(*webrtc.ICECandidate)
|
|
|
|
filtered := false
|
|
if c != nil {
|
|
if t.preferTCP.Load() && c.Protocol != webrtc.ICEProtocolTCP {
|
|
t.params.Logger.Debugw("filtering out local candidate", "candidate", c.String())
|
|
filtered = true
|
|
}
|
|
t.connectionDetails.AddLocalCandidate(c, filtered, true)
|
|
}
|
|
|
|
if filtered {
|
|
return nil
|
|
}
|
|
|
|
if t.cacheLocalCandidates {
|
|
t.cachedLocalCandidates = append(t.cachedLocalCandidates, c)
|
|
return nil
|
|
}
|
|
|
|
if err := t.params.Handler.OnICECandidate(c, t.params.Transport); err != nil {
|
|
t.params.Logger.Warnw("failed to send ICE candidate", err, "candidate", c)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) handleRemoteICECandidate(e event) error {
|
|
c := e.data.(*webrtc.ICECandidateInit)
|
|
|
|
filtered := false
|
|
if t.preferTCP.Load() && !strings.Contains(strings.ToLower(c.Candidate), "tcp") {
|
|
t.params.Logger.Debugw("filtering out remote candidate", "candidate", c.Candidate)
|
|
filtered = true
|
|
}
|
|
|
|
if !t.params.Config.UseMDNS && types.IsCandidateMDNS(*c) {
|
|
t.params.Logger.Debugw("ignoring mDNS candidate", "candidate", c.Candidate)
|
|
filtered = true
|
|
}
|
|
|
|
t.connectionDetails.AddRemoteCandidate(*c, filtered, true, false)
|
|
if filtered {
|
|
return nil
|
|
}
|
|
|
|
if t.pc.RemoteDescription() == nil {
|
|
t.pendingRemoteCandidates = append(t.pendingRemoteCandidates, c)
|
|
return nil
|
|
}
|
|
|
|
if err := t.pc.AddICECandidate(*c); err != nil {
|
|
t.params.Logger.Warnw("failed to add ICE candidate", err, "candidate", c)
|
|
// ignore ParseAddr error as it does not affect ICE connectivity
|
|
if !strings.Contains(err.Error(), "ParseAddr") {
|
|
return errors.Wrap(err, "add ice candidate failed")
|
|
}
|
|
} else {
|
|
t.params.Logger.Debugw("added ICE candidate", "candidate", c)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) setNegotiationState(state transport.NegotiationState) {
|
|
t.negotiationState = state
|
|
if onNegotiationStateChanged := t.getOnNegotiationStateChanged(); onNegotiationStateChanged != nil {
|
|
onNegotiationStateChanged(t.negotiationState)
|
|
}
|
|
}
|
|
|
|
// filterCandidates strips ICE candidates that should not be used from the
// session description: non-TCP candidates when TCP is preferred, and mDNS
// candidates when mDNS is disabled. Every candidate (kept or excluded) is
// recorded in connection details for diagnostics. On any SDP parse/serialize
// failure the description is returned unmodified.
func (t *PCTransport) filterCandidates(sd webrtc.SessionDescription, preferTCP, isLocal bool) webrtc.SessionDescription {
	parsed, err := sd.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not unmarshal SDP to filter candidates", err)
		return sd
	}

	filterAttributes := func(attrs []sdp.Attribute) []sdp.Attribute {
		filteredAttrs := make([]sdp.Attribute, 0, len(attrs))
		for _, a := range attrs {
			if a.IsICECandidate() {
				c, err := ice.UnmarshalCandidate(a.Value)
				if err != nil {
					// keep candidates that fail to parse - better to let pion decide
					t.params.Logger.Errorw("failed to unmarshal candidate in sdp", err, "isLocal", isLocal, "sdp", sd.SDP)
					filteredAttrs = append(filteredAttrs, a)
					continue
				}
				excluded := preferTCP && !c.NetworkType().IsTCP()
				if !excluded {
					if !t.params.Config.UseMDNS && types.IsICECandidateMDNS(c) {
						excluded = true
					}
				}
				if !excluded {
					filteredAttrs = append(filteredAttrs, a)
				}

				// record both kept and excluded candidates for diagnostics
				if isLocal {
					t.connectionDetails.AddLocalICECandidate(c, excluded, false)
				} else {
					t.connectionDetails.AddRemoteICECandidate(c, excluded, false, false)
				}
			} else {
				// non-candidate attributes pass through untouched
				filteredAttrs = append(filteredAttrs, a)
			}
		}

		return filteredAttrs
	}

	// session-level and per-media candidates both need filtering
	parsed.Attributes = filterAttributes(parsed.Attributes)
	for _, m := range parsed.MediaDescriptions {
		m.Attributes = filterAttributes(m.Attributes)
	}

	bytes, err := parsed.Marshal()
	if err != nil {
		t.params.Logger.Warnw("could not marshal SDP to filter candidates", err)
		return sd
	}
	sd.SDP = string(bytes)
	return sd
}
|
|
|
|
func (t *PCTransport) clearSignalStateCheckTimer() {
|
|
if t.signalStateCheckTimer != nil {
|
|
t.signalStateCheckTimer.Stop()
|
|
t.signalStateCheckTimer = nil
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) setupSignalStateCheckTimer() {
|
|
t.clearSignalStateCheckTimer()
|
|
|
|
negotiateVersion := t.negotiateCounter.Inc()
|
|
t.signalStateCheckTimer = time.AfterFunc(negotiationFailedTimeout, func() {
|
|
t.clearSignalStateCheckTimer()
|
|
|
|
failed := t.negotiationState != transport.NegotiationStateNone
|
|
|
|
if t.negotiateCounter.Load() == negotiateVersion && failed && t.pc.ConnectionState() == webrtc.PeerConnectionStateConnected {
|
|
t.onNegotiationFailed(false, "negotiation timed out")
|
|
}
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) adjustNumOutstandingMedia(transceiver *webrtc.RTPTransceiver) {
|
|
if transceiver.Mid() != "" {
|
|
return
|
|
}
|
|
|
|
t.lock.Lock()
|
|
if transceiver.Kind() == webrtc.RTPCodecTypeAudio {
|
|
t.numOutstandingAudios++
|
|
} else {
|
|
t.numOutstandingVideos++
|
|
}
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) sendUnmatchedMediaRequirement(force bool) error {
|
|
// if there are unmatched media sections, notify remote peer to generate offer with
|
|
// enough media section in subsequent offers
|
|
t.lock.Lock()
|
|
numAudios := t.numOutstandingAudios - t.numRequestSentAudios
|
|
t.numRequestSentAudios += numAudios
|
|
|
|
numVideos := t.numOutstandingVideos - t.numRequestSentVideos
|
|
t.numRequestSentVideos += numVideos
|
|
t.lock.Unlock()
|
|
|
|
if force || (numAudios+numVideos) != 0 {
|
|
if err := t.params.Handler.OnUnmatchedMedia(numAudios, numVideos); err != nil {
|
|
return errors.Wrap(err, "could not send unmatched media requirements")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createAndSendOffer creates a local offer (optionally with ICE restart),
// applies it locally, filters out unwanted candidates, and sends it via the
// handler. If a negotiation is already waiting on the remote side, a retry is
// recorded instead of disrupting the in-flight exchange.
func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
	if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		t.params.Logger.Warnw("trying to send offer on closed peer connection", nil)
		return nil
	}

	// when there's an ongoing negotiation, let it finish and not disrupt its state
	if t.negotiationState == transport.NegotiationStateRemote {
		t.params.Logger.Debugw("skipping negotiation, trying again later")
		t.setNegotiationState(transport.NegotiationStateRetry)
		return nil
	} else if t.negotiationState == transport.NegotiationStateRetry {
		// already set to retry, we can safely skip this attempt
		return nil
	}

	// helper to force ICERestart onto possibly-nil options
	ensureICERestart := func(options *webrtc.OfferOptions) *webrtc.OfferOptions {
		if options == nil {
			options = &webrtc.OfferOptions{}
		}
		options.ICERestart = true
		return options
	}

	t.lock.Lock()
	if t.previousAnswer != nil {
		// migrated session: restart ICE so the client re-establishes transport
		t.previousAnswer = nil
		options = ensureICERestart(options)
		t.params.Logger.Infow("ice restart due to previous answer")
	}
	t.lock.Unlock()

	if t.restartAtNextOffer {
		t.restartAtNextOffer = false
		options = ensureICERestart(options)
		t.params.Logger.Infow("ice restart at next offer")
	}

	if options != nil && options.ICERestart {
		// candidates of the previous ICE generation are stale; re-arm caching
		t.clearLocalDescriptionSent()
	}

	offer, err := t.pc.CreateOffer(options)
	if err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to create offer on closed peer connection", nil)
			return nil
		}

		prometheus.RecordServiceOperationError("offer", "create")
		return errors.Wrap(err, "create offer failed")
	}

	preferTCP := t.preferTCP.Load()
	if preferTCP {
		t.params.Logger.Debugw("local offer (unfiltered)", "sdp", offer.SDP)
	}

	err = t.pc.SetLocalDescription(offer)
	if err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to set local description on closed peer connection", nil)
			return nil
		}

		prometheus.RecordServiceOperationError("offer", "local_description")
		return errors.Wrap(err, "setting local description failed")
	}

	//
	// Filter after setting local description as pion expects the offer
	// to match between CreateOffer and SetLocalDescription.
	// Filtered offer is sent to remote so that remote does not
	// see filtered candidates.
	//
	offer = t.filterCandidates(offer, preferTCP, true)
	if preferTCP {
		t.params.Logger.Debugw("local offer (filtered)", "sdp", offer.SDP)
	}

	// indicate waiting for remote
	t.setNegotiationState(transport.NegotiationStateRemote)

	t.setupSignalStateCheckTimer()

	// sanity check on offer/answer id pairing; mismatch is expected during
	// an ICE restart, so only warn outside that case
	remoteAnswerId := t.remoteAnswerId.Load()
	if remoteAnswerId != 0 && remoteAnswerId != t.localOfferId.Load() {
		if options == nil || !options.ICERestart {
			t.params.Logger.Warnw(
				"sdp state: sending offer before receiving answer", nil,
				"localOfferId", t.localOfferId.Load(),
				"remoteAnswerId", remoteAnswerId,
			)
		}
	}

	if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping()); err != nil {
		prometheus.RecordServiceOperationError("offer", "write_message")
		return errors.Wrap(err, "could not send offer")
	}
	prometheus.RecordServiceOperationSuccess("offer")

	return t.localDescriptionSent()
}
|
|
|
|
func (t *PCTransport) handleSendOffer(_ event) error {
|
|
if !t.params.IsOfferer {
|
|
return t.sendUnmatchedMediaRequirement(true)
|
|
}
|
|
|
|
return t.createAndSendOffer(nil)
|
|
}
|
|
|
|
// remoteDescriptionData carries a remote SDP and its signaling id through the
// event queue.
type remoteDescriptionData struct {
	// offer or answer received from the remote peer
	sessionDescription *webrtc.SessionDescription
	// remote offer/answer id used to correlate local and remote SDP state
	remoteId uint32
}
|
|
|
|
func (t *PCTransport) handleRemoteDescriptionReceived(e event) error {
|
|
rdd := e.data.(remoteDescriptionData)
|
|
if rdd.sessionDescription.Type == webrtc.SDPTypeOffer {
|
|
return t.handleRemoteOfferReceived(rdd.sessionDescription, rdd.remoteId)
|
|
} else {
|
|
return t.handleRemoteAnswerReceived(rdd.sessionDescription, rdd.remoteId)
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) isRemoteOfferRestartICE(parsed *sdp.SessionDescription) (string, bool, error) {
|
|
user, pwd, err := lksdp.ExtractICECredential(parsed)
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
|
|
credential := fmt.Sprintf("%s:%s", user, pwd)
|
|
// ice credential changed, remote offer restart ice
|
|
restartICE := t.currentOfferIceCredential != "" && t.currentOfferIceCredential != credential
|
|
return credential, restartICE, nil
|
|
}
|
|
|
|
// setRemoteDescription filters remote candidates per transport preference,
// applies the remote description, and flushes ICE candidates that arrived
// before the description was available.
func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error {
	// filter before setting remote description so that pion does not see filtered remote candidates
	preferTCP := t.preferTCP.Load()
	if preferTCP {
		t.params.Logger.Debugw("remote description (unfiltered)", "type", sd.Type, "sdp", sd.SDP)
	}
	sd = t.filterCandidates(sd, preferTCP, false)
	if preferTCP {
		t.params.Logger.Debugw("remote description (filtered)", "type", sd.Type, "sdp", sd.SDP)
	}

	if err := t.pc.SetRemoteDescription(sd); err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to set remote description on closed peer connection", nil)
			return nil
		}

		sdpType := "offer"
		if sd.Type == webrtc.SDPTypeAnswer {
			sdpType = "answer"
		}
		prometheus.RecordServiceOperationError(sdpType, "remote_description")
		return errors.Wrap(err, "setting remote description failed")
	} else if sd.Type == webrtc.SDPTypeAnswer {
		// an applied answer completes migration bookkeeping: transceivers
		// can be reused again
		t.lock.Lock()
		if !t.canReuseTransceiver {
			t.canReuseTransceiver = true
			t.previousTrackDescription = make(map[string]*trackDescription)
		}
		t.lock.Unlock()
	}

	// apply candidates that were queued while no remote description was set
	for _, c := range t.pendingRemoteCandidates {
		if err := t.pc.AddICECandidate(*c); err != nil {
			t.params.Logger.Warnw("failed to add cached ICE candidate", err, "candidate", c)
			return errors.Wrap(err, "add ice candidate failed")
		} else {
			t.params.Logger.Debugw("added cached ICE candidate", "candidate", c)
		}
	}
	t.pendingRemoteCandidates = nil

	return nil
}
|
|
|
|
func (t *PCTransport) createAndSendAnswer() error {
|
|
numOutstandingAudios, numOutstandingVideos := t.getNumUnmatchedTransceivers()
|
|
t.lock.Lock()
|
|
t.numOutstandingAudios, t.numOutstandingVideos = numOutstandingAudios, numOutstandingVideos
|
|
t.numRequestSentAudios, t.numRequestSentVideos = 0, 0
|
|
t.lock.Unlock()
|
|
|
|
answer, err := t.pc.CreateAnswer(nil)
|
|
if err != nil {
|
|
if errors.Is(err, webrtc.ErrConnectionClosed) {
|
|
t.params.Logger.Warnw("trying to create answer on closed peer connection", nil)
|
|
return nil
|
|
}
|
|
|
|
prometheus.RecordServiceOperationError("answer", "create")
|
|
return errors.Wrap(err, "create answer failed")
|
|
}
|
|
|
|
preferTCP := t.preferTCP.Load()
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("local answer (unfiltered)", "sdp", answer.SDP)
|
|
}
|
|
|
|
if err = t.pc.SetLocalDescription(answer); err != nil {
|
|
prometheus.RecordServiceOperationError("answer", "local_description")
|
|
return errors.Wrap(err, "setting local description failed")
|
|
}
|
|
|
|
//
|
|
// Filter after setting local description as pion expects the answer
|
|
// to match between CreateAnswer and SetLocalDescription.
|
|
// Filtered answer is sent to remote so that remote does not
|
|
// see filtered candidates.
|
|
//
|
|
answer = t.filterCandidates(answer, preferTCP, true)
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("local answer (filtered)", "sdp", answer.SDP)
|
|
}
|
|
|
|
localAnswerId := t.localAnswerId.Load()
|
|
if localAnswerId != 0 && localAnswerId >= t.remoteOfferId.Load() {
|
|
t.params.Logger.Warnw(
|
|
"sdp state: duplicate answer", nil,
|
|
"localAnswerId", localAnswerId,
|
|
"remoteOfferId", t.remoteOfferId.Load(),
|
|
)
|
|
}
|
|
|
|
answerId := t.remoteOfferId.Load()
|
|
|
|
if err := t.params.Handler.OnAnswer(answer, answerId, t.getMidToTrackIDMapping()); err != nil {
|
|
prometheus.RecordServiceOperationError("answer", "write_message")
|
|
return errors.Wrap(err, "could not send answer")
|
|
}
|
|
t.localAnswerId.Store(answerId)
|
|
prometheus.RecordServiceOperationSuccess("asnwer")
|
|
|
|
if err := t.sendUnmatchedMediaRequirement(false); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.lock.Lock()
|
|
if !t.canReuseTransceiver {
|
|
t.canReuseTransceiver = true
|
|
t.previousTrackDescription = make(map[string]*trackDescription)
|
|
}
|
|
t.lock.Unlock()
|
|
|
|
return t.localDescriptionSent()
|
|
}
|
|
|
|
// handleRemoteOfferReceived processes an offer from the remote peer: offer-id
// bookkeeping, detection of a remote-initiated ICE restart, application of
// the remote description, RTX pair registration, and finally creating and
// sending the answer.
func (t *PCTransport) handleRemoteOfferReceived(sd *webrtc.SessionDescription, offerId uint32) error {
	t.params.Logger.Debugw("processing offer", "offerId", offerId)
	remoteOfferId := t.remoteOfferId.Load()
	if remoteOfferId != 0 && remoteOfferId != t.localAnswerId.Load() {
		t.params.Logger.Warnw(
			"sdp state: multiple offers without answer", nil,
			"remoteOfferId", remoteOfferId,
			"localAnswerId", t.localAnswerId.Load(),
			"receivedRemoteOfferId", offerId,
		)
	}
	t.remoteOfferId.Store(offerId)

	parsed, err := sd.Unmarshal()
	if err != nil {
		return err
	}

	// remember whether the very first offer contained a data channel m-line
	t.lock.Lock()
	if !t.firstOfferReceived {
		t.firstOfferReceived = true
		var dataChannelFound bool
		for _, media := range parsed.MediaDescriptions {
			if strings.EqualFold(media.MediaName.Media, "application") {
				dataChannelFound = true
				break
			}
		}
		t.firstOfferNoDataChannel = !dataChannelFound
	}
	t.lock.Unlock()

	iceCredential, offerRestartICE, err := t.isRemoteOfferRestartICE(parsed)
	if err != nil {
		return errors.Wrap(err, "check remote offer restart ice failed")
	}

	if offerRestartICE && t.pendingRestartIceOffer == nil {
		t.clearLocalDescriptionSent()
	}

	// still gathering candidates of the previous generation - defer the
	// restart offer (picked up in handleICEGatheringCompleteAnswerer)
	if offerRestartICE && t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
		t.params.Logger.Debugw("remote offer restart ice while ice gathering")
		t.pendingRestartIceOffer = sd
		return nil
	}

	if offerRestartICE && t.resetShortConnOnICERestart.CompareAndSwap(true, false) {
		t.resetShortConn()
	}

	if offerRestartICE {
		t.outputAndClearICEStats()
	}

	if err := t.setRemoteDescription(*sd); err != nil {
		return err
	}
	t.params.Handler.OnSetRemoteDescriptionOffer()
	t.processSendersPendingConfig()

	// register RTX repair SSRC -> primary SSRC associations found in the offer
	rtxRepairs := nonSimulcastRTXRepairsFromSDP(parsed, t.params.Logger)
	if len(rtxRepairs) > 0 {
		t.params.Logger.Debugw("rtx pairs found from sdp", "ssrcs", rtxRepairs)
		for repair, base := range rtxRepairs {
			t.params.Config.BufferFactory.SetRTXPair(repair, base, "")
		}
	}

	// remember the credential of the first offer / latest restart so future
	// restarts can be detected by credential change
	if t.currentOfferIceCredential == "" || offerRestartICE {
		t.currentOfferIceCredential = iceCredential
	}

	return t.createAndSendAnswer()
}
|
|
|
|
// handleRemoteAnswerReceived applies an answer from the remote peer and, when
// a renegotiation was requested while this answer was pending, kicks off the
// next offer.
func (t *PCTransport) handleRemoteAnswerReceived(sd *webrtc.SessionDescription, answerId uint32) error {
	t.params.Logger.Debugw("processing answer", "answerId", answerId)
	if answerId != 0 && answerId != t.localOfferId.Load() {
		t.params.Logger.Warnw(
			"sdp state: answer id mismatch", nil,
			"expected", t.localOfferId.Load(),
			"got", answerId,
		)
	}
	t.remoteAnswerId.Store(answerId)

	// answer arrived before the negotiation timeout
	t.clearSignalStateCheckTimer()

	if err := t.setRemoteDescription(*sd); err != nil {
		// Pion will call RTPSender.Send method for each new added Downtrack, and return error if the DownTrack.Bind
		// returns error. In case of Downtrack.Bind returns ErrUnsupportedCodec, the signal state will be stable as negotiation is aleady compelted
		// before startRTPSenders, and the peerconnection state can be recovered by next negotiation which will be triggered
		// by the SubscriptionManager unsubscribe the failure DownTrack. So don't treat this error as negotiation failure.
		if !errors.Is(err, webrtc.ErrUnsupportedCodec) {
			return err
		}
	}

	if t.negotiationState == transport.NegotiationStateRetry {
		t.setNegotiationState(transport.NegotiationStateNone)

		t.params.Logger.Debugw("re-negotiate after receiving answer")
		return t.createAndSendOffer(nil)
	}

	t.setNegotiationState(transport.NegotiationStateNone)
	return nil
}
|
|
|
|
// doICERestart performs (or schedules) an ICE restart, accounting for the
// current gathering state and any in-flight negotiation. Must run on the
// serialized event queue.
func (t *PCTransport) doICERestart() error {
	if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		t.params.Logger.Warnw("trying to restart ICE on closed peer connection", nil)
		return nil
	}

	// if restart is requested, but negotiation never started
	iceGatheringState := t.pc.ICEGatheringState()
	if iceGatheringState == webrtc.ICEGatheringStateNew {
		t.params.Logger.Debugw("skipping ICE restart on not yet started peer connection")
		return nil
	}

	// if restart is requested, and we are not ready, then continue afterwards
	if iceGatheringState == webrtc.ICEGatheringStateGathering {
		t.params.Logger.Debugw("deferring ICE restart to after gathering")
		t.restartAfterGathering = true
		return nil
	}

	if t.resetShortConnOnICERestart.CompareAndSwap(true, false) {
		t.resetShortConn()
	}

	// idle negotiation state: restart immediately with a fresh offer
	if t.negotiationState == transport.NegotiationStateNone {
		t.outputAndClearICEStats()
		return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
	}

	currentRemoteDescription := t.pc.CurrentRemoteDescription()
	if currentRemoteDescription == nil {
		// restart without current remote description, send current local description again to try recover
		offer := t.pc.LocalDescription()
		if offer == nil {
			// it should not happen, log just in case
			t.params.Logger.Warnw("ice restart without local offer", nil)
			return ErrIceRestartWithoutLocalSDP
		} else {
			t.params.Logger.Infow("deferring ice restart to next offer")
			t.setNegotiationState(transport.NegotiationStateRetry)
			t.restartAtNextOffer = true

			remoteAnswerId := t.remoteAnswerId.Load()
			if remoteAnswerId != 0 && remoteAnswerId != t.localOfferId.Load() {
				t.params.Logger.Warnw(
					"sdp state: answer not received in ICE restart", nil,
					"localOfferId", t.localOfferId.Load(),
					"remoteAnswerId", remoteAnswerId,
				)
			}

			// re-send the pending local offer as-is
			err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping())
			if err != nil {
				prometheus.RecordServiceOperationError("offer", "write_message")
			} else {
				prometheus.RecordServiceOperationSuccess("offer")
			}
			return err
		}
	} else {
		// recover by re-applying the last answer
		t.params.Logger.Infow("recovering from client negotiation state on ICE restart")
		if err := t.pc.SetRemoteDescription(*currentRemoteDescription); err != nil {
			prometheus.RecordServiceOperationError("offer", "remote_description")
			return errors.Wrap(err, "set remote description failed")
		} else {
			t.setNegotiationState(transport.NegotiationStateNone)
			t.outputAndClearICEStats()
			return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
		}
	}
}
|
|
|
|
// handleICERestart is the event-queue entry point for an ICE restart request.
func (t *PCTransport) handleICERestart(_ event) error {
	return t.doICERestart()
}
|
|
|
|
func (t *PCTransport) onNegotiationFailed(warning bool, reason string) {
|
|
logFields := []any{
|
|
"reason", reason,
|
|
"localCurrent", t.pc.CurrentLocalDescription(),
|
|
"localPending", t.pc.PendingLocalDescription(),
|
|
"remoteCurrent", t.pc.CurrentRemoteDescription(),
|
|
"remotePending", t.pc.PendingRemoteDescription(),
|
|
}
|
|
if warning {
|
|
t.params.Logger.Warnw(
|
|
"negotiation failed",
|
|
nil,
|
|
logFields...,
|
|
)
|
|
} else {
|
|
t.params.Logger.Infow("negotiation failed", logFields...)
|
|
}
|
|
t.params.Handler.OnNegotiationFailed()
|
|
}
|
|
|
|
func (t *PCTransport) outputAndClearICEStats() {
|
|
t.lock.Lock()
|
|
stats := t.mayFailedICEStats
|
|
t.mayFailedICEStats = nil
|
|
t.lock.Unlock()
|
|
|
|
if len(stats) > 0 {
|
|
t.params.Logger.Infow("ICE candidate pair stats", "stats", iceCandidatePairStatsEncoder{stats})
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) getMidToTrackIDMapping() map[string]string {
|
|
transceivers := t.pc.GetTransceivers()
|
|
midToTrackID := make(map[string]string, len(transceivers))
|
|
for _, tr := range transceivers {
|
|
if mid := tr.Mid(); mid != "" {
|
|
if sender := tr.Sender(); sender != nil {
|
|
if track := sender.Track(); track != nil {
|
|
midToTrackID[mid] = track.ID()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return midToTrackID
|
|
}
|
|
|
|
// ----------------------
|
|
|
|
// configureSenderParams bundles the inputs needed to configure a sending
// transceiver's codecs and audio-specific settings.
type configureSenderParams struct {
	// the sending transceiver to configure
	transceiver *webrtc.RTPTransceiver
	// codecs enabled for the sending direction
	enabledCodecs []*livekit.Codec
	// RTCP feedback configuration applied to the codecs
	rtcpFeedbackConfig RTCPFeedbackConfig
	// drop H.264 high profile from the codec list when set
	filterOutH264HighProfile bool
	// request stereo Opus (sprop-stereo=1) when set
	enableAudioStereo bool
	// keep NACK feedback for audio when set
	enableAudioNACK bool
}
|
|
|
|
func configureSender(params configureSenderParams) {
|
|
configureSenderCodecs(
|
|
params.transceiver,
|
|
params.enabledCodecs,
|
|
params.rtcpFeedbackConfig,
|
|
params.filterOutH264HighProfile,
|
|
)
|
|
|
|
if params.transceiver.Kind() == webrtc.RTPCodecTypeAudio {
|
|
configureSenderAudio(params.transceiver, params.enableAudioStereo, params.enableAudioNACK)
|
|
}
|
|
}
|
|
|
|
// configure subscriber transceiver for audio stereo and nack
|
|
// pion doesn't support per transciver codec configuration, so the nack of this session will be disabled
|
|
// forever once it is first disabled by a transceiver.
|
|
func configureSenderAudio(tr *webrtc.RTPTransceiver, stereo bool, nack bool) {
|
|
sender := tr.Sender()
|
|
if sender == nil {
|
|
return
|
|
}
|
|
|
|
// enable stereo
|
|
codecs := sender.GetParameters().Codecs
|
|
configCodecs := make([]webrtc.RTPCodecParameters, 0, len(codecs))
|
|
for _, c := range codecs {
|
|
if mime.IsMimeTypeStringOpus(c.MimeType) {
|
|
c.SDPFmtpLine = strings.ReplaceAll(c.SDPFmtpLine, ";sprop-stereo=1", "")
|
|
if stereo {
|
|
c.SDPFmtpLine += ";sprop-stereo=1"
|
|
}
|
|
if !nack {
|
|
for i, fb := range c.RTCPFeedback {
|
|
if fb.Type == webrtc.TypeRTCPFBNACK {
|
|
c.RTCPFeedback = append(c.RTCPFeedback[:i], c.RTCPFeedback[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
configCodecs = append(configCodecs, c)
|
|
}
|
|
|
|
tr.SetCodecPreferences(configCodecs)
|
|
}
|
|
|
|
// In single peer connection mode, set up enebled codecs for sender.
|
|
// The config provides config of direction.
|
|
// For publisher peer connection those are publish enabled codecs
|
|
// and for subscriber peer connection those are subscribe enabled codecs.
|
|
//
|
|
// But, in single peer connection mode, if setting up a transceiver where the media is
|
|
// flowing in the other direction, the other direction codec config needs to be set.
|
|
func configureSenderCodecs(
|
|
tr *webrtc.RTPTransceiver,
|
|
enabledCodecs []*livekit.Codec,
|
|
rtcpFeedbackConfig RTCPFeedbackConfig,
|
|
filterOutH264HighProfile bool,
|
|
) {
|
|
if len(enabledCodecs) == 0 {
|
|
return
|
|
}
|
|
|
|
sender := tr.Sender()
|
|
if sender == nil {
|
|
return
|
|
}
|
|
|
|
filteredCodecs := filterCodecs(
|
|
sender.GetParameters().Codecs,
|
|
enabledCodecs,
|
|
rtcpFeedbackConfig,
|
|
filterOutH264HighProfile,
|
|
)
|
|
tr.SetCodecPreferences(filteredCodecs)
|
|
}
|
|
|
|
func configureReceiverCodecs(
|
|
tr *webrtc.RTPTransceiver,
|
|
preferredMimeType string,
|
|
compliesWithCodecOrderInSDPAnswer bool,
|
|
) {
|
|
receiver := tr.Receiver()
|
|
if receiver == nil {
|
|
return
|
|
}
|
|
|
|
var preferredCodecs, leftCodecs []webrtc.RTPCodecParameters
|
|
for _, c := range receiver.GetParameters().Codecs {
|
|
if tr.Kind() == webrtc.RTPCodecTypeAudio {
|
|
nackFound := false
|
|
for _, fb := range c.RTCPFeedback {
|
|
if fb.Type == webrtc.TypeRTCPFBNACK {
|
|
nackFound = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !nackFound {
|
|
c.RTCPFeedback = append(c.RTCPFeedback, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBNACK})
|
|
}
|
|
}
|
|
|
|
if mime.GetMimeTypeCodec(preferredMimeType) == mime.GetMimeTypeCodec(c.RTPCodecCapability.MimeType) {
|
|
preferredCodecs = append(preferredCodecs, c)
|
|
} else {
|
|
leftCodecs = append(leftCodecs, c)
|
|
}
|
|
}
|
|
if len(preferredCodecs) == 0 {
|
|
return
|
|
}
|
|
|
|
reorderedCodecs := append([]webrtc.RTPCodecParameters{}, preferredCodecs...)
|
|
if tr.Kind() == webrtc.RTPCodecTypeVideo {
|
|
// if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it
|
|
if compliesWithCodecOrderInSDPAnswer {
|
|
reorderedCodecs = append(reorderedCodecs, leftCodecs...)
|
|
}
|
|
} else {
|
|
reorderedCodecs = append(reorderedCodecs, leftCodecs...)
|
|
}
|
|
tr.SetCodecPreferences(reorderedCodecs)
|
|
}
|
|
|
|
func nonSimulcastRTXRepairsFromSDP(s *sdp.SessionDescription, logger logger.Logger) map[uint32]uint32 {
|
|
rtxRepairFlows := map[uint32]uint32{}
|
|
for _, media := range s.MediaDescriptions {
|
|
// extract rtx repair flows from the media section for non-simulcast stream,
|
|
// pion will handle simulcast streams by rid probe, don't need handle it here.
|
|
var ridFound bool
|
|
rtxPairs := make(map[uint32]uint32)
|
|
findRTX:
|
|
for _, attr := range media.Attributes {
|
|
switch attr.Key {
|
|
case "rid":
|
|
ridFound = true
|
|
break findRTX
|
|
case sdp.AttrKeySSRCGroup:
|
|
split := strings.Split(attr.Value, " ")
|
|
if split[0] == sdp.SemanticTokenFlowIdentification {
|
|
// Essentially lines like `a=ssrc-group:FID 2231627014 632943048` are processed by this section
|
|
// as this declares that the second SSRC (632943048) is a rtx repair flow (RFC4588) for the first
|
|
// (2231627014) as specified in RFC5576
|
|
if len(split) == 3 {
|
|
baseSsrc, err := strconv.ParseUint(split[1], 10, 32)
|
|
if err != nil {
|
|
logger.Warnw("Failed to parse SSRC", err, "ssrc", split[1])
|
|
continue
|
|
}
|
|
rtxRepairFlow, err := strconv.ParseUint(split[2], 10, 32)
|
|
if err != nil {
|
|
logger.Warnw("Failed to parse SSRC", err, "ssrc", split[2])
|
|
continue
|
|
}
|
|
rtxPairs[uint32(rtxRepairFlow)] = uint32(baseSsrc)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if !ridFound {
|
|
maps.Copy(rtxRepairFlows, rtxPairs)
|
|
}
|
|
}
|
|
|
|
return rtxRepairFlows
|
|
}
|
|
|
|
// ----------------------
|
|
|
|
// iceCandidatePairStatsEncoder adapts a slice of iceCandidatePairStats to
// zap's array-marshaling interface so the stats can be logged as a
// structured array.
type iceCandidatePairStatsEncoder struct {
	stats []iceCandidatePairStats
}
|
|
|
|
func (e iceCandidatePairStatsEncoder) MarshalLogArray(arr zapcore.ArrayEncoder) error {
|
|
for _, s := range e.stats {
|
|
if err := arr.AppendObject(s); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// iceCandidatePairStats couples a webrtc ICE candidate pair's stats with the
// stats of its local and remote candidates for structured logging.
type iceCandidatePairStats struct {
	webrtc.ICECandidatePairStats
	// local and remote are the candidate stats for the two ends of the pair.
	local, remote webrtc.ICECandidateStats
}
|
|
|
|
func (r iceCandidatePairStats) MarshalLogObject(e zapcore.ObjectEncoder) error {
|
|
candidateToString := func(c webrtc.ICECandidateStats) string {
|
|
return fmt.Sprintf("%s:%d %s type(%s/%s), priority(%d)", c.IP, c.Port, c.Protocol, c.CandidateType, c.RelayProtocol, c.Priority)
|
|
}
|
|
e.AddString("state", string(r.State))
|
|
e.AddBool("nominated", r.Nominated)
|
|
e.AddString("local", candidateToString(r.local))
|
|
e.AddString("remote", candidateToString(r.remote))
|
|
e.AddUint64("requestsSent", r.RequestsSent)
|
|
e.AddUint64("responsesReceived", r.ResponsesReceived)
|
|
e.AddUint64("requestsReceived", r.RequestsReceived)
|
|
e.AddUint64("responsesSent", r.ResponsesSent)
|
|
e.AddTime("firstRequestSentAt", r.FirstRequestTimestamp.Time())
|
|
e.AddTime("lastRequestSentAt", r.LastRequestTimestamp.Time())
|
|
e.AddTime("firstResponseReceivedAt", r.FirstResponseTimestamp.Time())
|
|
e.AddTime("lastResponseReceivedAt", r.LastResponseTimestamp.Time())
|
|
e.AddTime("firstRequestReceivedAt", r.FirstRequestReceivedTimestamp.Time())
|
|
e.AddTime("lastRequestReceivedAt", r.LastRequestReceivedTimestamp.Time())
|
|
|
|
return nil
|
|
}
|