mirror of
https://github.com/livekit/livekit.git
synced 2026-04-01 17:25:46 +00:00
1757 lines
47 KiB
Go
1757 lines
47 KiB
Go
package rtc
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bep/debounce"
|
|
"github.com/go-logr/logr"
|
|
"github.com/pion/ice/v2"
|
|
"github.com/pion/interceptor"
|
|
"github.com/pion/interceptor/pkg/cc"
|
|
"github.com/pion/interceptor/pkg/gcc"
|
|
"github.com/pion/interceptor/pkg/twcc"
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/sdp/v3"
|
|
"github.com/pion/webrtc/v3"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/atomic"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
lksdp "github.com/livekit/protocol/sdp"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
serverlogger "github.com/livekit/livekit-server/pkg/logger"
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/sfu"
|
|
"github.com/livekit/livekit-server/pkg/telemetry"
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
)
|
|
|
|
const (
	// LossyDataChannel is the label of the data channel whose messages are
	// delivered as livekit.DataPacket_LOSSY.
	LossyDataChannel = "_lossy"
	// ReliableDataChannel is the label of the data channel whose messages are
	// delivered as livekit.DataPacket_RELIABLE.
	ReliableDataChannel = "_reliable"

	// negotiationFrequency is the debounce window handed to debounce.New for
	// non-forced negotiations.
	negotiationFrequency = 150 * time.Millisecond
	// negotiationFailedTimeout: presumably how long to wait for a remote
	// description before declaring negotiation failed (usage not shown here).
	negotiationFailedTimeout = 15 * time.Second
	// dtlsRetransmissionInterval is applied to the setting engine of every
	// peer connection created by newPeerConnection.
	dtlsRetransmissionInterval = 100 * time.Millisecond

	// ICE timers applied through SettingEngine.SetICETimeouts.
	iceDisconnectedTimeout = 10 * time.Second // compatible for ice-lite with firefox client
	iceFailedTimeout       = 25 * time.Second // pion's default
	iceKeepaliveInterval   = 2 * time.Second  // pion's default

	// shortConnectionThreshold: a connection that fails sooner than this after
	// ICE first connected is reported as short lived.
	shortConnectionThreshold = 90 * time.Second
)
|
|
|
|
var (
|
|
ErrIceRestartWithoutLocalSDP = errors.New("ICE restart without local SDP settled")
|
|
ErrNoTransceiver = errors.New("no transceiver")
|
|
ErrNoSender = errors.New("no sender")
|
|
ErrNoICECandidateHandler = errors.New("no ICE candidate handler")
|
|
ErrNoOfferHandler = errors.New("no offer handler")
|
|
ErrNoAnswerHandler = errors.New("no answer handler")
|
|
ErrMidNotFound = errors.New("mid not found")
|
|
)
|
|
|
|
// -------------------------------------------------------------------------
|
|
|
|
// signal identifies the kind of an event queued on the transport's event channel.
type signal int

const (
	signalICEGatheringComplete signal = iota
	signalLocalICECandidate
	signalRemoteICECandidate
	signalLogICECandidates
	signalSendOffer
	signalRemoteDescriptionReceived
	signalICERestart
)

// String returns the upper-case name of a known signal, or its decimal value
// for anything outside the declared range.
func (s signal) String() string {
	names := [...]string{
		signalICEGatheringComplete:      "ICE_GATHERING_COMPLETE",
		signalLocalICECandidate:         "LOCAL_ICE_CANDIDATE",
		signalRemoteICECandidate:        "REMOTE_ICE_CANDIDATE",
		signalLogICECandidates:          "LOG_ICE_CANDIDATES",
		signalSendOffer:                 "SEND_OFFER",
		signalRemoteDescriptionReceived: "REMOTE_DESCRIPTION_RECEIVED",
		signalICERestart:                "ICE_RESTART",
	}
	if s >= 0 && int(s) < len(names) {
		return names[s]
	}
	return fmt.Sprintf("%d", int(s))
}
|
|
|
|
// -------------------------------------------------------
|
|
|
|
type event struct {
|
|
signal signal
|
|
data interface{}
|
|
}
|
|
|
|
func (e event) String() string {
|
|
return fmt.Sprintf("PCTransport:Event{signal: %s, data: %+v}", e.signal, e.data)
|
|
}
|
|
|
|
// -------------------------------------------------------
|
|
|
|
// NegotiationState tracks where the transport is in an offer/answer exchange.
type NegotiationState int

const (
	// NegotiationStateNone means no negotiation is in flight.
	NegotiationStateNone NegotiationState = iota
	// NegotiationStateRemote means the transport is waiting for the remote description.
	NegotiationStateRemote
	// NegotiationStateRetry means another negotiation is needed.
	NegotiationStateRetry
)

