From c7683fd3830e70f982d26e4432162fee07a92095 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 31 Aug 2023 21:44:19 +0530 Subject: [PATCH 1/3] Check for sctp.ErrStreamClosed (#2023) --- pkg/rtc/participant.go | 12 ++++++++++++ pkg/rtc/participant_signal.go | 12 ------------ pkg/rtc/room.go | 5 ++--- pkg/sfu/receiver.go | 4 ++-- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ed4cef8f9..3b4518c51 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2244,3 +2244,15 @@ func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err e return out, nil } + +func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error { + if p.State() != livekit.ParticipantInfo_ACTIVE { + return ErrDataChannelUnavailable + } + + err := p.TransportManager.SendDataPacket(dp, data) + if err == nil { + p.dataChannelStats.AddBytes(uint64(len(data)), true) + } + return err +} diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index e634fa573..0f0d3c6e7 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -154,18 +154,6 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, for }) } -func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error { - if p.State() != livekit.ParticipantInfo_ACTIVE { - return ErrDataChannelUnavailable - } - - err := p.TransportManager.SendDataPacket(dp, data) - if err == nil { - p.dataChannelStats.AddBytes(uint64(len(data)), true) - } - return err -} - func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error { return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_RoomUpdate{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 4fadc21da..53c284e87 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -31,7 +31,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/livekit/psrpc" + "github.com/pion/sctp" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -1330,8 +1330,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) { err := op.SendDataPacket(dp, dpData) - if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, psrpc.Canceled) && - !errors.Is(err, psrpc.ErrStreamClosed) { + if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) { op.GetLogger().Infow("send data packet error", "error", err) } }) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index c725dbb69..a153ce53a 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -474,9 +474,9 @@ func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) { // StreamTrackerManagerListener.OnBitrateReport func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) { - for _, dt := range w.downTrackSpreader.GetDownTracks() { + w.downTrackSpreader.Broadcast(func(dt TrackSender) { dt.UpTrackBitrateReport(availableLayers, bitrates) - } + }) w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) } From d5808f96df66d97703bed9657fd642d5fb5d1478 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 31 Aug 2023 09:39:45 -0700 Subject: [PATCH 2/3] Disconnect participant when signal proxy is closed (#2024) When signal proxy is closed, we'd want to stop writing to it and sever WS connection with the client. Client would go into a reconnect sequence --- pkg/service/rtcservice.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index f469cda2d..ab49ba24a 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -34,6 +34,7 @@ import ( "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -418,6 +419,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := cr.RequestSink.WriteMessage(req); err != nil { pLogger.Warnw("error writing to request sink", err, "connID", cr.ConnectionID) + if errors.Is(err, psrpc.ErrStreamClosed) { + // disconnect the participant WS since the signal proxy has been broken + return + } } } } From b4efbe21a137dd013cdea0d46a60471edbd93d43 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 31 Aug 2023 23:44:11 +0530 Subject: [PATCH 3/3] Log data channel close and errors. (#2025) --- pkg/rtc/transport.go | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index b249fd835..fc13fe314 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -704,9 +704,9 @@ func (t *PCTransport) maybeNotifyFullyEstablished() { func (t *PCTransport) isFullyEstablished() bool { t.lock.RLock() - fullyEstablished := t.reliableDCOpened && t.lossyDCOpened && !t.connectedAt.IsZero() - t.lock.RUnlock() - return fullyEstablished + defer t.lock.RUnlock() + + return t.reliableDCOpened && t.lossyDCOpened && !t.connectedAt.IsZero() } func (t *PCTransport) SetPreferTCP(preferTCP bool) { @@ -813,22 +813,27 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni return err } - reliableDCReadyHandler := func() { - t.params.Logger.Debugw("reliable data channel open") + dcReadyHandler := func() { t.lock.Lock() - t.reliableDCOpened = true + switch dc.Label() { + case ReliableDataChannel: + t.reliableDCOpened = true + + case LossyDataChannel: + t.lossyDCOpened = true + } t.lock.Unlock() + t.params.Logger.Debugw(dc.Label() + " data channel open") t.maybeNotifyFullyEstablished() } - lossyDCReadyHanlder := func() { - t.params.Logger.Debugw("lossy data channel open") - t.lock.Lock() - t.lossyDCOpened = true - t.lock.Unlock() + dcCloseHandler := func() { + t.params.Logger.Infow(dc.Label() + " data channel close") + } - t.maybeNotifyFullyEstablished() + dcErrorHandler := func(err error) { + t.params.Logger.Errorw(dc.Label()+" data channel close", err) } t.lock.Lock() @@ -836,17 +841,21 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni case ReliableDataChannel: t.reliableDC = dc if t.params.DirectionConfig.StrictACKs { - t.reliableDC.OnOpen(reliableDCReadyHandler) + t.reliableDC.OnOpen(dcReadyHandler) } else { - t.reliableDC.OnDial(reliableDCReadyHandler) + t.reliableDC.OnDial(dcReadyHandler) } + t.reliableDC.OnClose(dcCloseHandler) + t.reliableDC.OnError(dcErrorHandler) case LossyDataChannel: t.lossyDC = dc if t.params.DirectionConfig.StrictACKs { - t.lossyDC.OnOpen(lossyDCReadyHanlder) + t.lossyDC.OnOpen(dcReadyHandler) } else { - t.lossyDC.OnDial(lossyDCReadyHanlder) + t.lossyDC.OnDial(dcReadyHandler) } + t.lossyDC.OnClose(dcCloseHandler) + t.lossyDC.OnError(dcErrorHandler) default: t.params.Logger.Errorw("unknown data channel label", nil, "label", dc.Label()) }