// Copyright 2023 LiveKit, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rtc import ( "context" "io" "math/bits" "sync" "time" "github.com/pion/rtcp" "github.com/pion/sctp" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v4" "github.com/pkg/errors" "go.uber.org/atomic" "google.golang.org/protobuf/proto" "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/transport" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/datachannel" "github.com/livekit/livekit-server/pkg/sfu/interceptor" "github.com/livekit/livekit-server/pkg/sfu/pacer" ) const ( failureCountThreshold = 2 preferNextByFailureWindow = time.Minute // when RR report loss percentage over this threshold, we consider it is a unstable event udpLossFracUnstable = 25 // if in last 32 times RR, the unstable report count over this threshold, the connection is unstable udpLossUnstableCountThreshold = 20 ) // ------------------------------- type TransportManagerTransportHandler struct { transport.Handler t *TransportManager logger logger.Logger } func (h TransportManagerTransportHandler) OnFailed(isShortLived bool, iceConnectionInfo *types.ICEConnectionInfo) { if isShortLived { h.logger.Infow("short ice connection", connectionDetailsFields([]*types.ICEConnectionInfo{iceConnectionInfo})...) } h.t.handleConnectionFailed(isShortLived) h.Handler.OnFailed(isShortLived, iceConnectionInfo) } // ------------------------------- type TransportManagerParams struct { SubscriberAsPrimary bool UseSinglePeerConnection bool Config *WebRTCConfig Twcc *twcc.Responder ProtocolVersion types.ProtocolVersion CongestionControlConfig config.CongestionControlConfig EnabledSubscribeCodecs []*livekit.Codec EnabledPublishCodecs []*livekit.Codec SimTracks map[uint32]interceptor.SimulcastTrackInfo ClientInfo ClientInfo Migration bool AllowTCPFallback bool TCPFallbackRTTThreshold int AllowUDPUnstableFallback bool TURNSEnabled bool AllowPlayoutDelay bool DataChannelMaxBufferedAmount uint64 DatachannelSlowThreshold int DatachannelLossyTargetLatency time.Duration Logger logger.Logger PublisherHandler transport.Handler SubscriberHandler transport.Handler DataChannelStats *BytesTrackStats UseOneShotSignallingMode bool FireOnTrackBySdp bool EnableDataTracks bool } type TransportManager struct { params TransportManagerParams lock sync.RWMutex publisher *PCTransport subscriber *PCTransport failureCount int isTransportReconfigured bool lastFailure time.Time lastSignalAt time.Time signalSourceValid atomic.Bool pendingOfferPublisher *webrtc.SessionDescription pendingOfferIdPublisher uint32 pendingDataChannelsPublisher []*livekit.DataChannelInfo iceConfig *livekit.ICEConfig mediaLossProxy *MediaLossProxy udpLossUnstableCount uint32 signalingRTT, udpRTT uint32 onICEConfigChanged func(iceConfig *livekit.ICEConfig) dataChannelSendErrorDroppedBySlowReaderCount atomic.Uint32 dataChannelSendErrorCount atomic.Uint32 } func NewTransportManager(params TransportManagerParams) (*TransportManager, error) { if params.Logger == nil { params.Logger = logger.GetLogger() } t := &TransportManager{ params: params, mediaLossProxy: NewMediaLossProxy(MediaLossProxyParams{Logger: params.Logger}), iceConfig: &livekit.ICEConfig{}, } t.mediaLossProxy.OnMediaLossUpdate(t.onMediaLossUpdate) lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER) // In single-PC mode the one PC carries both directions, so it needs both // codec lists registered on its MediaEngine. In dual-PC mode the publisher // PC is recvonly, so subscribe codecs are left nil. var publisherSubscribeCodecs []*livekit.Codec if params.UseSinglePeerConnection || params.UseOneShotSignallingMode { publisherSubscribeCodecs = params.EnabledSubscribeCodecs } publisher, err := NewPCTransport(TransportParams{ ProtocolVersion: params.ProtocolVersion, Config: params.Config, Twcc: params.Twcc, DirectionConfig: params.Config.Publisher, CongestionControlConfig: params.CongestionControlConfig, EnabledPublishCodecs: params.EnabledPublishCodecs, EnabledSubscribeCodecs: publisherSubscribeCodecs, Logger: lgr, SimTracks: params.SimTracks, ClientInfo: params.ClientInfo, IsSendSide: params.UseOneShotSignallingMode || params.UseSinglePeerConnection, AllowPlayoutDelay: params.AllowPlayoutDelay, Transport: livekit.SignalTarget_PUBLISHER, Handler: TransportManagerTransportHandler{params.PublisherHandler, t, lgr}, UseOneShotSignallingMode: params.UseOneShotSignallingMode, DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount, DatachannelSlowThreshold: params.DatachannelSlowThreshold, DatachannelLossyTargetLatency: params.DatachannelLossyTargetLatency, FireOnTrackBySdp: params.FireOnTrackBySdp, EnableDataTracks: params.EnableDataTracks, }) if err != nil { return nil, err } t.publisher = publisher if !t.params.UseOneShotSignallingMode && !t.params.UseSinglePeerConnection { lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_SUBSCRIBER) subscriber, err := NewPCTransport(TransportParams{ ProtocolVersion: params.ProtocolVersion, Config: params.Config, DirectionConfig: params.Config.Subscriber, CongestionControlConfig: params.CongestionControlConfig, EnabledSubscribeCodecs: params.EnabledSubscribeCodecs, Logger: lgr, ClientInfo: params.ClientInfo, IsOfferer: true, IsSendSide: true, AllowPlayoutDelay: params.AllowPlayoutDelay, DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount, DatachannelSlowThreshold: params.DatachannelSlowThreshold, DatachannelLossyTargetLatency: params.DatachannelLossyTargetLatency, Transport: livekit.SignalTarget_SUBSCRIBER, Handler: TransportManagerTransportHandler{params.SubscriberHandler, t, lgr}, FireOnTrackBySdp: params.FireOnTrackBySdp, EnableDataTracks: params.EnableDataTracks, }) if err != nil { return nil, err } t.subscriber = subscriber } if !t.params.Migration && t.params.SubscriberAsPrimary { if err := t.createDataChannelsForSubscriber(nil); err != nil { return nil, err } } t.signalSourceValid.Store(true) return t, nil } func (t *TransportManager) Close() { if t.publisher != nil { t.publisher.Close() } if t.subscriber != nil { t.subscriber.Close() } } func (t *TransportManager) SubscriberClose() { t.subscriber.Close() } func (t *TransportManager) HasPublisherEverConnected() bool { return t.publisher.HasEverConnected() } func (t *TransportManager) IsPublisherEstablished() bool { return t.publisher.IsEstablished() } func (t *TransportManager) GetPublisherRTT() (float64, bool) { return t.publisher.GetRTT() } func (t *TransportManager) GetPublisherMid(rtpReceiver *webrtc.RTPReceiver) string { return t.publisher.GetMid(rtpReceiver) } func (t *TransportManager) GetPublisherRTPTransceiver(mid string) *webrtc.RTPTransceiver { return t.publisher.GetRTPTransceiver(mid) } func (t *TransportManager) GetPublisherRTPReceiver(mid string) *webrtc.RTPReceiver { return t.publisher.GetRTPReceiver(mid) } func (t *TransportManager) WritePublisherRTCP(pkts []rtcp.Packet) error { return t.publisher.WriteRTCP(pkts) } func (t *TransportManager) GetSubscriberRTT() (float64, bool) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.GetRTT() } else { return t.subscriber.GetRTT() } } func (t *TransportManager) HasSubscriberEverConnected() bool { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.HasEverConnected() } else { return t.subscriber.HasEverConnected() } } func (t *TransportManager) AddTrackLocal( trackLocal webrtc.TrackLocal, params types.AddTrackParams, enabledCodecs []*livekit.Codec, rtcpFeedbackConfig RTCPFeedbackConfig, ) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.AddTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig) } else { return t.subscriber.AddTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig) } } func (t *TransportManager) AddTransceiverFromTrackLocal( trackLocal webrtc.TrackLocal, params types.AddTrackParams, enabledCodecs []*livekit.Codec, rtcpFeedbackConfig RTCPFeedbackConfig, ) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.AddTransceiverFromTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig) } else { return t.subscriber.AddTransceiverFromTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig) } } func (t *TransportManager) RemoveTrackLocal(sender *webrtc.RTPSender) error { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.RemoveTrack(sender) } else { return t.subscriber.RemoveTrack(sender) } } func (t *TransportManager) WriteSubscriberRTCP(pkts []rtcp.Packet) error { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.WriteRTCP(pkts) } else { return t.subscriber.WriteRTCP(pkts) } } func (t *TransportManager) GetSubscriberPacer() pacer.Pacer { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { return t.publisher.GetPacer() } else { return t.subscriber.GetPacer() } } func (t *TransportManager) AddSubscribedTrack(subTrack types.SubscribedTrack) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { t.publisher.AddTrackToStreamAllocator(subTrack) } else { t.subscriber.AddTrackToStreamAllocator(subTrack) } } func (t *TransportManager) RemoveSubscribedTrack(subTrack types.SubscribedTrack) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { t.publisher.RemoveTrackFromStreamAllocator(subTrack) } else { t.subscriber.RemoveTrackFromStreamAllocator(subTrack) } } func (t *TransportManager) SendDataMessage(kind livekit.DataPacket_Kind, data []byte) error { // downstream data is sent via primary peer connection return t.handleSendDataResult(t.getTransport(true).SendDataMessage(kind, data), kind.String(), len(data)) } func (t *TransportManager) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error { // downstream data is sent via primary peer connection return t.handleSendDataResult( t.getTransport(true).SendDataMessageUnlabeled(data, useRaw, sender), "unlabeled", len(data), ) } func (t *TransportManager) handleSendDataResult(err error, kind string, size int) error { if err != nil { if !utils.ErrorIsOneOf( err, io.ErrClosedPipe, sctp.ErrStreamClosed, ErrTransportFailure, ErrDataChannelUnavailable, context.DeadlineExceeded, datachannel.ErrDataDroppedByHighBufferedAmount, ) { if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) { droppedBySlowReaderCount := t.dataChannelSendErrorDroppedBySlowReaderCount.Inc() if (droppedBySlowReaderCount-1)%100 == 0 { t.params.Logger.Infow( "drop data message by slow reader", "error", err, "kind", kind, "count", droppedBySlowReaderCount, ) } } else { count := t.dataChannelSendErrorCount.Inc() if (count-1)%100 == 0 { t.params.Logger.Infow( "send data message error", "error", err, "kind", kind, "count", count, ) } } } if utils.ErrorIsOneOf(err, sctp.ErrStreamClosed, io.ErrClosedPipe) { if t.params.SubscriberAsPrimary { t.params.SubscriberHandler.OnDataSendError(err) } else { t.params.PublisherHandler.OnDataSendError(err) } } } else { t.params.DataChannelStats.AddBytes(uint64(size), true) } return err } func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels []*livekit.DataChannelInfo) error { var ( reliableID, lossyID, dataTrackID uint16 reliableIDPtr, lossyIDPtr, dataTrackIDPtr *uint16 ) // // For old version migration clients, they don't send subscriber data channel info // so we need to create data channels with default ID and don't negotiate as client already has // data channels with default ID. // // For new version migration clients, we create data channels with new ID and negotiate with client // for _, dc := range pendingDataChannels { switch dc.Label { case ReliableDataChannel: // pion use step 2 for auto generated ID, so we need to add 6 to avoid conflict reliableID = uint16(dc.Id) + 6 reliableIDPtr = &reliableID case LossyDataChannel: lossyID = uint16(dc.Id) + 6 lossyIDPtr = &lossyID case DataTrackDataChannel: dataTrackID = uint16(dc.Id) + 6 dataTrackIDPtr = &dataTrackID } } ordered := true negotiated := t.params.Migration && reliableIDPtr == nil if err := t.subscriber.CreateDataChannel(ReliableDataChannel, &webrtc.DataChannelInit{ Ordered: &ordered, ID: reliableIDPtr, Negotiated: &negotiated, }); err != nil { return err } ordered = false retransmits := uint16(0) negotiated = t.params.Migration && lossyIDPtr == nil if err := t.subscriber.CreateDataChannel(LossyDataChannel, &webrtc.DataChannelInit{ Ordered: &ordered, MaxRetransmits: &retransmits, ID: lossyIDPtr, Negotiated: &negotiated, }); err != nil { return err } negotiated = t.params.Migration && dataTrackIDPtr == nil if err := t.subscriber.CreateDataChannel(DataTrackDataChannel, &webrtc.DataChannelInit{ Ordered: &ordered, MaxRetransmits: &retransmits, ID: dataTrackIDPtr, Negotiated: &negotiated, }); err != nil { return err } return nil } func (t *TransportManager) GetUnmatchMediaForOffer(parsedOffer *sdp.SessionDescription, mediaType string) (unmatched []*sdp.MediaDescription, err error) { var lastMatchedMid string if lastAnswer := t.publisher.CurrentLocalDescription(); lastAnswer != nil { parsedAnswer, err1 := lastAnswer.Unmarshal() if err1 != nil { // should not happen t.params.Logger.Errorw("failed to parse last answer", err1) return unmatched, err1 } for i := len(parsedAnswer.MediaDescriptions) - 1; i >= 0; i-- { media := parsedAnswer.MediaDescriptions[i] if media.MediaName.Media == mediaType { lastMatchedMid, _ = media.Attribute(sdp.AttrKeyMID) break } } } for i := len(parsedOffer.MediaDescriptions) - 1; i >= 0; i-- { media := parsedOffer.MediaDescriptions[i] if media.MediaName.Media == mediaType { mid, _ := media.Attribute(sdp.AttrKeyMID) if mid == lastMatchedMid { break } unmatched = append(unmatched, media) } } return } func (t *TransportManager) LastPublisherOffer() *webrtc.SessionDescription { return t.publisher.CurrentRemoteDescription() } func (t *TransportManager) LastPublisherOfferPending() *webrtc.SessionDescription { return t.publisher.PendingRemoteDescription() } func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, offerId uint32, shouldPend bool) error { t.lock.Lock() if shouldPend { t.pendingOfferPublisher = &offer t.pendingOfferIdPublisher = offerId t.lock.Unlock() return nil } t.lock.Unlock() return t.publisher.HandleRemoteDescription(offer, offerId) } func (t *TransportManager) GetAnswer() (webrtc.SessionDescription, uint32, error) { return t.publisher.GetAnswer() } func (t *TransportManager) GetPublisherICESessionUfrag() (string, error) { return t.publisher.GetICESessionUfrag() } func (t *TransportManager) HandleICETrickleSDPFragment(sdpFragment string) error { return t.publisher.HandleICETrickleSDPFragment(sdpFragment) } func (t *TransportManager) HandleICERestartSDPFragment(sdpFragment string) (string, error) { return t.publisher.HandleICERestartSDPFragment(sdpFragment) } func (t *TransportManager) ProcessPendingPublisherOffer() { t.lock.Lock() pendingOffer := t.pendingOfferPublisher t.pendingOfferPublisher = nil pendingOfferId := t.pendingOfferIdPublisher t.pendingOfferIdPublisher = 0 t.lock.Unlock() if pendingOffer != nil { t.HandleOffer(*pendingOffer, pendingOfferId, false) } } func (t *TransportManager) HandleAnswer(answer webrtc.SessionDescription, answerId uint32) { t.subscriber.HandleRemoteDescription(answer, answerId) } // AddICECandidate adds candidates for remote peer func (t *TransportManager) AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) { switch target { case livekit.SignalTarget_PUBLISHER: if t.publisher != nil { t.publisher.AddICECandidate(candidate) } else { t.params.Logger.Warnw("ice candidate for publisher, but no peer connection", nil, "candidate", candidate) } case livekit.SignalTarget_SUBSCRIBER: if t.subscriber != nil { t.subscriber.AddICECandidate(candidate) } else { t.params.Logger.Warnw("ice candidate for subscriber, but no peer connection", nil, "candidate", candidate) } default: err := errors.New("unknown signal target") t.params.Logger.Errorw("ice candidate for unknown signal target", err, "target", target) } } func (t *TransportManager) NegotiateSubscriber(force bool) { if t.subscriber != nil { t.subscriber.Negotiate(force) } else { t.publisher.Negotiate(force) } } func (t *TransportManager) HandleClientReconnect(reason livekit.ReconnectReason) { var ( isShort bool duration time.Duration resetShortConnection bool ) switch reason { case livekit.ReconnectReason_RR_PUBLISHER_FAILED: if t.publisher != nil { resetShortConnection = true isShort, duration = t.publisher.IsShortConnection(time.Now()) } case livekit.ReconnectReason_RR_SUBSCRIBER_FAILED: if t.subscriber != nil { resetShortConnection = true isShort, duration = t.subscriber.IsShortConnection(time.Now()) } } if isShort { t.lock.Lock() t.resetTransportConfigureLocked(false) t.lock.Unlock() t.params.Logger.Infow("short connection by client ice restart", "duration", duration, "reason", reason) t.handleConnectionFailed(isShort) } if resetShortConnection { if t.publisher != nil { t.publisher.ResetShortConnOnICERestart() } if t.subscriber != nil { t.subscriber.ResetShortConnOnICERestart() } } } func (t *TransportManager) ICERestart(iceConfig *livekit.ICEConfig) error { t.SetICEConfig(iceConfig) if t.subscriber != nil { return t.subscriber.ICERestart() } return nil } func (t *TransportManager) OnICEConfigChanged(f func(iceConfig *livekit.ICEConfig)) { t.lock.Lock() t.onICEConfigChanged = f t.lock.Unlock() } func (t *TransportManager) SetICEConfig(iceConfig *livekit.ICEConfig) { if iceConfig != nil { t.configureICE(iceConfig, true) } } func (t *TransportManager) GetICEConfig() *livekit.ICEConfig { t.lock.RLock() defer t.lock.RUnlock() if t.iceConfig == nil { return nil } return utils.CloneProto(t.iceConfig) } func (t *TransportManager) resetTransportConfigureLocked(reconfigured bool) { t.failureCount = 0 t.isTransportReconfigured = reconfigured t.udpLossUnstableCount = 0 t.lastFailure = time.Time{} } func (t *TransportManager) configureICE(iceConfig *livekit.ICEConfig, reset bool) { t.lock.Lock() isEqual := proto.Equal(t.iceConfig, iceConfig) if reset || !isEqual { t.resetTransportConfigureLocked(!reset) } if isEqual { t.lock.Unlock() return } t.params.Logger.Infow("setting ICE config", "iceConfig", logger.Proto(iceConfig)) onICEConfigChanged := t.onICEConfigChanged t.iceConfig = iceConfig t.lock.Unlock() if iceConfig.PreferenceSubscriber != livekit.ICECandidateType_ICT_NONE { t.mediaLossProxy.OnMediaLossUpdate(nil) } if t.publisher != nil { t.publisher.SetPreferTCP(iceConfig.PreferencePublisher == livekit.ICECandidateType_ICT_TCP) } if t.subscriber != nil { t.subscriber.SetPreferTCP(iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TCP) } if onICEConfigChanged != nil { onICEConfigChanged(iceConfig) } } func (t *TransportManager) SubscriberAsPrimary() bool { return t.params.SubscriberAsPrimary } func (t *TransportManager) GetICEConnectionInfo() []*types.ICEConnectionInfo { infos := make([]*types.ICEConnectionInfo, 0, 2) for _, pc := range []*PCTransport{t.publisher, t.subscriber} { if pc == nil { continue } info := pc.GetICEConnectionInfo() if info.HasCandidates() { infos = append(infos, info) } } return infos } func (t *TransportManager) GetDataTrackTransport() types.DataTrackTransport { return t.getTransport(true) } func (t *TransportManager) getTransport(isPrimary bool) *PCTransport { switch { case t.publisher == nil: return t.subscriber case t.subscriber == nil: return t.publisher default: pcTransport := t.publisher if (isPrimary && t.params.SubscriberAsPrimary) || (!isPrimary && !t.params.SubscriberAsPrimary) { pcTransport = t.subscriber } return pcTransport } } func (t *TransportManager) getLowestPriorityConnectionType() types.ICEConnectionType { switch { case t.publisher == nil: return t.subscriber.GetICEConnectionType() case t.subscriber == nil: return t.publisher.GetICEConnectionType() default: ctype := t.publisher.GetICEConnectionType() if stype := t.subscriber.GetICEConnectionType(); stype > ctype { ctype = stype } return ctype } } func (t *TransportManager) handleConnectionFailed(isShortLived bool) { if !t.params.AllowTCPFallback || t.params.UseOneShotSignallingMode { return } t.lock.Lock() if t.isTransportReconfigured { t.lock.Unlock() return } lastSignalSince := time.Since(t.lastSignalAt) signalValid := t.signalSourceValid.Load() if !t.hasRecentSignalLocked() || !signalValid { // the failed might cause by network interrupt because signal closed or we have not seen any signal in the time window, // so don't switch to next candidate type t.params.Logger.Debugw( "ignoring prefer candidate check by ICE failure because signal connection interrupted", "lastSignalSince", lastSignalSince, "signalValid", signalValid, ) t.failureCount = 0 t.lastFailure = time.Time{} t.lock.Unlock() return } lowestPriorityConnectionType := t.getLowestPriorityConnectionType() // // Checking only `PreferenceSubscriber` field although any connection failure (PUBLISHER OR SUBSCRIBER) will // flow through here. // // As both transports are switched to the same type on any failure, checking just subscriber should be fine. // getNext := func(ic *livekit.ICEConfig) livekit.ICECandidateType { switch lowestPriorityConnectionType { case types.ICEConnectionTypeUDP: // try ICE/TCP if ICE/UDP failed if ic.PreferenceSubscriber == livekit.ICECandidateType_ICT_NONE { if t.params.ClientInfo.SupportsICETCP() && t.canUseICETCP() { return livekit.ICECandidateType_ICT_TCP } else if t.params.TURNSEnabled { // fallback to TURN/TLS if TCP is not supported return livekit.ICECandidateType_ICT_TLS } } case types.ICEConnectionTypeTCP: // try TURN/TLS if ICE/TCP failed, // the configuration could have been ICT_NONE or ICT_TCP, // in either case, fallback to TURN/TLS if t.params.TURNSEnabled { return livekit.ICECandidateType_ICT_TLS } else { // keep the current config return ic.PreferenceSubscriber } case types.ICEConnectionTypeTURN: // TURN/TLS is the most permissive option, if that fails there is nowhere to go to // the configuration could have been ICT_NONE or ICT_TLS, // keep the current config return ic.PreferenceSubscriber } return livekit.ICECandidateType_ICT_NONE } var preferNext livekit.ICECandidateType if isShortLived { preferNext = getNext(t.iceConfig) } else { t.failureCount++ lastFailure := t.lastFailure t.lastFailure = time.Now() if t.failureCount < failureCountThreshold || time.Since(lastFailure) > preferNextByFailureWindow { t.lock.Unlock() return } preferNext = getNext(t.iceConfig) } if preferNext == t.iceConfig.PreferenceSubscriber { t.lock.Unlock() return } t.isTransportReconfigured = true t.lock.Unlock() switch preferNext { case livekit.ICECandidateType_ICT_TCP: t.params.Logger.Debugw("prefer TCP transport on both peer connections") case livekit.ICECandidateType_ICT_TLS: t.params.Logger.Debugw("prefer TLS transport both peer connections") case livekit.ICECandidateType_ICT_NONE: t.params.Logger.Debugw("allowing all transports on both peer connections") } // irrespective of which one fails, force prefer candidate on both as the other one might // fail at a different time and cause another disruption t.configureICE(&livekit.ICEConfig{ PreferenceSubscriber: preferNext, PreferencePublisher: preferNext, }, false) } func (t *TransportManager) SetMigrateInfo( previousOffer *webrtc.SessionDescription, previousAnswer *webrtc.SessionDescription, dataChannels []*livekit.DataChannelInfo, ) { t.lock.Lock() t.pendingDataChannelsPublisher = make([]*livekit.DataChannelInfo, 0, len(dataChannels)) pendingDataChannelsSubscriber := make([]*livekit.DataChannelInfo, 0, len(dataChannels)) for _, dci := range dataChannels { if dci.Target == livekit.SignalTarget_SUBSCRIBER { pendingDataChannelsSubscriber = append(pendingDataChannelsSubscriber, dci) } else { t.pendingDataChannelsPublisher = append(t.pendingDataChannelsPublisher, dci) } } t.lock.Unlock() if t.params.SubscriberAsPrimary { if err := t.createDataChannelsForSubscriber(pendingDataChannelsSubscriber); err != nil { t.params.Logger.Errorw("create subscriber data channels during migration failed", err) } } if t.params.UseSinglePeerConnection { t.publisher.SetPreviousSdp(previousAnswer, previousOffer) } else { t.subscriber.SetPreviousSdp(previousOffer, previousAnswer) } } func (t *TransportManager) ProcessPendingPublisherDataChannels() { t.lock.Lock() pendingDataChannels := t.pendingDataChannelsPublisher t.pendingDataChannelsPublisher = nil t.lock.Unlock() ordered := true negotiated := true for _, ci := range pendingDataChannels { var ( dcLabel string dcID uint16 dcExisting bool err error ) switch ci.Label { case ReliableDataChannel: id := uint16(ci.GetId()) dcLabel, dcID, dcExisting, err = t.publisher.CreateDataChannelIfEmpty(ReliableDataChannel, &webrtc.DataChannelInit{ Ordered: &ordered, Negotiated: &negotiated, ID: &id, }) case LossyDataChannel: ordered = false retransmits := uint16(0) id := uint16(ci.GetId()) dcLabel, dcID, dcExisting, err = t.publisher.CreateDataChannelIfEmpty(LossyDataChannel, &webrtc.DataChannelInit{ Ordered: &ordered, MaxRetransmits: &retransmits, Negotiated: &negotiated, ID: &id, }) case DataTrackDataChannel: ordered = false retransmits := uint16(0) id := uint16(ci.GetId()) dcLabel, dcID, dcExisting, err = t.publisher.CreateDataChannelIfEmpty(DataTrackDataChannel, &webrtc.DataChannelInit{ Ordered: &ordered, MaxRetransmits: &retransmits, Negotiated: &negotiated, ID: &id, }) } if err != nil { t.params.Logger.Errorw("create migrated data channel failed", err, "label", ci.Label) } else if dcExisting { t.params.Logger.Debugw("existing data channel during migration", "label", dcLabel, "id", dcID) } else { t.params.Logger.Debugw("create migrated data channel", "label", dcLabel, "id", dcID) } } } func (t *TransportManager) HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { t.mediaLossProxy.HandleMaxLossFeedback(dt, report) } func (t *TransportManager) onMediaLossUpdate(loss uint8) { if t.params.TCPFallbackRTTThreshold == 0 || !t.params.AllowUDPUnstableFallback { return } t.lock.Lock() t.udpLossUnstableCount <<= 1 if loss >= uint8(255*udpLossFracUnstable/100) { t.udpLossUnstableCount |= 1 if bits.OnesCount32(t.udpLossUnstableCount) >= udpLossUnstableCountThreshold { if t.udpRTT > 0 && t.signalingRTT < uint32(float32(t.udpRTT)*1.3) && int(t.signalingRTT) < t.params.TCPFallbackRTTThreshold && t.hasRecentSignalLocked() { t.udpLossUnstableCount = 0 t.lock.Unlock() t.params.Logger.Infow("udp connection unstable, switch to tcp", "signalingRTT", t.signalingRTT) if t.params.UseSinglePeerConnection { t.params.PublisherHandler.OnFailed(true, t.publisher.GetICEConnectionInfo()) } else { t.params.SubscriberHandler.OnFailed(true, t.subscriber.GetICEConnectionInfo()) } return } } } t.lock.Unlock() } func (t *TransportManager) UpdateSignalingRTT(rtt uint32) { t.lock.Lock() t.signalingRTT = rtt t.lock.Unlock() if t.publisher != nil { t.publisher.SetSignalingRTT(rtt) } if t.subscriber != nil { t.subscriber.SetSignalingRTT(rtt) } // TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established // within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp. // Currently, most cases reported is that ice connected but subsequent connection, so left the thinking for now. } func (t *TransportManager) UpdateMediaRTT(rtt uint32) { t.lock.Lock() if t.udpRTT == 0 { t.udpRTT = rtt } else { t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2) } t.lock.Unlock() } func (t *TransportManager) UpdateLastSeenSignal() { t.lock.Lock() t.lastSignalAt = time.Now() t.lock.Unlock() } func (t *TransportManager) SinceLastSignal() time.Duration { t.lock.RLock() defer t.lock.RUnlock() return time.Since(t.lastSignalAt) } func (t *TransportManager) LastSeenSignalAt() time.Time { t.lock.RLock() defer t.lock.RUnlock() return t.lastSignalAt } func (t *TransportManager) canUseICETCP() bool { return t.params.TCPFallbackRTTThreshold == 0 || int(t.signalingRTT) < t.params.TCPFallbackRTTThreshold } func (t *TransportManager) SetSignalSourceValid(valid bool) { t.signalSourceValid.Store(valid) t.params.Logger.Debugw("signal source valid", "valid", valid) } func (t *TransportManager) SetSubscriberAllowPause(allowPause bool) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { t.publisher.SetAllowPauseOfStreamAllocator(allowPause) } else { t.subscriber.SetAllowPauseOfStreamAllocator(allowPause) } } func (t *TransportManager) SetSubscriberChannelCapacity(channelCapacity int64) { if t.params.UseOneShotSignallingMode || t.params.UseSinglePeerConnection { t.publisher.SetChannelCapacityOfStreamAllocator(channelCapacity) } else { t.subscriber.SetChannelCapacityOfStreamAllocator(channelCapacity) } } func (t *TransportManager) hasRecentSignalLocked() bool { return time.Since(t.lastSignalAt) < PingTimeoutSeconds*time.Second } func (t *TransportManager) RTPStreamPublished(ssrc uint32, mid, rid string) { t.publisher.RTPStreamPublished(ssrc, mid, rid) }