Files
livekit/pkg/rtc/transport.go
David Colburn 0b8a180554 Code inspection (#581)
* Code inspection

* fix [4]int64 conversiong
2022-03-30 13:49:53 -07:00

587 lines
17 KiB
Go

package rtc
import (
"errors"
"strings"
"sync"
"time"
"github.com/bep/debounce"
"github.com/go-logr/logr"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/interceptor/pkg/twcc"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
serverlogger "github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
const (
negotiationFrequency = 150 * time.Millisecond
dtlsRetransmissionInterval = 100 * time.Millisecond
iceDisconnectedTimeout = 10 * time.Second // compatible for ice-lite with firefox client
iceFailedTimeout = 25 * time.Second // pion's default
iceKeepaliveInterval = 2 * time.Second // pion's default
)
const (
negotiationStateNone = iota
// waiting for client answer
negotiationStateClient
// need to Negotiate again
negotiationRetry
)
type SimulcastTrackInfo struct {
Mid string
Rid string
}
// PCTransport is a wrapper around PeerConnection, with some helper methods
type PCTransport struct {
pc *webrtc.PeerConnection
me *webrtc.MediaEngine
lock sync.Mutex
pendingCandidates []webrtc.ICECandidateInit
debouncedNegotiate func(func())
onOffer func(offer webrtc.SessionDescription)
restartAfterGathering bool
negotiationState int
// stream allocator for subscriber PC
streamAllocator *sfu.StreamAllocator
logger logger.Logger
previousAnswer *webrtc.SessionDescription
}
type TransportParams struct {
ParticipantID livekit.ParticipantID
ParticipantIdentity livekit.ParticipantIdentity
ProtocolVersion types.ProtocolVersion
Target livekit.SignalTarget
Config *WebRTCConfig
CongestionControlConfig config.CongestionControlConfig
Telemetry telemetry.TelemetryService
EnabledCodecs []*livekit.Codec
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
}
func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimator cc.BandwidthEstimator)) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
var directionConfig DirectionConfig
if params.Target == livekit.SignalTarget_PUBLISHER {
directionConfig = params.Config.Publisher
} else {
directionConfig = params.Config.Subscriber
}
me, err := createMediaEngine(params.EnabledCodecs, directionConfig)
if err != nil {
return nil, nil, err
}
se := params.Config.SettingEngine
se.DisableMediaEngineCopy(true)
//
// Disable SRTP replay protection (https://datatracker.ietf.org/doc/html/rfc3711#page-15).
// Needed due to lack of RTX stream support in Pion.
//
// When clients probe for bandwidth, there are several possible approaches
// 1. Use padding packet (Chrome uses this)
// 2. Use an older packet (Firefox uses this)
// Typically, these are sent over the RTX stream and hence SRTP replay protection will not
// trigger. As Pion does not support RTX, when firefox uses older packet for probing, they
// trigger the replay protection.
//
// That results in two issues
// - Firefox bandwidth probing is not successful
// - Pion runs out of read buffer capacity - this potentially looks like a Pion issue
//
// NOTE: It is not required to disable RTCP replay protection, but doing it to be symmetric.
//
se.DisableSRTPReplayProtection(true)
se.DisableSRTCPReplayProtection(true)
if !params.ProtocolVersion.SupportsICELite() {
se.SetLite(false)
}
se.SetDTLSRetransmissionInterval(dtlsRetransmissionInterval)
se.SetICETimeouts(iceDisconnectedTimeout, iceFailedTimeout, iceKeepaliveInterval)
lf := serverlogger.NewLoggerFactory(logr.Logger(params.Logger))
if lf != nil {
se.LoggerFactory = lf
}
ir := &interceptor.Registry{}
if params.Target == livekit.SignalTarget_SUBSCRIBER {
isSendSideBWE := false
for _, ext := range directionConfig.RTPHeaderExtension.Video {
if ext == sdp.TransportCCURI {
isSendSideBWE = true
break
}
}
for _, ext := range directionConfig.RTPHeaderExtension.Audio {
if ext == sdp.TransportCCURI {
isSendSideBWE = true
break
}
}
if isSendSideBWE {
gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
return gcc.NewSendSideBWE(
gcc.SendSideBWEInitialBitrate(1*1000*1000),
gcc.SendSideBWEPacer(gcc.NewNoOpPacer()),
)
})
if err == nil {
gf.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
if onBandwidthEstimator != nil {
onBandwidthEstimator(estimator)
}
})
ir.Add(gf)
tf, err := twcc.NewHeaderExtensionInterceptor()
if err == nil {
ir.Add(tf)
}
}
}
}
if len(params.SimTracks) > 0 {
f, err := NewUnhandleSimulcastInterceptorFactory(UnhandleSimulcastTracks(params.SimTracks))
if err != nil {
params.Logger.Errorw("NewUnhandleSimulcastInterceptorFactory failed", err)
} else {
ir.Add(f)
}
}
api := webrtc.NewAPI(
webrtc.WithMediaEngine(me),
webrtc.WithSettingEngine(se),
webrtc.WithInterceptorRegistry(ir),
)
pc, err := api.NewPeerConnection(params.Config.Configuration)
return pc, me, err
}
func NewPCTransport(params TransportParams) (*PCTransport, error) {
var bwe cc.BandwidthEstimator
pc, me, err := newPeerConnection(params, func(estimator cc.BandwidthEstimator) {
bwe = estimator
})
if err != nil {
return nil, err
}
t := &PCTransport{
pc: pc,
me: me,
debouncedNegotiate: debounce.New(negotiationFrequency),
negotiationState: negotiationStateNone,
logger: params.Logger,
}
if params.Target == livekit.SignalTarget_SUBSCRIBER {
t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{
Config: params.CongestionControlConfig,
Logger: params.Logger,
})
t.streamAllocator.Start()
if bwe != nil {
t.streamAllocator.SetBandwidthEstimator(bwe)
}
}
t.pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
if state == webrtc.ICEGathererStateComplete {
go func() {
t.lock.Lock()
defer t.lock.Unlock()
if t.restartAfterGathering {
params.Logger.Debugw("restarting ICE after ICE gathering")
if err := t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil {
params.Logger.Warnw("could not restart ICE", err)
}
}
}()
}
})
return t, nil
}
func (t *PCTransport) AddICECandidate(candidate webrtc.ICECandidateInit) error {
if t.pc.RemoteDescription() == nil {
t.lock.Lock()
t.pendingCandidates = append(t.pendingCandidates, candidate)
t.lock.Unlock()
return nil
}
t.logger.Debugw("add candidate ", "candidate", candidate.Candidate)
return t.pc.AddICECandidate(candidate)
}
func (t *PCTransport) PeerConnection() *webrtc.PeerConnection {
return t.pc
}
func (t *PCTransport) Close() {
if t.streamAllocator != nil {
t.streamAllocator.Stop()
}
_ = t.pc.Close()
}
func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
t.lock.Lock()
defer t.lock.Unlock()
if err := t.pc.SetRemoteDescription(sd); err != nil {
return err
}
// negotiated, reset flag
lastState := t.negotiationState
t.negotiationState = negotiationStateNone
for _, c := range t.pendingCandidates {
if err := t.pc.AddICECandidate(c); err != nil {
return err
}
}
t.pendingCandidates = nil
// only initiate when we are the offerer
if lastState == negotiationRetry && sd.Type == webrtc.SDPTypeAnswer {
t.logger.Debugw("re-negotiate after answering")
if err := t.createAndSendOffer(nil); err != nil {
t.logger.Errorw("could not negotiate", err)
}
}
return nil
}
// OnOffer is called when the PeerConnection starts negotiation and prepares an offer
func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) {
t.onOffer = f
}
func (t *PCTransport) Negotiate() {
t.debouncedNegotiate(func() {
if err := t.CreateAndSendOffer(nil); err != nil {
t.logger.Errorw("could not negotiate", err)
}
})
}
func (t *PCTransport) CreateAndSendOffer(options *webrtc.OfferOptions) error {
t.lock.Lock()
defer t.lock.Unlock()
return t.createAndSendOffer(options)
}
// creates and sends offer assuming lock has been acquired
func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
if t.onOffer == nil {
return nil
}
if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
return nil
}
iceRestart := options != nil && options.ICERestart
// if restart is requested, and we are not ready, then continue afterwards
if iceRestart {
if t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
t.logger.Debugw("restart ICE after gathering")
t.restartAfterGathering = true
return nil
}
t.logger.Debugw("restarting ICE")
}
// when there's an ongoing negotiation, let it finish and not disrupt its state
if t.negotiationState == negotiationStateClient {
currentSD := t.pc.CurrentRemoteDescription()
if iceRestart && currentSD != nil {
t.logger.Debugw("recovering from client negotiation state")
if err := t.pc.SetRemoteDescription(*currentSD); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
return err
}
} else {
t.logger.Debugw("skipping negotiation, trying again later")
t.negotiationState = negotiationRetry
return nil
}
} else if t.negotiationState == negotiationRetry {
// already set to retry, we can safely skip this attempt
return nil
}
if t.previousAnswer != nil {
t.previousAnswer = nil
if options == nil {
options = &webrtc.OfferOptions{}
}
options.ICERestart = true
}
offer, err := t.pc.CreateOffer(options)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
t.logger.Errorw("could not create offer", err)
return err
}
err = t.pc.SetLocalDescription(offer)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
t.logger.Errorw("could not set local description", err)
return err
}
// indicate waiting for client
t.negotiationState = negotiationStateClient
t.restartAfterGathering = false
go t.onOffer(offer)
return nil
}
func (t *PCTransport) preparePC(previousAnswer webrtc.SessionDescription) error {
// sticky data channel to first m-lines, if someday we don't send sdp without media streams to
// client's subscribe pc after joining, should change this step
parsed, err := previousAnswer.Unmarshal()
if err != nil {
return err
}
fp, fpHahs, err := extractFingerprint(parsed)
if err != nil {
return err
}
// for pion generate unmatched sdp, it always appends data channel to last m-lines,
// that is not consistent with our subscribe offer which data channel is first m-lines,
// so use a dumb pc to negotiate sdp with only data channel then the data channel will
// sticky to first m-lines(subsequent sdp negotiation will keep m-lines' sequence)
offer, err := t.pc.CreateOffer(nil)
if err != nil {
return err
}
t.pc.SetLocalDescription(offer)
//
// Simulate client side peer connection and set DTLS role from previous answer.
// Role needs to be set properly (one side needs to be server and the other side
// needs to be the client) for DTLS connection to form properly. As this is
// trying to replicate previous setup, read from previous answer and use that role.
//
se := webrtc.SettingEngine{}
se.SetAnsweringDTLSRole(extractDTLSRole(parsed))
api := webrtc.NewAPI(
webrtc.WithSettingEngine(se),
)
pc2, err := api.NewPeerConnection(webrtc.Configuration{
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
})
if err != nil {
return err
}
defer pc2.Close()
pc2.SetRemoteDescription(offer)
ans, err := pc2.CreateAnswer(nil)
if err != nil {
return err
}
// replace client's fingerprint into dump pc's answer, for pion's dtls process, it will
// keep the fingerprint at first call of SetRemoteDescription, if dumb pc and client pc use
// different fingerprint, that will cause pion denied dtls data after handshake with client
// complete (can't pass fingerprint change).
// in this step, we don't established connection with dump pc(no candidate swap), just use
// sdp negotiation to sticky data channel and keep client's fingerprint
parsedAns, _ := ans.Unmarshal()
fpLine := fpHahs + " " + fp
replaceFP := func(attrs []sdp.Attribute, fpLine string) {
for k := range attrs {
if attrs[k].Key == "fingerprint" {
attrs[k].Value = fpLine
}
}
}
replaceFP(parsedAns.Attributes, fpLine)
for _, m := range parsedAns.MediaDescriptions {
replaceFP(m.Attributes, fpLine)
}
bytes, err := parsedAns.Marshal()
if err != nil {
return err
}
ans.SDP = string(bytes)
return t.pc.SetRemoteDescription(ans)
}
func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDescription) error {
if err := t.preparePC(previousAnswer); err != nil {
return err
}
parsed, err := previousAnswer.Unmarshal()
if err != nil {
return err
}
for _, m := range parsed.MediaDescriptions {
var codecType webrtc.RTPCodecType
switch m.MediaName.Media {
case "video":
codecType = webrtc.RTPCodecTypeVideo
case "audio":
codecType = webrtc.RTPCodecTypeAudio
default:
continue
}
tr, err := t.pc.AddTransceiverFromKind(codecType, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
if err != nil {
return err
}
tr.Stop()
mid := getMidValue(m)
if mid == "" {
return errors.New("mid value not found")
}
tr.SetMid(mid)
}
return nil
}
func (t *PCTransport) OnStreamStateChange(f func(update *sfu.StreamStateUpdate) error) {
if t.streamAllocator == nil {
return
}
t.streamAllocator.OnStreamStateChange(f)
}
func (t *PCTransport) AddTrack(subTrack types.SubscribedTrack) {
if t.streamAllocator == nil {
return
}
t.streamAllocator.AddTrack(subTrack.DownTrack(), sfu.AddTrackParams{
Source: subTrack.MediaTrack().Source(),
IsSimulcast: subTrack.MediaTrack().IsSimulcast(),
PublisherID: subTrack.MediaTrack().PublisherID(),
})
}
func (t *PCTransport) RemoveTrack(subTrack types.SubscribedTrack) {
if t.streamAllocator == nil {
return
}
t.streamAllocator.RemoveTrack(subTrack.DownTrack())
}
func (t *PCTransport) SetPreviousAnswer(answer *webrtc.SessionDescription) {
t.lock.Lock()
defer t.lock.Unlock()
if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
t.previousAnswer = answer
t.initPCWithPreviousAnswer(*t.previousAnswer)
}
}
func getMidValue(media *sdp.MediaDescription) string {
for _, attr := range media.Attributes {
if attr.Key == "mid" {
return attr.Value
}
}
return ""
}
func extractFingerprint(desc *sdp.SessionDescription) (string, string, error) {
fingerprints := make([]string, 0)
if fingerprint, haveFingerprint := desc.Attribute("fingerprint"); haveFingerprint {
fingerprints = append(fingerprints, fingerprint)
}
for _, m := range desc.MediaDescriptions {
if fingerprint, haveFingerprint := m.Attribute("fingerprint"); haveFingerprint {
fingerprints = append(fingerprints, fingerprint)
}
}
if len(fingerprints) < 1 {
return "", "", webrtc.ErrSessionDescriptionNoFingerprint
}
for _, m := range fingerprints {
if m != fingerprints[0] {
return "", "", webrtc.ErrSessionDescriptionConflictingFingerprints
}
}
parts := strings.Split(fingerprints[0], " ")
if len(parts) != 2 {
return "", "", webrtc.ErrSessionDescriptionInvalidFingerprint
}
return parts[1], parts[0], nil
}
func extractDTLSRole(desc *sdp.SessionDescription) webrtc.DTLSRole {
for _, md := range desc.MediaDescriptions {
setup, ok := md.Attribute(sdp.AttrKeyConnectionSetup)
if !ok {
continue
}
if setup == sdp.ConnectionRoleActive.String() {
return webrtc.DTLSRoleClient
}
if setup == sdp.ConnectionRolePassive.String() {
return webrtc.DTLSRoleServer
}
}
//
// If 'setup' attribute is not available, use client role
// as that is the default behaviour of answerers
//
// There seems to be some differences in how role is decided.
// libwebrtc (Chrome) code - (https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/pc/jsep_transport.cc;l=592;drc=369fb686729e7eb20d2bd09717cec14269a399d7)
// does not mention anything about ICE role when determining
// DTLS Role.
//
// But, ORTC has this - https://github.com/w3c/ortc/issues/167#issuecomment-69409953
// and pion/webrtc follows that (https://github.com/pion/webrtc/blob/e071a4eded1efd5d9b401bcfc4efacb3a2a5a53c/dtlstransport.go#L269)
//
// So if remote is ice-lite, pion will use DTLSRoleServer when answering
// while browsers pick DTLSRoleClient.
//
return webrtc.DTLSRoleClient
}