Files
livekit/pkg/rtc/transportmanager.go
Raja Subramanian eee2001a31 Set publisher codec preferences after setting remote description (#3913)
* Set publisher codec preferences after setting remote description

Munging SDP prior to setting remote description was becoming problematic
in single peer connection mode. In that mode, it is possible that a
subscribe track m-section is added which sets the fmtp of H.265 to a
value that is different from when that client publishes. That gets
locked in as negotiated codecs when pion processes remote description.
Later, when the client publishes H.265, it only partially matches. So, if
we munge the offer and send it to SetRemoteDescription, the H.265 only
partially matches due to the different fmtp line, and it gets put at the
end of the list. As a result, the answer does not enforce the
preferred codec. Changing pion to put a partial match up front is more
risky given other projects. So, switch codec preferences to after remote
description is set and directly operate on transceiver which is a better
place to make these changes without munging SDP.

This fixes the case of
- firefox joins first
- Chrome preferring H.265 joining next. This causes a subscribe track
  m-section (for firefox's tracks) to be created first. So, the
  preferred codec munging was not working. Works after this change.

* clean up

* mage generate

* test

* clean up
2025-09-10 18:28:36 +05:30

979 lines
29 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"context"
"io"
"math/bits"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/sctp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v4"
"github.com/pkg/errors"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/mediatransportutil/pkg/twcc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/transport"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"github.com/livekit/livekit-server/pkg/telemetry"
)
const (
	// number of consecutive connection failures tolerated before
	// switching to the next preferred ICE candidate type
	failureCountThreshold = 2
	// a failure counts toward the threshold only if it happened within
	// this window of the previous failure
	preferNextByFailureWindow = time.Minute

	// when RR reports loss percentage over this threshold, we consider it an unstable event
	udpLossFracUnstable = 25
	// if, in the last 32 RRs, the unstable report count is over this threshold, the connection is unstable
	udpLossUnstableCountThreshold = 20
)
// -------------------------------
// TransportManagerTransportHandler wraps a transport.Handler so that the
// TransportManager can observe transport events (currently OnFailed) before
// forwarding them to the wrapped handler.
type TransportManagerTransportHandler struct {
	transport.Handler
	// owning manager, notified of connection failures
	t *TransportManager
	// logger scoped to the peer connection this handler serves
	logger logger.Logger
}
// OnFailed intercepts a transport failure: it logs unusually short-lived ICE
// connections, lets the manager evaluate a transport fallback, and then
// forwards the event to the wrapped handler.
func (h TransportManagerTransportHandler) OnFailed(shortLived bool, connInfo *types.ICEConnectionInfo) {
	if shortLived {
		h.logger.Infow("short ice connection", connectionDetailsFields([]*types.ICEConnectionInfo{connInfo})...)
	}

	// fallback evaluation first, then the downstream notification
	h.t.handleConnectionFailed(shortLived)
	h.Handler.OnFailed(shortLived, connInfo)
}
// -------------------------------
// TransportManagerParams carries the configuration needed to construct a
// TransportManager and its underlying peer connection transports.
type TransportManagerParams struct {
	// when true, downstream data/media is sent via the subscriber peer connection
	SubscriberAsPrimary bool
	// when true, a single peer connection (the publisher one) carries both directions
	UseSinglePeerConnection bool
	Config                  *WebRTCConfig
	Twcc                    *twcc.Responder
	ProtocolVersion         types.ProtocolVersion
	CongestionControlConfig config.CongestionControlConfig
	EnabledSubscribeCodecs  []*livekit.Codec
	EnabledPublishCodecs    []*livekit.Codec
	SimTracks               map[uint32]SimulcastTrackInfo
	ClientInfo              ClientInfo
	// true when this participant is migrating from another node
	Migration       bool
	AllowTCPFallback bool
	// signalling RTT above this threshold disables ICE/TCP fallback
	TCPFallbackRTTThreshold  int
	AllowUDPUnstableFallback bool
	TURNSEnabled             bool
	AllowPlayoutDelay        bool
	DataChannelMaxBufferedAmount uint64
	DatachannelSlowThreshold     int
	Logger                       logger.Logger
	PublisherHandler             transport.Handler
	SubscriberHandler            transport.Handler
	DataChannelStats             *telemetry.BytesTrackStats
	UseOneShotSignallingMode     bool
	FireOnTrackBySdp             bool
}
// TransportManager owns the publisher and subscriber peer connection
// transports of a participant and coordinates cross-transport concerns:
// ICE configuration/fallback, migration state, pending offers and data
// channels, and signal-connection liveness tracking.
type TransportManager struct {
	params TransportManagerParams

	// guards all mutable fields below unless noted otherwise
	lock sync.RWMutex

	publisher  *PCTransport
	// nil in one-shot signalling or single peer connection mode
	subscriber *PCTransport

	// consecutive ICE failure count used to decide candidate-type fallback
	failureCount            int
	isTransportReconfigured bool
	lastFailure             time.Time
	lastSignalAt            time.Time
	signalSourceValid       atomic.Bool

	// offer held back until migration/reconnect allows processing
	pendingOfferPublisher        *webrtc.SessionDescription
	pendingOfferIdPublisher      uint32
	pendingDataChannelsPublisher []*livekit.DataChannelInfo

	iceConfig *livekit.ICEConfig

	mediaLossProxy       *MediaLossProxy
	// 32-bit shift register of "unstable" loss events from receiver reports
	udpLossUnstableCount uint32
	signalingRTT, udpRTT uint32

	onICEConfigChanged func(iceConfig *livekit.ICEConfig)

	droppedBySlowReaderCount atomic.Uint32
}
// NewTransportManager creates the publisher transport (always) and the
// subscriber transport (only when neither one-shot signalling nor single
// peer connection mode is in use), wires up media-loss feedback, and
// pre-creates subscriber data channels for non-migrating participants.
func NewTransportManager(params TransportManagerParams) (*TransportManager, error) {
	if params.Logger == nil {
		params.Logger = logger.GetLogger()
	}
	t := &TransportManager{
		params:         params,
		mediaLossProxy: NewMediaLossProxy(MediaLossProxyParams{Logger: params.Logger}),
		iceConfig:      &livekit.ICEConfig{},
	}
	t.mediaLossProxy.OnMediaLossUpdate(t.onMediaLossUpdate)

	lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER)
	publisher, err := NewPCTransport(TransportParams{
		ProtocolVersion:         params.ProtocolVersion,
		Config:                  params.Config,
		Twcc:                    params.Twcc,
		DirectionConfig:         params.Config.Publisher,
		CongestionControlConfig: params.CongestionControlConfig,
		EnabledCodecs:           params.EnabledPublishCodecs,
		Logger:                  lgr,
		SimTracks:               params.SimTracks,
		ClientInfo:              params.ClientInfo,
		// in one-shot/single-PC mode the publisher transport also sends
		// media down to the client, hence send-side behavior
		IsSendSide:                   params.UseOneShotSignallingMode || params.UseSinglePeerConnection,
		AllowPlayoutDelay:            params.AllowPlayoutDelay,
		Transport:                    livekit.SignalTarget_PUBLISHER,
		Handler:                      params.PublisherHandler,
		UseOneShotSignallingMode:     params.UseOneShotSignallingMode,
		DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount,
		DatachannelSlowThreshold:     params.DatachannelSlowThreshold,
		FireOnTrackBySdp:             params.FireOnTrackBySdp,
	})
	if err != nil {
		return nil, err
	}
	t.publisher = publisher

	// a separate subscriber transport exists only in the classic
	// two-peer-connection topology
	if !t.params.UseOneShotSignallingMode && !t.params.UseSinglePeerConnection {
		lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER)
		subscriber, err := NewPCTransport(TransportParams{
			ProtocolVersion:         params.ProtocolVersion,
			Config:                  params.Config,
			DirectionConfig:         params.Config.Subscriber,
			CongestionControlConfig: params.CongestionControlConfig,
			EnabledCodecs:           params.EnabledSubscribeCodecs,
			Logger:                  lgr,
			ClientInfo:              params.ClientInfo,
			// server initiates offers on the subscriber peer connection
			IsOfferer:                    true,
			IsSendSide:                   true,
			AllowPlayoutDelay:            params.AllowPlayoutDelay,
			DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount,
			DatachannelSlowThreshold:     params.DatachannelSlowThreshold,
			Transport:                    livekit.SignalTarget_SUBSCRIBER,
			// wrap the handler so the manager sees OnFailed for fallback decisions
			Handler:          TransportManagerTransportHandler{params.SubscriberHandler, t, lgr},
			FireOnTrackBySdp: params.FireOnTrackBySdp,
		})
		if err != nil {
			return nil, err
		}
		t.subscriber = subscriber
	}

	// migrating participants get their data channels later via SetMigrateInfo
	if !t.params.Migration && t.params.SubscriberAsPrimary {
		if err := t.createDataChannelsForSubscriber(nil); err != nil {
			return nil, err
		}
	}

	t.signalSourceValid.Store(true)

	return t, nil
}
// Close shuts down whichever peer connection transports exist.
func (t *TransportManager) Close() {
	for _, pc := range []*PCTransport{t.publisher, t.subscriber} {
		if pc != nil {
			pc.Close()
		}
	}
}
// SubscriberClose shuts down only the subscriber peer connection transport.
// The subscriber transport is nil in one-shot signalling and single peer
// connection modes; guard against that (Close does the same) to avoid a
// nil-pointer panic.
func (t *TransportManager) SubscriberClose() {
	if t.subscriber != nil {
		t.subscriber.Close()
	}
}
// HasPublisherEverConnected reports whether the publisher peer connection
// has connected at least once in its lifetime.
func (t *TransportManager) HasPublisherEverConnected() bool {
	return t.publisher.HasEverConnected()
}
// IsPublisherEstablished reports whether the publisher peer connection is
// currently established.
func (t *TransportManager) IsPublisherEstablished() bool {
	return t.publisher.IsEstablished()
}
// GetPublisherRTT returns the publisher transport's RTT and whether a
// measurement is available.
func (t *TransportManager) GetPublisherRTT() (float64, bool) {
	return t.publisher.GetRTT()
}
// GetPublisherMid returns the SDP mid associated with the given RTP receiver
// on the publisher peer connection.
func (t *TransportManager) GetPublisherMid(rtpReceiver *webrtc.RTPReceiver) string {
	return t.publisher.GetMid(rtpReceiver)
}
// GetPublisherRTPTransceiver returns the publisher transceiver for the given
// SDP mid, if any.
func (t *TransportManager) GetPublisherRTPTransceiver(mid string) *webrtc.RTPTransceiver {
	return t.publisher.GetRTPTransceiver(mid)
}
// GetPublisherRTPReceiver returns the publisher RTP receiver for the given
// SDP mid, if any.
func (t *TransportManager) GetPublisherRTPReceiver(mid string) *webrtc.RTPReceiver {
	return t.publisher.GetRTPReceiver(mid)
}
// WritePublisherRTCP sends RTCP packets on the publisher peer connection.
func (t *TransportManager) WritePublisherRTCP(pkts []rtcp.Packet) error {
	return t.publisher.WriteRTCP(pkts)
}
// GetSubscriberRTT returns the RTT of the transport carrying subscribed
// media. In one-shot signalling or single peer connection mode that is the
// publisher peer connection.
func (t *TransportManager) GetSubscriberRTT() (float64, bool) {
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		return t.publisher.GetRTT()
	}
	return t.subscriber.GetRTT()
}
// HasSubscriberEverConnected reports whether the transport carrying
// subscribed media has ever connected; that is the publisher peer connection
// in one-shot signalling or single peer connection mode.
func (t *TransportManager) HasSubscriberEverConnected() bool {
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		return t.publisher.HasEverConnected()
	}
	return t.subscriber.HasEverConnected()
}
// AddTrackLocal adds a local (downstream) track to the transport that sends
// media to the client: the publisher peer connection in one-shot/single-PC
// mode, otherwise the subscriber peer connection.
func (t *TransportManager) AddTrackLocal(
	trackLocal webrtc.TrackLocal,
	params types.AddTrackParams,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
	pc := t.subscriber
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		pc = t.publisher
	}
	return pc.AddTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig)
}
// AddTransceiverFromTrackLocal adds a transceiver for a local (downstream)
// track on the transport that sends media to the client: the publisher peer
// connection in one-shot/single-PC mode, otherwise the subscriber one.
func (t *TransportManager) AddTransceiverFromTrackLocal(
	trackLocal webrtc.TrackLocal,
	params types.AddTrackParams,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) {
	pc := t.subscriber
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		pc = t.publisher
	}
	return pc.AddTransceiverFromTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig)
}
// RemoveTrackLocal removes a local (downstream) track's sender from the
// transport that carries media to the client.
func (t *TransportManager) RemoveTrackLocal(sender *webrtc.RTPSender) error {
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		return t.publisher.RemoveTrack(sender)
	}
	return t.subscriber.RemoveTrack(sender)
}
// WriteSubscriberRTCP sends RTCP packets on the transport carrying
// subscribed media (the publisher peer connection in one-shot/single-PC
// mode).
func (t *TransportManager) WriteSubscriberRTCP(pkts []rtcp.Packet) error {
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		return t.publisher.WriteRTCP(pkts)
	}
	return t.subscriber.WriteRTCP(pkts)
}
// GetSubscriberPacer returns the pacer of the transport carrying subscribed
// media (the publisher peer connection in one-shot/single-PC mode).
func (t *TransportManager) GetSubscriberPacer() pacer.Pacer {
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		return t.publisher.GetPacer()
	}
	return t.subscriber.GetPacer()
}
// AddSubscribedTrack registers a subscribed track with the stream allocator
// of the transport that sends media to the client.
func (t *TransportManager) AddSubscribedTrack(subTrack types.SubscribedTrack) {
	pc := t.subscriber
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		pc = t.publisher
	}
	pc.AddTrackToStreamAllocator(subTrack)
}
// RemoveSubscribedTrack removes a subscribed track from the stream allocator
// of the transport that sends media to the client.
func (t *TransportManager) RemoveSubscribedTrack(subTrack types.SubscribedTrack) {
	pc := t.subscriber
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		pc = t.publisher
	}
	pc.RemoveTrackFromStreamAllocator(subTrack)
}
// SendDataMessage sends a labeled data message on the primary transport and
// funnels the result through common error/stats handling.
func (t *TransportManager) SendDataMessage(kind livekit.DataPacket_Kind, data []byte) error {
	// downstream data is sent via primary peer connection
	return t.handleSendDataResult(t.getTransport(true).SendDataMessage(kind, data), kind.String(), len(data))
}
// SendDataMessageUnlabeled sends an unlabeled data message on the primary
// transport and funnels the result through common error/stats handling.
func (t *TransportManager) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error {
	// downstream data is sent via primary peer connection
	return t.handleSendDataResult(
		t.getTransport(true).SendDataMessageUnlabeled(data, useRaw, sender),
		"unlabeled",
		len(data),
	)
}
// handleSendDataResult post-processes the outcome of a data-channel send:
// on success it records sent bytes; on failure it logs (rate-limiting
// slow-reader drops to one log per 100 occurrences), and notifies the
// primary handler when the stream/pipe is closed so the caller side can
// react. The original error is always returned to the caller.
func (t *TransportManager) handleSendDataResult(err error, kind string, size int) error {
	if err != nil {
		// expected/benign errors (closed pipe/stream, transport failure,
		// full buffer, deadline) are not logged
		if !utils.ErrorIsOneOf(
			err,
			io.ErrClosedPipe,
			sctp.ErrStreamClosed,
			ErrTransportFailure,
			ErrDataChannelBufferFull,
			context.DeadlineExceeded,
		) {
			if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
				// log only every 100th drop to avoid log spam
				droppedBySlowReaderCount := t.droppedBySlowReaderCount.Inc()
				if (droppedBySlowReaderCount-1)%100 == 0 {
					t.params.Logger.Infow(
						"drop data message by slow reader",
						"error", err,
						"kind", kind,
						"count", droppedBySlowReaderCount,
					)
				}
			} else {
				t.params.Logger.Warnw("send data message error", err)
			}
		}

		// closed stream/pipe means the data channel is gone; tell the
		// primary transport's handler
		if utils.ErrorIsOneOf(err, sctp.ErrStreamClosed, io.ErrClosedPipe) {
			if t.params.SubscriberAsPrimary {
				t.params.SubscriberHandler.OnDataSendError(err)
			} else {
				t.params.PublisherHandler.OnDataSendError(err)
			}
		}
	} else {
		t.params.DataChannelStats.AddBytes(uint64(size), true)
	}
	return err
}
// createDataChannelsForSubscriber creates the reliable and lossy data
// channels on the subscriber peer connection. During migration, channel IDs
// from the previous node (if provided) are offset and reused so they do not
// collide with pion's auto-generated IDs.
func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels []*livekit.DataChannelInfo) error {
	var (
		reliableID, lossyID       uint16
		reliableIDPtr, lossyIDPtr *uint16
	)

	//
	// For old version migration clients, they don't send subscriber data channel info
	// so we need to create data channels with default ID and don't negotiate as client already has
	// data channels with default ID.
	//
	// For new version migration clients, we create data channels with new ID and negotiate with client
	//
	for _, dc := range pendingDataChannels {
		if dc.Label == ReliableDataChannel {
			// pion use step 2 for auto generated ID, so we need to add 4 to avoid conflict
			reliableID = uint16(dc.Id) + 4
			reliableIDPtr = &reliableID
		} else if dc.Label == LossyDataChannel {
			lossyID = uint16(dc.Id) + 4
			lossyIDPtr = &lossyID
		}
	}

	ordered := true
	// negotiated (not signalled in-band) only for old-style migration
	// where the client already has channels with default IDs
	negotiated := t.params.Migration && reliableIDPtr == nil
	if err := t.subscriber.CreateDataChannel(ReliableDataChannel, &webrtc.DataChannelInit{
		Ordered:    &ordered,
		ID:         reliableIDPtr,
		Negotiated: &negotiated,
	}); err != nil {
		return err
	}

	// lossy channel: unordered, no retransmissions
	ordered = false
	retransmits := uint16(0)
	negotiated = t.params.Migration && lossyIDPtr == nil
	if err := t.subscriber.CreateDataChannel(LossyDataChannel, &webrtc.DataChannelInit{
		Ordered:        &ordered,
		MaxRetransmits: &retransmits,
		ID:             lossyIDPtr,
		Negotiated:     &negotiated,
	}); err != nil {
		return err
	}
	return nil
}
// GetUnmatchMediaForOffer returns, in reverse SDP order, the media sections
// of the given type in parsedOffer that appear after the last such section
// already matched in the publisher's current local answer. These are the
// m-sections newly added by the client since the last negotiation.
func (t *TransportManager) GetUnmatchMediaForOffer(parsedOffer *sdp.SessionDescription, mediaType string) (unmatched []*sdp.MediaDescription, err error) {
	// prev answer is not cleared on closing of peer connection, so if
	// the peer connection is closed, this will return the last answer
	var lastMatchedMid string
	if lastAnswer := t.publisher.CurrentLocalDescription(); lastAnswer != nil {
		parsedAnswer, err1 := lastAnswer.Unmarshal()
		if err1 != nil {
			// should not happen
			t.params.Logger.Errorw("failed to parse last answer", err1)
			return unmatched, err1
		}

		// find the mid of the last media section of this type in the answer
		for i := len(parsedAnswer.MediaDescriptions) - 1; i >= 0; i-- {
			media := parsedAnswer.MediaDescriptions[i]
			if media.MediaName.Media == mediaType {
				lastMatchedMid, _ = media.Attribute(sdp.AttrKeyMID)
				break
			}
		}
	}

	// walk the offer backwards, collecting sections until the last matched
	// mid is seen again
	for i := len(parsedOffer.MediaDescriptions) - 1; i >= 0; i-- {
		media := parsedOffer.MediaDescriptions[i]
		if media.MediaName.Media == mediaType {
			mid, _ := media.Attribute(sdp.AttrKeyMID)
			if mid == lastMatchedMid {
				break
			}
			unmatched = append(unmatched, media)
		}
	}
	return
}
// LastPublisherOffer returns the publisher's current (applied) remote
// description, i.e. the last offer received from the client.
func (t *TransportManager) LastPublisherOffer() *webrtc.SessionDescription {
	return t.publisher.CurrentRemoteDescription()
}
// LastPublisherOfferPending returns the publisher's pending (not yet
// applied) remote description, if any.
func (t *TransportManager) LastPublisherOfferPending() *webrtc.SessionDescription {
	return t.publisher.PendingRemoteDescription()
}
// HandleOffer processes a client offer for the publisher peer connection.
// When shouldPend is set, the offer is stashed for later processing via
// ProcessPendingPublisherOffer instead of being applied immediately.
func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, offerId uint32, shouldPend bool) error {
	pended := func() bool {
		t.lock.Lock()
		defer t.lock.Unlock()
		if !shouldPend {
			return false
		}
		t.pendingOfferPublisher = &offer
		t.pendingOfferIdPublisher = offerId
		return true
	}()
	if pended {
		return nil
	}

	// apply outside the lock; HandleRemoteDescription may take a while
	return t.publisher.HandleRemoteDescription(offer, offerId)
}
// GetAnswer returns the publisher peer connection's answer to the last
// applied offer, along with its id.
func (t *TransportManager) GetAnswer() (webrtc.SessionDescription, uint32, error) {
	return t.publisher.GetAnswer()
}
// GetPublisherICESessionUfrag returns the ICE username fragment of the
// publisher peer connection's current ICE session.
func (t *TransportManager) GetPublisherICESessionUfrag() (string, error) {
	return t.publisher.GetICESessionUfrag()
}
// HandleICETrickleSDPFragment forwards a trickled ICE SDP fragment (used by
// WHIP-style signalling) to the publisher peer connection.
func (t *TransportManager) HandleICETrickleSDPFragment(sdpFragment string) error {
	return t.publisher.HandleICETrickleSDPFragment(sdpFragment)
}
// HandleICERestartSDPFragment forwards an ICE-restart SDP fragment (used by
// WHIP-style signalling) to the publisher peer connection and returns the
// response fragment.
func (t *TransportManager) HandleICERestartSDPFragment(sdpFragment string) (string, error) {
	return t.publisher.HandleICERestartSDPFragment(sdpFragment)
}
// ProcessPendingPublisherOffer applies a previously pended publisher offer,
// if one exists, clearing the pending state first.
func (t *TransportManager) ProcessPendingPublisherOffer() {
	t.lock.Lock()
	offer, offerId := t.pendingOfferPublisher, t.pendingOfferIdPublisher
	t.pendingOfferPublisher = nil
	t.pendingOfferIdPublisher = 0
	t.lock.Unlock()

	if offer == nil {
		return
	}
	t.HandleOffer(*offer, offerId, false)
}
// HandleAnswer applies the client's answer to the subscriber peer
// connection.
// NOTE(review): the error from HandleRemoteDescription is discarded here —
// confirm callers do not need it.
func (t *TransportManager) HandleAnswer(answer webrtc.SessionDescription, answerId uint32) {
	t.subscriber.HandleRemoteDescription(answer, answerId)
}
// AddICECandidate adds a remote peer's ICE candidate to the transport
// identified by target. Unknown targets are logged and ignored.
func (t *TransportManager) AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) {
	switch target {
	case livekit.SignalTarget_PUBLISHER:
		t.publisher.AddICECandidate(candidate)
	case livekit.SignalTarget_SUBSCRIBER:
		t.subscriber.AddICECandidate(candidate)
	default:
		t.params.Logger.Errorw(
			"ice candidate for unknown signal target",
			errors.New("unknown signal target"),
			"target", target,
		)
	}
}
// NegotiateSubscriber triggers (re)negotiation on the subscriber peer
// connection, falling back to the publisher one when no separate subscriber
// transport exists (one-shot/single-PC mode).
func (t *TransportManager) NegotiateSubscriber(force bool) {
	pc := t.subscriber
	if pc == nil {
		pc = t.publisher
	}
	pc.Negotiate(force)
}
// HandleClientReconnect reacts to a client-initiated reconnect: if the
// failed transport's connection was short-lived, transport configuration is
// reset and the failure fed into the candidate-type fallback logic. Short
// connection tracking on both transports is reset so the next ICE restart
// starts with a clean slate.
func (t *TransportManager) HandleClientReconnect(reason livekit.ReconnectReason) {
	var (
		isShort              bool
		duration             time.Duration
		resetShortConnection bool
	)
	switch reason {
	case livekit.ReconnectReason_RR_PUBLISHER_FAILED:
		if t.publisher != nil {
			resetShortConnection = true
			isShort, duration = t.publisher.IsShortConnection(time.Now())
		}

	case livekit.ReconnectReason_RR_SUBSCRIBER_FAILED:
		if t.subscriber != nil {
			resetShortConnection = true
			isShort, duration = t.subscriber.IsShortConnection(time.Now())
		}
	}

	if isShort {
		// a short-lived connection implies a transport-level problem;
		// clear failure tracking and treat as a connection failure
		t.lock.Lock()
		t.resetTransportConfigureLocked(false)
		t.lock.Unlock()
		t.params.Logger.Infow("short connection by client ice restart", "duration", duration, "reason", reason)
		t.handleConnectionFailed(isShort)
	}

	if resetShortConnection {
		if t.publisher != nil {
			t.publisher.ResetShortConnOnICERestart()
		}
		if t.subscriber != nil {
			t.subscriber.ResetShortConnOnICERestart()
		}
	}
}
// ICERestart applies the given ICE config and restarts ICE on the
// subscriber peer connection, if one exists.
func (t *TransportManager) ICERestart(iceConfig *livekit.ICEConfig) error {
	t.SetICEConfig(iceConfig)

	if t.subscriber == nil {
		return nil
	}
	return t.subscriber.ICERestart()
}
// OnICEConfigChanged registers a callback invoked (outside the lock) when
// the effective ICE config changes via configureICE.
func (t *TransportManager) OnICEConfigChanged(f func(iceConfig *livekit.ICEConfig)) {
	t.lock.Lock()
	t.onICEConfigChanged = f
	t.lock.Unlock()
}
// SetICEConfig applies a non-nil ICE config, resetting transport
// reconfiguration state.
func (t *TransportManager) SetICEConfig(iceConfig *livekit.ICEConfig) {
	if iceConfig == nil {
		return
	}
	t.configureICE(iceConfig, true)
}
// GetICEConfig returns a copy of the current ICE config, or nil if none is
// set. Cloned so callers cannot mutate the manager's state.
func (t *TransportManager) GetICEConfig() *livekit.ICEConfig {
	t.lock.RLock()
	defer t.lock.RUnlock()
	if t.iceConfig == nil {
		return nil
	}
	return utils.CloneProto(t.iceConfig)
}
// resetTransportConfigureLocked clears failure tracking and marks whether
// the transport counts as reconfigured. Caller must hold t.lock.
func (t *TransportManager) resetTransportConfigureLocked(reconfigured bool) {
	t.failureCount = 0
	t.isTransportReconfigured = reconfigured
	t.udpLossUnstableCount = 0
	t.lastFailure = time.Time{}
}
// configureICE installs a new ICE config: resets failure tracking (marking
// the transport reconfigured unless this is an explicit reset), updates the
// TCP preference on both transports, disables media-loss-driven fallback
// once a non-default subscriber preference is in place, and notifies the
// registered callback. No-ops (beyond the reset) when the config is
// unchanged.
func (t *TransportManager) configureICE(iceConfig *livekit.ICEConfig, reset bool) {
	t.lock.Lock()
	isEqual := proto.Equal(t.iceConfig, iceConfig)
	if reset || !isEqual {
		// an explicit reset clears the "reconfigured" flag; a config
		// change sets it
		t.resetTransportConfigureLocked(!reset)
	}

	if isEqual {
		t.lock.Unlock()
		return
	}

	t.params.Logger.Infow("setting ICE config", "iceConfig", logger.Proto(iceConfig))
	onICEConfigChanged := t.onICEConfigChanged
	t.iceConfig = iceConfig
	t.lock.Unlock()

	// once a specific candidate type is preferred, stop reacting to media
	// loss (the fallback decision has already been made)
	if iceConfig.PreferenceSubscriber != livekit.ICECandidateType_ICT_NONE {
		t.mediaLossProxy.OnMediaLossUpdate(nil)
	}

	if t.publisher != nil {
		t.publisher.SetPreferTCP(iceConfig.PreferencePublisher == livekit.ICECandidateType_ICT_TCP)
	}
	if t.subscriber != nil {
		t.subscriber.SetPreferTCP(iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TCP)
	}

	// invoked outside the lock to avoid re-entrancy deadlocks
	if onICEConfigChanged != nil {
		onICEConfigChanged(iceConfig)
	}
}
// SubscriberAsPrimary reports whether the subscriber peer connection is the
// primary transport for this participant.
func (t *TransportManager) SubscriberAsPrimary() bool {
	return t.params.SubscriberAsPrimary
}
// GetICEConnectionInfo collects ICE connection info from both transports,
// including only those that have gathered candidates.
func (t *TransportManager) GetICEConnectionInfo() []*types.ICEConnectionInfo {
	infos := make([]*types.ICEConnectionInfo, 0, 2)
	for _, pc := range []*PCTransport{t.publisher, t.subscriber} {
		if pc == nil {
			continue
		}
		if info := pc.GetICEConnectionInfo(); info.HasCandidates() {
			infos = append(infos, info)
		}
	}
	return infos
}
// getTransport returns the primary (or secondary) transport. When only one
// transport exists, that one is returned regardless of isPrimary. With both
// present, the subscriber is chosen exactly when the requested role matches
// the subscriber-as-primary configuration.
func (t *TransportManager) getTransport(isPrimary bool) *PCTransport {
	if t.publisher == nil {
		return t.subscriber
	}
	if t.subscriber == nil {
		return t.publisher
	}
	if isPrimary == t.params.SubscriberAsPrimary {
		return t.subscriber
	}
	return t.publisher
}
// getLowestPriorityConnectionType returns the lowest-priority (numerically
// largest) ICE connection type across the existing transports; this drives
// the candidate-type fallback ladder.
func (t *TransportManager) getLowestPriorityConnectionType() types.ICEConnectionType {
	if t.publisher == nil {
		return t.subscriber.GetICEConnectionType()
	}
	if t.subscriber == nil {
		return t.publisher.GetICEConnectionType()
	}

	pubType := t.publisher.GetICEConnectionType()
	if subType := t.subscriber.GetICEConnectionType(); subType > pubType {
		return subType
	}
	return pubType
}
// handleConnectionFailed is the ICE fallback decision point. On a transport
// failure it walks the candidate-type ladder (UDP -> TCP -> TURN/TLS) and,
// if a different preference is warranted, reconfigures BOTH transports to
// the new type. Failures are ignored when TCP fallback is disabled, the
// transport was already reconfigured, or the signal connection itself looks
// interrupted (the failure was probably a network blip, not a transport
// problem). Long-lived connections must fail failureCountThreshold times
// within preferNextByFailureWindow before a switch; short-lived ones switch
// immediately.
func (t *TransportManager) handleConnectionFailed(isShortLived bool) {
	if !t.params.AllowTCPFallback || t.params.UseOneShotSignallingMode {
		return
	}

	t.lock.Lock()
	if t.isTransportReconfigured {
		t.lock.Unlock()
		return
	}

	lastSignalSince := time.Since(t.lastSignalAt)
	signalValid := t.signalSourceValid.Load()
	if !t.hasRecentSignalLocked() || !signalValid {
		// the failed might cause by network interrupt because signal closed or we have not seen any signal in the time window,
		// so don't switch to next candidate type
		t.params.Logger.Debugw(
			"ignoring prefer candidate check by ICE failure because signal connection interrupted",
			"lastSignalSince", lastSignalSince,
			"signalValid", signalValid,
		)
		t.failureCount = 0
		t.lastFailure = time.Time{}
		t.lock.Unlock()
		return
	}

	lowestPriorityConnectionType := t.getLowestPriorityConnectionType()

	//
	// Checking only `PreferenceSubscriber` field although any connection failure (PUBLISHER OR SUBSCRIBER) will
	// flow through here.
	//
	// As both transports are switched to the same type on any failure, checking just subscriber should be fine.
	//
	getNext := func(ic *livekit.ICEConfig) livekit.ICECandidateType {
		switch lowestPriorityConnectionType {
		case types.ICEConnectionTypeUDP:
			// try ICE/TCP if ICE/UDP failed
			if ic.PreferenceSubscriber == livekit.ICECandidateType_ICT_NONE {
				if t.params.ClientInfo.SupportsICETCP() && t.canUseICETCP() {
					return livekit.ICECandidateType_ICT_TCP
				} else if t.params.TURNSEnabled {
					// fallback to TURN/TLS if TCP is not supported
					return livekit.ICECandidateType_ICT_TLS
				}
			}
		case types.ICEConnectionTypeTCP:
			// try TURN/TLS if ICE/TCP failed,
			// the configuration could have been ICT_NONE or ICT_TCP,
			// in either case, fallback to TURN/TLS
			if t.params.TURNSEnabled {
				return livekit.ICECandidateType_ICT_TLS
			} else {
				// keep the current config
				return ic.PreferenceSubscriber
			}
		case types.ICEConnectionTypeTURN:
			// TURN/TLS is the most permissive option, if that fails there is nowhere to go to
			// the configuration could have been ICT_NONE or ICT_TLS,
			// keep the current config
			return ic.PreferenceSubscriber
		}
		return livekit.ICECandidateType_ICT_NONE
	}

	var preferNext livekit.ICECandidateType
	if isShortLived {
		// short-lived connections switch immediately, no failure counting
		preferNext = getNext(t.iceConfig)
	} else {
		t.failureCount++
		lastFailure := t.lastFailure
		t.lastFailure = time.Now()
		// require repeated recent failures before switching
		if t.failureCount < failureCountThreshold || time.Since(lastFailure) > preferNextByFailureWindow {
			t.lock.Unlock()
			return
		}

		preferNext = getNext(t.iceConfig)
	}

	if preferNext == t.iceConfig.PreferenceSubscriber {
		t.lock.Unlock()
		return
	}

	t.isTransportReconfigured = true
	t.lock.Unlock()

	switch preferNext {
	case livekit.ICECandidateType_ICT_TCP:
		t.params.Logger.Debugw("prefer TCP transport on both peer connections")

	case livekit.ICECandidateType_ICT_TLS:
		t.params.Logger.Debugw("prefer TLS transport both peer connections")

	case livekit.ICECandidateType_ICT_NONE:
		t.params.Logger.Debugw("allowing all transports on both peer connections")
	}

	// irrespective of which one fails, force prefer candidate on both as the other one might
	// fail at a different time and cause another disruption
	t.configureICE(&livekit.ICEConfig{
		PreferenceSubscriber: preferNext,
		PreferencePublisher:  preferNext,
	}, false)
}
// SetMigrateInfo seeds migration state from the previous node: it splits the
// known data channels by target (publisher-side ones are pended until the
// publisher transport is ready), re-creates subscriber data channels with
// the carried-over IDs, and hands the previous SDP pair to the appropriate
// transport. In single peer connection mode the publisher transport takes
// the previous answer as its local and the previous offer as its remote SDP
// (roles are reversed relative to the two-transport case).
func (t *TransportManager) SetMigrateInfo(
	previousOffer *webrtc.SessionDescription,
	previousAnswer *webrtc.SessionDescription,
	dataChannels []*livekit.DataChannelInfo,
) {
	t.lock.Lock()
	t.pendingDataChannelsPublisher = make([]*livekit.DataChannelInfo, 0, len(dataChannels))
	pendingDataChannelsSubscriber := make([]*livekit.DataChannelInfo, 0, len(dataChannels))
	for _, dci := range dataChannels {
		if dci.Target == livekit.SignalTarget_SUBSCRIBER {
			pendingDataChannelsSubscriber = append(pendingDataChannelsSubscriber, dci)
		} else {
			t.pendingDataChannelsPublisher = append(t.pendingDataChannelsPublisher, dci)
		}
	}
	t.lock.Unlock()

	if t.params.SubscriberAsPrimary {
		if err := t.createDataChannelsForSubscriber(pendingDataChannelsSubscriber); err != nil {
			t.params.Logger.Errorw("create subscriber data channels during migration failed", err)
		}
	}

	if t.params.UseSinglePeerConnection {
		t.publisher.SetPreviousSdp(previousAnswer, previousOffer)
	} else {
		t.subscriber.SetPreviousSdp(previousOffer, previousAnswer)
	}
}
// ProcessPendingPublisherDataChannels creates the publisher-side data
// channels that were pended during migration, reusing the IDs carried over
// from the previous node. Channels are created as pre-negotiated since the
// client already has them; existing channels are left untouched.
func (t *TransportManager) ProcessPendingPublisherDataChannels() {
	t.lock.Lock()
	pendingDataChannels := t.pendingDataChannelsPublisher
	t.pendingDataChannelsPublisher = nil
	t.lock.Unlock()

	ordered := true
	negotiated := true
	for _, ci := range pendingDataChannels {
		var (
			dcLabel    string
			dcID       uint16
			dcExisting bool
			err        error
		)
		if ci.Label == LossyDataChannel {
			// lossy: unordered, zero retransmissions
			ordered = false
			retransmits := uint16(0)
			id := uint16(ci.GetId())
			dcLabel, dcID, dcExisting, err = t.publisher.CreateDataChannelIfEmpty(LossyDataChannel, &webrtc.DataChannelInit{
				Ordered:        &ordered,
				MaxRetransmits: &retransmits,
				Negotiated:     &negotiated,
				ID:             &id,
			})
		} else if ci.Label == ReliableDataChannel {
			id := uint16(ci.GetId())
			dcLabel, dcID, dcExisting, err = t.publisher.CreateDataChannelIfEmpty(ReliableDataChannel, &webrtc.DataChannelInit{
				Ordered:    &ordered,
				Negotiated: &negotiated,
				ID:         &id,
			})
		}
		if err != nil {
			t.params.Logger.Errorw("create migrated data channel failed", err, "label", ci.Label)
		} else if dcExisting {
			t.params.Logger.Debugw("existing data channel during migration", "label", dcLabel, "id", dcID)
		} else {
			t.params.Logger.Debugw("create migrated data channel", "label", dcLabel, "id", dcID)
		}
	}
}
// HandleReceiverReport feeds a down-track receiver report into the media
// loss proxy, which drives UDP-instability detection.
func (t *TransportManager) HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
	t.mediaLossProxy.HandleMaxLossFeedback(dt, report)
}
// onMediaLossUpdate tracks per-report loss in a 32-bit shift register. When
// enough of the last 32 reports were "unstable" (loss above
// udpLossFracUnstable percent) and the signalling RTT indicates TCP would
// be acceptable, it synthesizes a short-lived connection failure on the
// primary transport's handler to trigger the TCP fallback path.
func (t *TransportManager) onMediaLossUpdate(loss uint8) {
	if t.params.TCPFallbackRTTThreshold == 0 || !t.params.AllowUDPUnstableFallback {
		return
	}

	t.lock.Lock()
	// shift in this report's stability bit (loss is out of 255)
	t.udpLossUnstableCount <<= 1
	if loss >= uint8(255*udpLossFracUnstable/100) {
		t.udpLossUnstableCount |= 1
		if bits.OnesCount32(t.udpLossUnstableCount) >= udpLossUnstableCountThreshold {
			// only fall back if signalling RTT (a TCP proxy) is close to
			// the media RTT, below the threshold, and signal is alive
			if t.udpRTT > 0 && t.signalingRTT < uint32(float32(t.udpRTT)*1.3) && int(t.signalingRTT) < t.params.TCPFallbackRTTThreshold && t.hasRecentSignalLocked() {
				t.udpLossUnstableCount = 0
				t.lock.Unlock()

				t.params.Logger.Infow("udp connection unstable, switch to tcp", "signalingRTT", t.signalingRTT)
				if t.params.UseSinglePeerConnection {
					t.params.PublisherHandler.OnFailed(true, t.publisher.GetICEConnectionInfo())
				} else {
					t.params.SubscriberHandler.OnFailed(true, t.subscriber.GetICEConnectionInfo())
				}
				return
			}
		}
	}
	t.lock.Unlock()
}
// UpdateSignalingRTT records the latest signalling RTT and propagates it to
// whichever transports exist.
func (t *TransportManager) UpdateSignalingRTT(rtt uint32) {
	t.lock.Lock()
	t.signalingRTT = rtt
	t.lock.Unlock()

	for _, pc := range []*PCTransport{t.publisher, t.subscriber} {
		if pc != nil {
			pc.SetSignalingRTT(rtt)
		}
	}

	// TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established
	// within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp.
	// Currently, most cases reported is that ice connected but subsequent connection, so left the thinking for now.
}
// UpdateMediaRTT folds a new media (UDP) RTT sample into a running average.
// The first sample is taken as-is; later samples are averaged with the
// previous value using the original integer expression (kept verbatim to
// preserve its truncation behavior).
func (t *TransportManager) UpdateMediaRTT(rtt uint32) {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.udpRTT == 0 {
		t.udpRTT = rtt
		return
	}
	t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2)
}
// UpdateLastSeenSignal records the current time as the last moment signal
// traffic was observed from the client.
func (t *TransportManager) UpdateLastSeenSignal() {
	t.lock.Lock()
	t.lastSignalAt = time.Now()
	t.lock.Unlock()
}
// SinceLastSignal returns the elapsed time since signal traffic was last
// observed from the client.
func (t *TransportManager) SinceLastSignal() time.Duration {
	t.lock.RLock()
	defer t.lock.RUnlock()

	return time.Since(t.lastSignalAt)
}
// LastSeenSignalAt returns the timestamp at which signal traffic was last
// observed from the client.
func (t *TransportManager) LastSeenSignalAt() time.Time {
	t.lock.RLock()
	defer t.lock.RUnlock()

	return t.lastSignalAt
}
// canUseICETCP reports whether ICE/TCP fallback is acceptable: either no
// RTT threshold is configured, or the signalling RTT (a proxy for TCP RTT)
// is below it.
// NOTE(review): reads t.signalingRTT without t.lock — appears to rely on
// the caller (handleConnectionFailed) holding the lock; confirm.
func (t *TransportManager) canUseICETCP() bool {
	return t.params.TCPFallbackRTTThreshold == 0 || int(t.signalingRTT) < t.params.TCPFallbackRTTThreshold
}
// SetSignalSourceValid marks whether the signal connection is currently
// considered valid; an invalid signal source suppresses ICE fallback.
func (t *TransportManager) SetSignalSourceValid(valid bool) {
	t.signalSourceValid.Store(valid)
	t.params.Logger.Debugw("signal source valid", "valid", valid)
}
// SetSubscriberAllowPause toggles stream-allocator pausing on the transport
// that sends media to the client (publisher in one-shot/single-PC mode).
func (t *TransportManager) SetSubscriberAllowPause(allowPause bool) {
	pc := t.subscriber
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		pc = t.publisher
	}
	pc.SetAllowPauseOfStreamAllocator(allowPause)
}
// SetSubscriberChannelCapacity sets the stream allocator's channel capacity
// on the transport that sends media to the client (publisher in
// one-shot/single-PC mode).
func (t *TransportManager) SetSubscriberChannelCapacity(channelCapacity int64) {
	pc := t.subscriber
	if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection {
		pc = t.publisher
	}
	pc.SetChannelCapacityOfStreamAllocator(channelCapacity)
}
// hasRecentSignalLocked reports whether signal traffic was seen within the
// ping timeout window. Caller must hold t.lock (read or write).
func (t *TransportManager) hasRecentSignalLocked() bool {
	return time.Since(t.lastSignalAt) < PingTimeoutSeconds*time.Second
}