From c7683fd3830e70f982d26e4432162fee07a92095 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 31 Aug 2023 21:44:19 +0530 Subject: [PATCH] Check for sctp.ErrStreamClosed (#2023) --- pkg/rtc/participant.go | 12 ++++++++++++ pkg/rtc/participant_signal.go | 12 ------------ pkg/rtc/room.go | 5 ++--- pkg/sfu/receiver.go | 4 ++-- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ed4cef8f9..3b4518c51 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2244,3 +2244,15 @@ func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err e return out, nil } + +func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error { + if p.State() != livekit.ParticipantInfo_ACTIVE { + return ErrDataChannelUnavailable + } + + err := p.TransportManager.SendDataPacket(dp, data) + if err == nil { + p.dataChannelStats.AddBytes(uint64(len(data)), true) + } + return err +} diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index e634fa573..0f0d3c6e7 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -154,18 +154,6 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, for }) } -func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error { - if p.State() != livekit.ParticipantInfo_ACTIVE { - return ErrDataChannelUnavailable - } - - err := p.TransportManager.SendDataPacket(dp, data) - if err == nil { - p.dataChannelStats.AddBytes(uint64(len(data)), true) - } - return err -} - func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error { return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_RoomUpdate{ diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 4fadc21da..53c284e87 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -31,7 +31,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" - "github.com/livekit/psrpc" + "github.com/pion/sctp" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -1330,8 +1330,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) { err := op.SendDataPacket(dp, dpData) - if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, psrpc.Canceled) && - !errors.Is(err, psrpc.ErrStreamClosed) { + if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) { op.GetLogger().Infow("send data packet error", "error", err) } }) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index c725dbb69..a153ce53a 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -474,9 +474,9 @@ func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) { // StreamTrackerManagerListener.OnBitrateReport func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) { - for _, dt := range w.downTrackSpreader.GetDownTracks() { + w.downTrackSpreader.Broadcast(func(dt TrackSender) { dt.UpTrackBitrateReport(availableLayers, bitrates) - } + }) w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired()) }