// String returns the name of a known state, or its decimal value otherwise.
func (n NegotiationState) String() string {
	names := [...]string{
		NegotiationStateNone:   "NONE",
		NegotiationStateRemote: "WAITING_FOR_REMOTE",
		NegotiationStateRetry:  "RETRY",
	}
	if n >= 0 && int(n) < len(names) {
		return names[n]
	}
	return fmt.Sprintf("%d", int(n))
}
|
|
|
|
// -------------------------------------------------------
|
|
|
|
// SimulcastTrackInfo carries the mid and rid of a simulcast track; TransportParams
// keys these by a uint32 in SimTracks.
type SimulcastTrackInfo struct {
	Mid, Rid string
}
|
|
|
|
type trackDescription struct {
|
|
mid string
|
|
sender *webrtc.RTPSender
|
|
}
|
|
|
|
// PCTransport is a wrapper around PeerConnection, with some helper methods
|
|
type PCTransport struct {
|
|
params TransportParams
|
|
pc *webrtc.PeerConnection
|
|
me *webrtc.MediaEngine
|
|
|
|
lock sync.RWMutex
|
|
|
|
reliableDC *webrtc.DataChannel
|
|
reliableDCOpened bool
|
|
lossyDC *webrtc.DataChannel
|
|
lossyDCOpened bool
|
|
onDataPacket func(kind livekit.DataPacket_Kind, data []byte)
|
|
|
|
iceConnectedAt time.Time
|
|
connectedAt time.Time
|
|
|
|
onFullyEstablished func()
|
|
|
|
debouncedNegotiate func(func())
|
|
debouncePending bool
|
|
|
|
onICECandidate func(c *webrtc.ICECandidate) error
|
|
onOffer func(offer webrtc.SessionDescription) error
|
|
onAnswer func(answer webrtc.SessionDescription) error
|
|
onInitialConnected func()
|
|
onFailed func(isShortLived bool)
|
|
onNegotiationStateChanged func(state NegotiationState)
|
|
onNegotiationFailed func()
|
|
|
|
// stream allocator for subscriber PC
|
|
streamAllocator *sfu.StreamAllocator
|
|
|
|
previousAnswer *webrtc.SessionDescription
|
|
// track id -> description map in previous offer sdp
|
|
previousTrackDescription map[string]*trackDescription
|
|
canReuseTransceiver bool
|
|
|
|
preferTCP atomic.Bool
|
|
isClosed atomic.Bool
|
|
|
|
eventChMu sync.RWMutex
|
|
eventCh chan event
|
|
|
|
// the following should be accessed only in event processing go routine
|
|
cacheLocalCandidates bool
|
|
cachedLocalCandidates []*webrtc.ICECandidate
|
|
pendingRemoteCandidates []*webrtc.ICECandidateInit
|
|
restartAfterGathering bool
|
|
restartAtNextOffer bool
|
|
negotiationState NegotiationState
|
|
negotiateCounter atomic.Int32
|
|
signalStateCheckTimer *time.Timer
|
|
currentOfferIceCredential string // ice user:pwd, for publish side ice restart checking
|
|
pendingRestartIceOffer *webrtc.SessionDescription
|
|
|
|
// for cleaner logging
|
|
allowedLocalCandidates []string
|
|
allowedRemoteCandidates []string
|
|
filteredLocalCandidates []string
|
|
filteredRemoteCandidates []string
|
|
}
|
|
|
|
type TransportParams struct {
|
|
ParticipantID livekit.ParticipantID
|
|
ParticipantIdentity livekit.ParticipantIdentity
|
|
ProtocolVersion types.ProtocolVersion
|
|
Config *WebRTCConfig
|
|
DirectionConfig DirectionConfig
|
|
CongestionControlConfig config.CongestionControlConfig
|
|
Telemetry telemetry.TelemetryService
|
|
EnabledCodecs []*livekit.Codec
|
|
Logger logger.Logger
|
|
SimTracks map[uint32]SimulcastTrackInfo
|
|
ClientInfo ClientInfo
|
|
IsOfferer bool
|
|
IsSendSide bool
|
|
}
|
|
|
|
func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
|
|
directionConfig := params.DirectionConfig
|
|
// enable nack if audio red is not support
|
|
if !isCodecEnabled(params.EnabledCodecs, webrtc.RTPCodecCapability{MimeType: sfu.MimeTypeAudioRed}) || !params.ClientInfo.SupportsAudioRED() {
|
|
directionConfig.RTCPFeedback.Audio = append(directionConfig.RTCPFeedback.Audio, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBNACK})
|
|
}
|
|
|
|
me, err := createMediaEngine(params.EnabledCodecs, directionConfig)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
se := params.Config.SettingEngine
|
|
se.DisableMediaEngineCopy(true)
|
|
//
|
|
// Disable SRTP replay protection (https://datatracker.ietf.org/doc/html/rfc3711#page-15).
|
|
// Needed due to lack of RTX stream support in Pion.
|
|
//
|
|
// When clients probe for bandwidth, there are several possible approaches
|
|
// 1. Use padding packet (Chrome uses this)
|
|
// 2. Use an older packet (Firefox uses this)
|
|
// Typically, these are sent over the RTX stream and hence SRTP replay protection will not
|
|
// trigger. As Pion does not support RTX, when firefox uses older packet for probing, they
|
|
// trigger the replay protection.
|
|
//
|
|
// That results in two issues
|
|
// - Firefox bandwidth probing is not successful
|
|
// - Pion runs out of read buffer capacity - this potentially looks like a Pion issue
|
|
//
|
|
// NOTE: It is not required to disable RTCP replay protection, but doing it to be symmetric.
|
|
//
|
|
se.DisableSRTPReplayProtection(true)
|
|
se.DisableSRTCPReplayProtection(true)
|
|
if !params.ProtocolVersion.SupportsICELite() {
|
|
se.SetLite(false)
|
|
}
|
|
se.SetDTLSRetransmissionInterval(dtlsRetransmissionInterval)
|
|
se.SetICETimeouts(iceDisconnectedTimeout, iceFailedTimeout, iceKeepaliveInterval)
|
|
|
|
lf := serverlogger.NewLoggerFactory(logr.Logger(params.Logger))
|
|
if lf != nil {
|
|
se.LoggerFactory = lf
|
|
}
|
|
|
|
ir := &interceptor.Registry{}
|
|
if params.IsSendSide {
|
|
isSendSideBWE := false
|
|
for _, ext := range directionConfig.RTPHeaderExtension.Video {
|
|
if ext == sdp.TransportCCURI {
|
|
isSendSideBWE = true
|
|
break
|
|
}
|
|
}
|
|
for _, ext := range directionConfig.RTPHeaderExtension.Audio {
|
|
if ext == sdp.TransportCCURI {
|
|
isSendSideBWE = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if isSendSideBWE {
|
|
gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
|
|
return gcc.NewSendSideBWE(
|
|
gcc.SendSideBWEInitialBitrate(1*1000*1000),
|
|
gcc.SendSideBWEPacer(gcc.NewNoOpPacer()),
|
|
)
|
|
})
|
|
if err == nil {
|
|
gf.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
|
|
if onBandwidthEstimator != nil {
|
|
onBandwidthEstimator(estimator)
|
|
}
|
|
})
|
|
ir.Add(gf)
|
|
|
|
tf, err := twcc.NewHeaderExtensionInterceptor()
|
|
if err == nil {
|
|
ir.Add(tf)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if len(params.SimTracks) > 0 {
|
|
f, err := NewUnhandleSimulcastInterceptorFactory(UnhandleSimulcastTracks(params.SimTracks))
|
|
if err != nil {
|
|
params.Logger.Errorw("NewUnhandleSimulcastInterceptorFactory failed", err)
|
|
} else {
|
|
ir.Add(f)
|
|
}
|
|
}
|
|
api := webrtc.NewAPI(
|
|
webrtc.WithMediaEngine(me),
|
|
webrtc.WithSettingEngine(se),
|
|
webrtc.WithInterceptorRegistry(ir),
|
|
)
|
|
pc, err := api.NewPeerConnection(params.Config.Configuration)
|
|
return pc, me, err
|
|
}
|
|
|
|
func NewPCTransport(params TransportParams) (*PCTransport, error) {
|
|
t := &PCTransport{
|
|
params: params,
|
|
debouncedNegotiate: debounce.New(negotiationFrequency),
|
|
negotiationState: NegotiationStateNone,
|
|
eventCh: make(chan event, 50),
|
|
previousTrackDescription: make(map[string]*trackDescription),
|
|
canReuseTransceiver: true,
|
|
}
|
|
if params.IsSendSide {
|
|
t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{
|
|
Config: params.CongestionControlConfig,
|
|
Logger: params.Logger,
|
|
})
|
|
t.streamAllocator.Start()
|
|
}
|
|
|
|
if err := t.createPeerConnection(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go t.processEvents()
|
|
|
|
return t, nil
|
|
}
|
|
|
|
func (t *PCTransport) createPeerConnection() error {
|
|
var bwe cc.BandwidthEstimator
|
|
pc, me, err := newPeerConnection(t.params, func(estimator cc.BandwidthEstimator) {
|
|
bwe = estimator
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.pc = pc
|
|
t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange)
|
|
t.pc.OnICEConnectionStateChange(t.onICEConnectionStateChange)
|
|
t.pc.OnICECandidate(t.onICECandidateTrickle)
|
|
|
|
t.pc.OnConnectionStateChange(t.onPeerConnectionStateChange)
|
|
|
|
t.pc.OnDataChannel(t.onDataChannel)
|
|
|
|
t.me = me
|
|
|
|
if bwe != nil && t.streamAllocator != nil {
|
|
t.streamAllocator.SetBandwidthEstimator(bwe)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) setICEConnectedAt(at time.Time) {
|
|
t.lock.Lock()
|
|
if t.iceConnectedAt.IsZero() {
|
|
//
|
|
// Record initial connection time.
|
|
// This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`.
|
|
//
|
|
t.iceConnectedAt = at
|
|
prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
|
|
}
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) isShortConnection(at time.Time) (bool, time.Duration) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
if t.iceConnectedAt.IsZero() {
|
|
return false, 0
|
|
}
|
|
|
|
duration := at.Sub(t.iceConnectedAt)
|
|
return duration < shortConnectionThreshold, duration
|
|
}
|
|
|
|
func (t *PCTransport) getSelectedPair() (*webrtc.ICECandidatePair, error) {
|
|
sctp := t.pc.SCTP()
|
|
if sctp == nil {
|
|
return nil, errors.New("no SCTP")
|
|
}
|
|
|
|
dtlsTransport := sctp.Transport()
|
|
if dtlsTransport == nil {
|
|
return nil, errors.New("no DTLS transport")
|
|
}
|
|
|
|
iceTransport := dtlsTransport.ICETransport()
|
|
if iceTransport == nil {
|
|
return nil, errors.New("no ICE transport")
|
|
}
|
|
|
|
return iceTransport.GetSelectedCandidatePair()
|
|
}
|
|
|
|
func (t *PCTransport) logICECandidates() {
|
|
t.postEvent(event{
|
|
signal: signalLogICECandidates,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) setConnectedAt(at time.Time) bool {
|
|
t.lock.Lock()
|
|
if !t.connectedAt.IsZero() {
|
|
t.lock.Unlock()
|
|
return false
|
|
}
|
|
|
|
t.connectedAt = at
|
|
prometheus.ServiceOperationCounter.WithLabelValues("peer_connection", "success", "").Add(1)
|
|
t.lock.Unlock()
|
|
return true
|
|
}
|
|
|
|
func (t *PCTransport) onICEGatheringStateChange(state webrtc.ICEGathererState) {
|
|
t.params.Logger.Infow("ice gathering state change", "state", state.String())
|
|
if state != webrtc.ICEGathererStateComplete {
|
|
return
|
|
}
|
|
|
|
t.postEvent(event{
|
|
signal: signalICEGatheringComplete,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) onICECandidateTrickle(c *webrtc.ICECandidate) {
|
|
t.postEvent(event{
|
|
signal: signalLocalICECandidate,
|
|
data: c,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) handleConnectionFailed() {
|
|
isShort, duration := t.isShortConnection(time.Now())
|
|
if isShort {
|
|
pair, err := t.getSelectedPair()
|
|
if err != nil {
|
|
t.params.Logger.Errorw("short ICE connection", err, "duration", duration)
|
|
} else {
|
|
t.params.Logger.Infow("short ICE connection", "pair", pair, "duration", duration)
|
|
}
|
|
}
|
|
|
|
if onFailed := t.getOnFailed(); onFailed != nil {
|
|
onFailed(isShort)
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState) {
|
|
t.params.Logger.Infow("ice connection state change", "state", state.String())
|
|
switch state {
|
|
case webrtc.ICEConnectionStateConnected:
|
|
t.setICEConnectedAt(time.Now())
|
|
if pair, err := t.getSelectedPair(); err != nil {
|
|
t.params.Logger.Errorw("error getting selected ICE candidate pair", err)
|
|
} else {
|
|
t.params.Logger.Infow("selected ICE candidate pair", "pair", pair)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionState) {
|
|
t.params.Logger.Infow("peer connection state change", "state", state.String())
|
|
switch state {
|
|
case webrtc.PeerConnectionStateConnected:
|
|
t.logICECandidates()
|
|
isInitialConnection := t.setConnectedAt(time.Now())
|
|
if isInitialConnection {
|
|
if onInitialConnected := t.getOnInitialConnected(); onInitialConnected != nil {
|
|
onInitialConnected()
|
|
}
|
|
|
|
t.maybeNotifyFullyEstablished()
|
|
}
|
|
case webrtc.PeerConnectionStateFailed:
|
|
t.logICECandidates()
|
|
t.handleConnectionFailed()
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
|
|
switch dc.Label() {
|
|
case ReliableDataChannel:
|
|
t.params.Logger.Debugw("reliable data channel open")
|
|
t.lock.Lock()
|
|
t.reliableDC = dc
|
|
t.reliableDCOpened = true
|
|
t.lock.Unlock()
|
|
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
|
|
if onDataPacket := t.getOnDataPacket(); onDataPacket != nil {
|
|
onDataPacket(livekit.DataPacket_RELIABLE, msg.Data)
|
|
}
|
|
})
|
|
|
|
t.maybeNotifyFullyEstablished()
|
|
case LossyDataChannel:
|
|
t.params.Logger.Debugw("lossy data channel open")
|
|
t.lock.Lock()
|
|
t.lossyDC = dc
|
|
t.lossyDCOpened = true
|
|
t.lock.Unlock()
|
|
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
|
|
if onDataPacket := t.getOnDataPacket(); onDataPacket != nil {
|
|
onDataPacket(livekit.DataPacket_LOSSY, msg.Data)
|
|
}
|
|
})
|
|
|
|
t.maybeNotifyFullyEstablished()
|
|
default:
|
|
t.params.Logger.Warnw("unsupported datachannel added", nil, "label", dc.Label())
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) maybeNotifyFullyEstablished() {
|
|
t.lock.RLock()
|
|
fullyEstablished := t.reliableDCOpened && t.lossyDCOpened && !t.connectedAt.IsZero()
|
|
t.lock.RUnlock()
|
|
|
|
if fullyEstablished {
|
|
if onFullyEstablished := t.getOnFullyEstablished(); onFullyEstablished != nil {
|
|
onFullyEstablished()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) SetPreferTCP(preferTCP bool) {
|
|
t.preferTCP.Store(preferTCP)
|
|
}
|
|
|
|
func (t *PCTransport) AddICECandidate(candidate webrtc.ICECandidateInit) {
|
|
t.postEvent(event{
|
|
signal: signalRemoteICECandidate,
|
|
data: &candidate,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) AddTrack(trackLocal webrtc.TrackLocal, params types.AddTrackParams) (sender *webrtc.RTPSender, transceiver *webrtc.RTPTransceiver, err error) {
|
|
t.lock.Lock()
|
|
canReuse := t.canReuseTransceiver
|
|
td, ok := t.previousTrackDescription[trackLocal.ID()]
|
|
if ok {
|
|
delete(t.previousTrackDescription, trackLocal.ID())
|
|
}
|
|
t.lock.Unlock()
|
|
|
|
// keep track use same mid after migration if possible
|
|
if td != nil && td.sender != nil {
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Mid() == td.mid {
|
|
return td.sender, tr, tr.SetSender(td.sender, trackLocal)
|
|
}
|
|
}
|
|
}
|
|
|
|
// if never negotiated with client, can't reuse transeiver for track not subscribed before migration
|
|
if !canReuse {
|
|
return t.AddTransceiverFromTrack(trackLocal, params)
|
|
}
|
|
|
|
sender, err = t.pc.AddTrack(trackLocal)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// as there is no way to get transceiver from sender, search
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Sender() == sender {
|
|
transceiver = tr
|
|
break
|
|
}
|
|
}
|
|
|
|
if transceiver == nil {
|
|
err = ErrNoTransceiver
|
|
return
|
|
}
|
|
|
|
configureTransceiverStereo(transceiver, params.Stereo)
|
|
|
|
return
|
|
}
|
|
|
|
func (t *PCTransport) AddTransceiverFromTrack(trackLocal webrtc.TrackLocal, params types.AddTrackParams) (sender *webrtc.RTPSender, transceiver *webrtc.RTPTransceiver, err error) {
|
|
transceiver, err = t.pc.AddTransceiverFromTrack(trackLocal)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
sender = transceiver.Sender()
|
|
if sender == nil {
|
|
err = ErrNoSender
|
|
return
|
|
}
|
|
|
|
configureTransceiverStereo(transceiver, params.Stereo)
|
|
|
|
return
|
|
}
|
|
|
|
func (t *PCTransport) RemoveTrack(sender *webrtc.RTPSender) error {
|
|
return t.pc.RemoveTrack(sender)
|
|
}
|
|
|
|
func (t *PCTransport) GetMid(rtpReceiver *webrtc.RTPReceiver) string {
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Receiver() == rtpReceiver {
|
|
return tr.Mid()
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func (t *PCTransport) GetRTPReceiver(mid string) *webrtc.RTPReceiver {
|
|
for _, tr := range t.pc.GetTransceivers() {
|
|
if tr.Mid() == mid {
|
|
return tr.Receiver()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelInit) error {
|
|
dc, err := t.pc.CreateDataChannel(label, dci)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.lock.Lock()
|
|
switch dc.Label() {
|
|
case ReliableDataChannel:
|
|
t.reliableDC = dc
|
|
t.reliableDC.OnOpen(func() {
|
|
t.params.Logger.Debugw("reliable data channel open")
|
|
t.lock.Lock()
|
|
t.reliableDCOpened = true
|
|
t.lock.Unlock()
|
|
|
|
t.maybeNotifyFullyEstablished()
|
|
})
|
|
case LossyDataChannel:
|
|
t.lossyDC = dc
|
|
t.lossyDC.OnOpen(func() {
|
|
t.params.Logger.Debugw("lossy data channel open")
|
|
t.lock.Lock()
|
|
t.lossyDCOpened = true
|
|
t.lock.Unlock()
|
|
|
|
t.maybeNotifyFullyEstablished()
|
|
})
|
|
default:
|
|
t.params.Logger.Errorw("unknown data channel label", nil, "label", dc.Label())
|
|
}
|
|
t.lock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) CreateDataChannelIfEmpty(dcLabel string, dci *webrtc.DataChannelInit) (label string, id uint16, existing bool, err error) {
|
|
t.lock.RLock()
|
|
var dc *webrtc.DataChannel
|
|
switch dcLabel {
|
|
case ReliableDataChannel:
|
|
dc = t.reliableDC
|
|
case LossyDataChannel:
|
|
dc = t.lossyDC
|
|
default:
|
|
t.params.Logger.Errorw("unknown data channel label", nil, "label", label)
|
|
err = errors.New("unknown data channel label")
|
|
}
|
|
t.lock.RUnlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if dc != nil {
|
|
return dc.Label(), *dc.ID(), true, nil
|
|
}
|
|
|
|
dc, err = t.pc.CreateDataChannel(dcLabel, dci)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
t.onDataChannel(dc)
|
|
return dc.Label(), *dc.ID(), false, nil
|
|
}
|
|
|
|
// IsEstablished returns true if the PeerConnection has been established
|
|
func (t *PCTransport) IsEstablished() bool {
|
|
return t.pc.ConnectionState() != webrtc.PeerConnectionStateNew
|
|
}
|
|
|
|
func (t *PCTransport) HasEverConnected() bool {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return !t.connectedAt.IsZero()
|
|
}
|
|
|
|
func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error {
|
|
return t.pc.WriteRTCP(pkts)
|
|
}
|
|
|
|
func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket) error {
|
|
data, err := proto.Marshal(dp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var dc *webrtc.DataChannel
|
|
t.lock.RLock()
|
|
if dp.Kind == livekit.DataPacket_RELIABLE {
|
|
dc = t.reliableDC
|
|
} else {
|
|
dc = t.lossyDC
|
|
}
|
|
t.lock.RUnlock()
|
|
|
|
if dc == nil {
|
|
return ErrDataChannelUnavailable
|
|
}
|
|
|
|
return dc.Send(data)
|
|
}
|
|
|
|
func (t *PCTransport) Close() {
|
|
t.eventChMu.Lock()
|
|
if t.isClosed.Swap(true) {
|
|
t.eventChMu.Unlock()
|
|
return
|
|
}
|
|
|
|
close(t.eventCh)
|
|
t.eventChMu.Unlock()
|
|
|
|
if t.streamAllocator != nil {
|
|
t.streamAllocator.Stop()
|
|
}
|
|
|
|
_ = t.pc.Close()
|
|
}
|
|
|
|
func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription) {
|
|
t.postEvent(event{
|
|
signal: signalRemoteDescriptionReceived,
|
|
data: &sd,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) OnICECandidate(f func(c *webrtc.ICECandidate) error) {
|
|
t.lock.Lock()
|
|
t.onICECandidate = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnICECandidate() func(c *webrtc.ICECandidate) error {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onICECandidate
|
|
}
|
|
|
|
func (t *PCTransport) OnInitialConnected(f func()) {
|
|
t.lock.Lock()
|
|
t.onInitialConnected = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnInitialConnected() func() {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onInitialConnected
|
|
}
|
|
|
|
func (t *PCTransport) OnFullyEstablished(f func()) {
|
|
t.lock.Lock()
|
|
t.onFullyEstablished = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnFullyEstablished() func() {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onFullyEstablished
|
|
}
|
|
|
|
func (t *PCTransport) OnFailed(f func(isShortLived bool)) {
|
|
t.lock.Lock()
|
|
t.onFailed = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnFailed() func(isShortLived bool) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onFailed
|
|
}
|
|
|
|
func (t *PCTransport) OnTrack(f func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver)) {
|
|
t.pc.OnTrack(f)
|
|
}
|
|
|
|
func (t *PCTransport) OnDataPacket(f func(kind livekit.DataPacket_Kind, data []byte)) {
|
|
t.lock.Lock()
|
|
t.onDataPacket = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnDataPacket() func(kind livekit.DataPacket_Kind, data []byte) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onDataPacket
|
|
}
|
|
|
|
// OnOffer is called when the PeerConnection starts negotiation and prepares an offer
|
|
func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription) error) {
|
|
t.lock.Lock()
|
|
t.onOffer = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnOffer() func(sd webrtc.SessionDescription) error {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onOffer
|
|
}
|
|
|
|
func (t *PCTransport) OnAnswer(f func(sd webrtc.SessionDescription) error) {
|
|
t.lock.Lock()
|
|
t.onAnswer = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnAnswer() func(sd webrtc.SessionDescription) error {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onAnswer
|
|
}
|
|
|
|
func (t *PCTransport) OnNegotiationStateChanged(f func(state NegotiationState)) {
|
|
t.lock.Lock()
|
|
t.onNegotiationStateChanged = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnNegotiationStateChanged() func(state NegotiationState) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onNegotiationStateChanged
|
|
}
|
|
|
|
func (t *PCTransport) OnNegotiationFailed(f func()) {
|
|
t.lock.Lock()
|
|
t.onNegotiationFailed = f
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) getOnNegotiationFailed() func() {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.onNegotiationFailed
|
|
}
|
|
|
|
func (t *PCTransport) Negotiate(force bool) {
|
|
if t.isClosed.Load() {
|
|
return
|
|
}
|
|
|
|
if force {
|
|
t.lock.Lock()
|
|
t.debouncedNegotiate(func() {
|
|
// no op to cancel pending negotiation
|
|
})
|
|
t.debouncePending = false
|
|
t.lock.Unlock()
|
|
|
|
t.postEvent(event{
|
|
signal: signalSendOffer,
|
|
})
|
|
} else {
|
|
t.lock.Lock()
|
|
if !t.debouncePending {
|
|
t.debouncedNegotiate(func() {
|
|
t.lock.Lock()
|
|
t.debouncePending = false
|
|
t.lock.Unlock()
|
|
|
|
t.postEvent(event{
|
|
signal: signalSendOffer,
|
|
})
|
|
})
|
|
t.debouncePending = true
|
|
}
|
|
t.lock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) ICERestart() {
|
|
t.postEvent(event{
|
|
signal: signalICERestart,
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) OnStreamStateChange(f func(update *sfu.StreamStateUpdate) error) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
t.streamAllocator.OnStreamStateChange(f)
|
|
}
|
|
|
|
func (t *PCTransport) AddTrackToStreamAllocator(subTrack types.SubscribedTrack) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
t.streamAllocator.AddTrack(subTrack.DownTrack(), sfu.AddTrackParams{
|
|
Source: subTrack.MediaTrack().Source(),
|
|
IsSimulcast: subTrack.MediaTrack().IsSimulcast(),
|
|
PublisherID: subTrack.MediaTrack().PublisherID(),
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) RemoveTrackFromStreamAllocator(subTrack types.SubscribedTrack) {
|
|
if t.streamAllocator == nil {
|
|
return
|
|
}
|
|
|
|
t.streamAllocator.RemoveTrack(subTrack.DownTrack())
|
|
}
|
|
|
|
func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType {
|
|
unknown := types.ICEConnectionTypeUnknown
|
|
if t.pc == nil {
|
|
return unknown
|
|
}
|
|
p, err := t.getSelectedPair()
|
|
if err != nil {
|
|
return unknown
|
|
}
|
|
|
|
if p.Remote.Typ == webrtc.ICECandidateTypeRelay {
|
|
return types.ICEConnectionTypeTURN
|
|
} else if p.Remote.Typ == webrtc.ICECandidateTypePrflx {
|
|
// if the remote relay candidate pings us *before* we get a relay candidate,
|
|
// Pion would have created a prflx candidate with the same address as the relay candidate.
|
|
// to report an accurate connection type, we'll compare to see if existing relay candidates match
|
|
t.lock.RLock()
|
|
allowedRemoteCandidates := t.allowedRemoteCandidates
|
|
t.lock.RUnlock()
|
|
|
|
for _, ci := range allowedRemoteCandidates {
|
|
candidateValue := strings.TrimPrefix(ci, "candidate:")
|
|
candidate, err := ice.UnmarshalCandidate(candidateValue)
|
|
if err == nil && candidate.Type() == ice.CandidateTypeRelay {
|
|
if p.Remote.Address == candidate.Address() &&
|
|
p.Remote.Port == uint16(candidate.Port()) &&
|
|
p.Remote.Protocol.String() == candidate.NetworkType().NetworkShort() {
|
|
return types.ICEConnectionTypeTURN
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if p.Remote.Protocol == webrtc.ICEProtocolTCP {
|
|
return types.ICEConnectionTypeTCP
|
|
}
|
|
return types.ICEConnectionTypeUDP
|
|
}
|
|
|
|
func (t *PCTransport) preparePC(previousAnswer webrtc.SessionDescription) error {
|
|
// sticky data channel to first m-lines, if someday we don't send sdp without media streams to
|
|
// client's subscribe pc after joining, should change this step
|
|
parsed, err := previousAnswer.Unmarshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fp, fpHahs, err := lksdp.ExtractFingerprint(parsed)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
offer, err := t.pc.CreateOffer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := t.pc.SetLocalDescription(offer); err != nil {
|
|
return err
|
|
}
|
|
|
|
//
|
|
// Simulate client side peer connection and set DTLS role from previous answer.
|
|
// Role needs to be set properly (one side needs to be server and the other side
|
|
// needs to be the client) for DTLS connection to form properly. As this is
|
|
// trying to replicate previous setup, read from previous answer and use that role.
|
|
//
|
|
se := webrtc.SettingEngine{}
|
|
se.SetAnsweringDTLSRole(lksdp.ExtractDTLSRole(parsed))
|
|
api := webrtc.NewAPI(
|
|
webrtc.WithSettingEngine(se),
|
|
webrtc.WithMediaEngine(t.me),
|
|
)
|
|
pc2, err := api.NewPeerConnection(webrtc.Configuration{
|
|
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer pc2.Close()
|
|
|
|
if err := pc2.SetRemoteDescription(offer); err != nil {
|
|
return err
|
|
}
|
|
ans, err := pc2.CreateAnswer(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// replace client's fingerprint into dump pc's answer, for pion's dtls process, it will
|
|
// keep the fingerprint at first call of SetRemoteDescription, if dumb pc and client pc use
|
|
// different fingerprint, that will cause pion denied dtls data after handshake with client
|
|
// complete (can't pass fingerprint change).
|
|
// in this step, we don't established connection with dump pc(no candidate swap), just use
|
|
// sdp negotiation to sticky data channel and keep client's fingerprint
|
|
parsedAns, _ := ans.Unmarshal()
|
|
fpLine := fpHahs + " " + fp
|
|
replaceFP := func(attrs []sdp.Attribute, fpLine string) {
|
|
for k := range attrs {
|
|
if attrs[k].Key == "fingerprint" {
|
|
attrs[k].Value = fpLine
|
|
}
|
|
}
|
|
}
|
|
replaceFP(parsedAns.Attributes, fpLine)
|
|
for _, m := range parsedAns.MediaDescriptions {
|
|
replaceFP(m.Attributes, fpLine)
|
|
}
|
|
bytes, err := parsedAns.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ans.SDP = string(bytes)
|
|
|
|
return t.pc.SetRemoteDescription(ans)
|
|
}
|
|
|
|
func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDescription) (map[string]*webrtc.RTPSender, error) {
|
|
senders := make(map[string]*webrtc.RTPSender)
|
|
parsed, err := previousAnswer.Unmarshal()
|
|
if err != nil {
|
|
return senders, err
|
|
}
|
|
for _, m := range parsed.MediaDescriptions {
|
|
var codecType webrtc.RTPCodecType
|
|
switch m.MediaName.Media {
|
|
case "video":
|
|
codecType = webrtc.RTPCodecTypeVideo
|
|
case "audio":
|
|
codecType = webrtc.RTPCodecTypeAudio
|
|
case "application":
|
|
// for pion generate unmatched sdp, it always appends data channel to last m-lines,
|
|
// that not consistent with our previous answer that data channel might at middle-line
|
|
// because sdp can negotiate multi times before migration.(it will sticky to the last m-line atfirst negotiate)
|
|
// so use a dumb pc to negotiate sdp to fixed the datachannel's mid at same position with previous answer
|
|
if err := t.preparePC(previousAnswer); err != nil {
|
|
t.params.Logger.Errorw("prepare pc for migration failed", err)
|
|
return senders, err
|
|
}
|
|
continue
|
|
default:
|
|
continue
|
|
}
|
|
tr, err := t.pc.AddTransceiverFromKind(codecType, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly})
|
|
if err != nil {
|
|
return senders, err
|
|
}
|
|
mid := lksdp.GetMidValue(m)
|
|
if mid == "" {
|
|
return senders, ErrMidNotFound
|
|
}
|
|
tr.SetMid(mid)
|
|
|
|
// save mid -> senders for migration resue
|
|
sender := tr.Sender()
|
|
senders[mid] = sender
|
|
|
|
// set transceiver to inactive
|
|
tr.SetSender(tr.Sender(), nil)
|
|
}
|
|
return senders, nil
|
|
}
|
|
|
|
func (t *PCTransport) SetPreviousSdp(offer, answer *webrtc.SessionDescription) {
|
|
t.lock.Lock()
|
|
if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
|
|
t.previousAnswer = answer
|
|
if senders, err := t.initPCWithPreviousAnswer(*t.previousAnswer); err != nil {
|
|
t.params.Logger.Errorw("initPCWithPreviousAnswer failed", err)
|
|
t.lock.Unlock()
|
|
|
|
if onNegotiationFailed := t.getOnNegotiationFailed(); onNegotiationFailed != nil {
|
|
onNegotiationFailed()
|
|
}
|
|
return
|
|
} else if offer != nil {
|
|
// in migration case, can't reuse tranceiver before negotiated except track subscribed at previous node
|
|
t.canReuseTransceiver = false
|
|
if err := t.parseTrackMid(*offer, senders); err != nil {
|
|
t.params.Logger.Errorw("parse previous offer failed", err, "offer", offer.SDP)
|
|
}
|
|
}
|
|
}
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
func (t *PCTransport) parseTrackMid(offer webrtc.SessionDescription, senders map[string]*webrtc.RTPSender) error {
|
|
parsed, err := offer.Unmarshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.previousTrackDescription = make(map[string]*trackDescription)
|
|
for _, m := range parsed.MediaDescriptions {
|
|
msid, ok := m.Attribute(sdp.AttrKeyMsid)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if split := strings.Split(msid, " "); len(split) == 2 {
|
|
trackid := split[1]
|
|
mid := lksdp.GetMidValue(m)
|
|
if mid == "" {
|
|
return ErrMidNotFound
|
|
}
|
|
t.previousTrackDescription[trackid] = &trackDescription{
|
|
mid: mid,
|
|
sender: senders[mid],
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) postEvent(event event) {
|
|
t.eventChMu.RLock()
|
|
if t.isClosed.Load() {
|
|
t.eventChMu.RUnlock()
|
|
return
|
|
}
|
|
|
|
select {
|
|
case t.eventCh <- event:
|
|
default:
|
|
t.params.Logger.Warnw("event queue full", nil, "event", event.String())
|
|
}
|
|
t.eventChMu.RUnlock()
|
|
}
|
|
|
|
func (t *PCTransport) processEvents() {
|
|
for event := range t.eventCh {
|
|
err := t.handleEvent(&event)
|
|
if err != nil {
|
|
t.params.Logger.Errorw("error handling event", err, "event", event.String())
|
|
if onNegotiationFailed := t.getOnNegotiationFailed(); onNegotiationFailed != nil {
|
|
onNegotiationFailed()
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
t.clearSignalStateCheckTimer()
|
|
t.params.Logger.Debugw("leaving events processor")
|
|
}
|
|
|
|
func (t *PCTransport) handleEvent(e *event) error {
|
|
switch e.signal {
|
|
case signalICEGatheringComplete:
|
|
return t.handleICEGatheringComplete(e)
|
|
case signalLocalICECandidate:
|
|
return t.handleLocalICECandidate(e)
|
|
case signalRemoteICECandidate:
|
|
return t.handleRemoteICECandidate(e)
|
|
case signalLogICECandidates:
|
|
return t.handleLogICECandidates(e)
|
|
case signalSendOffer:
|
|
return t.handleSendOffer(e)
|
|
case signalRemoteDescriptionReceived:
|
|
return t.handleRemoteDescriptionReceived(e)
|
|
case signalICERestart:
|
|
return t.handleICERestart(e)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) handleICEGatheringComplete(e *event) error {
|
|
if t.params.IsOfferer {
|
|
return t.handleICEGatheringCompleteOfferer()
|
|
} else {
|
|
return t.handleICEGatheringCompleteAnswerer()
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) handleICEGatheringCompleteOfferer() error {
|
|
if !t.restartAfterGathering {
|
|
return nil
|
|
}
|
|
|
|
t.params.Logger.Debugw("restarting ICE after ICE gathering")
|
|
t.restartAfterGathering = false
|
|
return t.doICERestart()
|
|
}
|
|
|
|
func (t *PCTransport) handleICEGatheringCompleteAnswerer() error {
|
|
if t.pendingRestartIceOffer == nil {
|
|
return nil
|
|
}
|
|
|
|
offer := *t.pendingRestartIceOffer
|
|
t.pendingRestartIceOffer = nil
|
|
|
|
t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering")
|
|
if err := t.setRemoteDescription(offer); err != nil {
|
|
return err
|
|
}
|
|
|
|
return t.createAndSendAnswer()
|
|
}
|
|
|
|
func (t *PCTransport) localDescriptionSent() error {
|
|
if !t.cacheLocalCandidates {
|
|
return nil
|
|
}
|
|
|
|
t.cacheLocalCandidates = false
|
|
|
|
cachedLocalCandidates := t.cachedLocalCandidates
|
|
t.cachedLocalCandidates = nil
|
|
|
|
if onICECandidate := t.getOnICECandidate(); onICECandidate != nil {
|
|
for _, c := range cachedLocalCandidates {
|
|
if err := onICECandidate(c); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return ErrNoICECandidateHandler
|
|
}
|
|
|
|
func (t *PCTransport) clearLocalDescriptionSent() {
|
|
t.cacheLocalCandidates = true
|
|
t.cachedLocalCandidates = nil
|
|
|
|
t.allowedLocalCandidates = nil
|
|
t.lock.Lock()
|
|
t.allowedRemoteCandidates = nil
|
|
t.lock.Unlock()
|
|
t.filteredLocalCandidates = nil
|
|
t.filteredRemoteCandidates = nil
|
|
}
|
|
|
|
func (t *PCTransport) handleLocalICECandidate(e *event) error {
|
|
c := e.data.(*webrtc.ICECandidate)
|
|
|
|
filtered := false
|
|
if t.preferTCP.Load() && c != nil && c.Protocol != webrtc.ICEProtocolTCP {
|
|
cstr := c.String()
|
|
t.params.Logger.Debugw("filtering out local candidate", "candidate", cstr)
|
|
t.filteredLocalCandidates = append(t.filteredLocalCandidates, cstr)
|
|
filtered = true
|
|
}
|
|
|
|
if filtered {
|
|
return nil
|
|
}
|
|
|
|
if c != nil {
|
|
t.allowedLocalCandidates = append(t.allowedLocalCandidates, c.String())
|
|
}
|
|
if t.cacheLocalCandidates {
|
|
t.cachedLocalCandidates = append(t.cachedLocalCandidates, c)
|
|
return nil
|
|
}
|
|
|
|
if onICECandidate := t.getOnICECandidate(); onICECandidate != nil {
|
|
return onICECandidate(c)
|
|
}
|
|
|
|
return ErrNoICECandidateHandler
|
|
}
|
|
|
|
func (t *PCTransport) handleRemoteICECandidate(e *event) error {
|
|
c := e.data.(*webrtc.ICECandidateInit)
|
|
|
|
filtered := false
|
|
if t.preferTCP.Load() && !strings.Contains(c.Candidate, "tcp") {
|
|
t.params.Logger.Debugw("filtering out remote candidate", "candidate", c.Candidate)
|
|
t.filteredRemoteCandidates = append(t.filteredRemoteCandidates, c.Candidate)
|
|
filtered = true
|
|
}
|
|
|
|
if filtered {
|
|
return nil
|
|
}
|
|
|
|
t.lock.Lock()
|
|
t.allowedRemoteCandidates = append(t.allowedRemoteCandidates, c.Candidate)
|
|
t.lock.Unlock()
|
|
|
|
if t.pc.RemoteDescription() == nil {
|
|
t.pendingRemoteCandidates = append(t.pendingRemoteCandidates, c)
|
|
return nil
|
|
}
|
|
|
|
if err := t.pc.AddICECandidate(*c); err != nil {
|
|
return errors.Wrap(err, "add ice candidate failed")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) handleLogICECandidates(e *event) error {
|
|
t.params.Logger.Infow(
|
|
"ice candidates",
|
|
"lc", t.allowedLocalCandidates,
|
|
"rc", t.allowedRemoteCandidates,
|
|
"lc (filtered)", t.filteredLocalCandidates,
|
|
"rc (filtered)", t.filteredRemoteCandidates,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) setNegotiationState(state NegotiationState) {
|
|
t.negotiationState = state
|
|
if onNegotiationStateChanged := t.getOnNegotiationStateChanged(); onNegotiationStateChanged != nil {
|
|
onNegotiationStateChanged(t.negotiationState)
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) filterCandidates(sd webrtc.SessionDescription, preferTCP bool) webrtc.SessionDescription {
|
|
parsed, err := sd.Unmarshal()
|
|
if err != nil {
|
|
t.params.Logger.Errorw("could not unmarshal SDP to filter candidates", err)
|
|
return sd
|
|
}
|
|
|
|
filterAttributes := func(attrs []sdp.Attribute) []sdp.Attribute {
|
|
filteredAttrs := make([]sdp.Attribute, 0, len(attrs))
|
|
for _, a := range attrs {
|
|
if a.Key == sdp.AttrKeyCandidate {
|
|
if preferTCP {
|
|
if strings.Contains(a.Value, "tcp") {
|
|
filteredAttrs = append(filteredAttrs, a)
|
|
}
|
|
} else {
|
|
filteredAttrs = append(filteredAttrs, a)
|
|
}
|
|
} else {
|
|
filteredAttrs = append(filteredAttrs, a)
|
|
}
|
|
}
|
|
|
|
return filteredAttrs
|
|
}
|
|
|
|
parsed.Attributes = filterAttributes(parsed.Attributes)
|
|
for _, m := range parsed.MediaDescriptions {
|
|
m.Attributes = filterAttributes(m.Attributes)
|
|
}
|
|
|
|
bytes, err := parsed.Marshal()
|
|
if err != nil {
|
|
t.params.Logger.Errorw("could not marshal SDP to filter candidates", err)
|
|
return sd
|
|
}
|
|
sd.SDP = string(bytes)
|
|
return sd
|
|
}
|
|
|
|
func (t *PCTransport) clearSignalStateCheckTimer() {
|
|
if t.signalStateCheckTimer != nil {
|
|
t.signalStateCheckTimer.Stop()
|
|
t.signalStateCheckTimer = nil
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) setupSignalStateCheckTimer() {
|
|
t.clearSignalStateCheckTimer()
|
|
|
|
negotiateVersion := t.negotiateCounter.Inc()
|
|
t.signalStateCheckTimer = time.AfterFunc(negotiationFailedTimeout, func() {
|
|
t.clearSignalStateCheckTimer()
|
|
|
|
failed := t.negotiationState != NegotiationStateNone
|
|
|
|
if t.negotiateCounter.Load() == negotiateVersion && failed {
|
|
if onNegotiationFailed := t.getOnNegotiationFailed(); onNegotiationFailed != nil {
|
|
onNegotiationFailed()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
|
|
if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
|
|
t.params.Logger.Warnw("trying to send offer on closed peer connection", nil)
|
|
return nil
|
|
}
|
|
|
|
// when there's an ongoing negotiation, let it finish and not disrupt its state
|
|
if t.negotiationState == NegotiationStateRemote {
|
|
t.params.Logger.Infow("skipping negotiation, trying again later")
|
|
t.setNegotiationState(NegotiationStateRetry)
|
|
return nil
|
|
} else if t.negotiationState == NegotiationStateRetry {
|
|
// already set to retry, we can safely skip this attempt
|
|
return nil
|
|
}
|
|
|
|
ensureICERestart := func(options *webrtc.OfferOptions) *webrtc.OfferOptions {
|
|
if options == nil {
|
|
options = &webrtc.OfferOptions{}
|
|
}
|
|
options.ICERestart = true
|
|
return options
|
|
}
|
|
|
|
t.lock.Lock()
|
|
if t.previousAnswer != nil {
|
|
t.previousAnswer = nil
|
|
options = ensureICERestart(options)
|
|
t.params.Logger.Infow("ice restart due to previous answer")
|
|
}
|
|
t.lock.Unlock()
|
|
|
|
if t.restartAtNextOffer {
|
|
t.restartAtNextOffer = false
|
|
options = ensureICERestart(options)
|
|
t.params.Logger.Infow("ice restart at next offer")
|
|
}
|
|
|
|
if options != nil && options.ICERestart {
|
|
t.clearLocalDescriptionSent()
|
|
}
|
|
|
|
offer, err := t.pc.CreateOffer(options)
|
|
if err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
|
|
return errors.Wrap(err, "create offer failed")
|
|
}
|
|
|
|
preferTCP := t.preferTCP.Load()
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("local offer (unfiltered)", "sdp", offer.SDP)
|
|
}
|
|
|
|
err = t.pc.SetLocalDescription(offer)
|
|
if err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
|
|
return errors.Wrap(err, "setting local description failed")
|
|
}
|
|
|
|
//
|
|
// Filter after setting local description as pion expects the offer
|
|
// to match between CreateOffer and SetLocalDescription.
|
|
// Filtered offer is sent to remote so that remote does not
|
|
// see filtered candidates.
|
|
//
|
|
offer = t.filterCandidates(offer, preferTCP)
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("local offer (filtered)", "sdp", offer.SDP)
|
|
}
|
|
|
|
// indicate waiting for remote
|
|
t.setNegotiationState(NegotiationStateRemote)
|
|
|
|
t.setupSignalStateCheckTimer()
|
|
|
|
if onOffer := t.getOnOffer(); onOffer != nil {
|
|
if err := onOffer(offer); err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
|
|
return errors.Wrap(err, "could not send offer")
|
|
}
|
|
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
|
|
return t.localDescriptionSent()
|
|
}
|
|
|
|
return ErrNoOfferHandler
|
|
}
|
|
|
|
func (t *PCTransport) handleSendOffer(e *event) error {
|
|
return t.createAndSendOffer(nil)
|
|
}
|
|
|
|
func (t *PCTransport) handleRemoteDescriptionReceived(e *event) error {
|
|
sd := e.data.(*webrtc.SessionDescription)
|
|
if sd.Type == webrtc.SDPTypeOffer {
|
|
return t.handleRemoteOfferReceived(sd)
|
|
} else {
|
|
return t.handleRemoteAnswerReceived(sd)
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) isRemoteOfferRestartICE(sd *webrtc.SessionDescription) (string, bool, error) {
|
|
parsed, err := sd.Unmarshal()
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
user, pwd, err := lksdp.ExtractICECredential(parsed)
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
|
|
credential := fmt.Sprintf("%s:%s", user, pwd)
|
|
// ice credential changed, remote offer restart ice
|
|
restartICE := t.currentOfferIceCredential != "" && t.currentOfferIceCredential != credential
|
|
return credential, restartICE, nil
|
|
}
|
|
|
|
func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error {
|
|
// filter before setting remote description so that pion does not see filtered remote candidates
|
|
preferTCP := t.preferTCP.Load()
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("remote description (unfiltered)", "type", sd.Type, "sdp", sd.SDP)
|
|
}
|
|
sd = t.filterCandidates(sd, preferTCP)
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("remote description (filtered)", "type", sd.Type, "sdp", sd.SDP)
|
|
}
|
|
|
|
if err := t.pc.SetRemoteDescription(sd); err != nil {
|
|
sdpType := "offer"
|
|
if sd.Type == webrtc.SDPTypeAnswer {
|
|
sdpType = "answer"
|
|
}
|
|
prometheus.ServiceOperationCounter.WithLabelValues(sdpType, "error", "remote_description").Add(1)
|
|
return errors.Wrap(err, "setting remote description failed")
|
|
} else if sd.Type == webrtc.SDPTypeAnswer {
|
|
t.lock.Lock()
|
|
if !t.canReuseTransceiver {
|
|
t.canReuseTransceiver = true
|
|
t.previousTrackDescription = make(map[string]*trackDescription)
|
|
}
|
|
t.lock.Unlock()
|
|
}
|
|
|
|
for _, c := range t.pendingRemoteCandidates {
|
|
if err := t.pc.AddICECandidate(*c); err != nil {
|
|
return errors.Wrap(err, "add ice candidate failed")
|
|
}
|
|
}
|
|
t.pendingRemoteCandidates = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) createAndSendAnswer() error {
|
|
answer, err := t.pc.CreateAnswer(nil)
|
|
if err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1)
|
|
return errors.Wrap(err, "create answer failed")
|
|
}
|
|
|
|
preferTCP := t.preferTCP.Load()
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("local answer (unfiltered)", "sdp", answer.SDP)
|
|
}
|
|
|
|
if err = t.pc.SetLocalDescription(answer); err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1)
|
|
return errors.Wrap(err, "setting local description failed")
|
|
}
|
|
|
|
//
|
|
// Filter after setting local description as pion expects the answer
|
|
// to match between CreateAnswer and SetLocalDescription.
|
|
// Filtered answer is sent to remote so that remote does not
|
|
// see filtered candidates.
|
|
//
|
|
answer = t.filterCandidates(answer, preferTCP)
|
|
if preferTCP {
|
|
t.params.Logger.Debugw("local answer (filtered)", "sdp", answer.SDP)
|
|
}
|
|
|
|
if onAnswer := t.getOnAnswer(); onAnswer != nil {
|
|
if err := onAnswer(answer); err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1)
|
|
return errors.Wrap(err, "could not send answer")
|
|
}
|
|
|
|
prometheus.ServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1)
|
|
return t.localDescriptionSent()
|
|
}
|
|
|
|
return ErrNoAnswerHandler
|
|
}
|
|
|
|
func (t *PCTransport) handleRemoteOfferReceived(sd *webrtc.SessionDescription) error {
|
|
iceCredential, offerRestartICE, err := t.isRemoteOfferRestartICE(sd)
|
|
if err != nil {
|
|
return errors.Wrap(err, "check remote offer restart ice failed")
|
|
}
|
|
|
|
if offerRestartICE && t.pendingRestartIceOffer == nil {
|
|
t.clearLocalDescriptionSent()
|
|
}
|
|
|
|
if offerRestartICE && t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
|
|
t.params.Logger.Debugw("remote offer restart ice while ice gathering")
|
|
t.pendingRestartIceOffer = sd
|
|
return nil
|
|
}
|
|
|
|
if err := t.setRemoteDescription(*sd); err != nil {
|
|
return err
|
|
}
|
|
|
|
if t.currentOfferIceCredential == "" || offerRestartICE {
|
|
t.currentOfferIceCredential = iceCredential
|
|
}
|
|
|
|
return t.createAndSendAnswer()
|
|
}
|
|
|
|
func (t *PCTransport) handleRemoteAnswerReceived(sd *webrtc.SessionDescription) error {
|
|
if err := t.setRemoteDescription(*sd); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.clearSignalStateCheckTimer()
|
|
|
|
if t.negotiationState == NegotiationStateRetry {
|
|
t.setNegotiationState(NegotiationStateNone)
|
|
|
|
t.params.Logger.Debugw("re-negotiate after receiving answer")
|
|
return t.createAndSendOffer(nil)
|
|
}
|
|
|
|
t.setNegotiationState(NegotiationStateNone)
|
|
return nil
|
|
}
|
|
|
|
func (t *PCTransport) doICERestart() error {
|
|
if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
|
|
t.params.Logger.Warnw("trying to restart ICE on closed peer connection", nil)
|
|
return nil
|
|
}
|
|
|
|
// if restart is requested, and we are not ready, then continue afterwards
|
|
if t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
|
|
t.params.Logger.Debugw("deferring ICE restart to after gathering")
|
|
t.restartAfterGathering = true
|
|
return nil
|
|
}
|
|
|
|
if t.negotiationState == NegotiationStateNone {
|
|
return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
|
|
}
|
|
|
|
currentRemoteDescription := t.pc.CurrentRemoteDescription()
|
|
if currentRemoteDescription == nil {
|
|
// restart without current remote description, send current local description again to try recover
|
|
offer := t.pc.LocalDescription()
|
|
if offer == nil {
|
|
// it should not happen, log just in case
|
|
t.params.Logger.Warnw("ice restart without local offer", nil)
|
|
return ErrIceRestartWithoutLocalSDP
|
|
} else {
|
|
t.params.Logger.Infow("deferring ice restart to next offer")
|
|
t.setNegotiationState(NegotiationStateRetry)
|
|
t.restartAtNextOffer = true
|
|
if onOffer := t.getOnOffer(); onOffer != nil {
|
|
err := onOffer(*offer)
|
|
if err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
|
|
} else {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
|
|
}
|
|
return err
|
|
}
|
|
return ErrNoOfferHandler
|
|
}
|
|
} else {
|
|
// recover by re-applying the last answer
|
|
t.params.Logger.Infow("recovering from client negotiation state on ICE restart")
|
|
if err := t.pc.SetRemoteDescription(*currentRemoteDescription); err != nil {
|
|
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
|
|
return errors.Wrap(err, "set remote description failed")
|
|
} else {
|
|
t.setNegotiationState(NegotiationStateNone)
|
|
return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *PCTransport) handleICERestart(e *event) error {
|
|
return t.doICERestart()
|
|
}
|
|
|
|
// configure subscriber tranceiver for audio stereo
|
|
func configureTransceiverStereo(tr *webrtc.RTPTransceiver, stereo bool) {
|
|
sender := tr.Sender()
|
|
if sender == nil {
|
|
return
|
|
}
|
|
// enable stereo
|
|
codecs := sender.GetParameters().Codecs
|
|
configCodecs := make([]webrtc.RTPCodecParameters, 0, len(codecs))
|
|
for _, c := range codecs {
|
|
if strings.EqualFold(c.MimeType, webrtc.MimeTypeOpus) {
|
|
c.SDPFmtpLine = strings.ReplaceAll(c.SDPFmtpLine, ";sprop-stereo=1", "")
|
|
if stereo {
|
|
c.SDPFmtpLine += ";sprop-stereo=1"
|
|
}
|
|
}
|
|
configCodecs = append(configCodecs, c)
|
|
}
|
|
|
|
tr.SetCodecPreferences(configCodecs)
|
|
}
|