Check for sctp.ErrStreamClosed (#2023)

This commit is contained in:
Raja Subramanian
2023-08-31 21:44:19 +05:30
committed by GitHub
parent bc5b4d68af
commit c7683fd383
4 changed files with 16 additions and 17 deletions

View File

@@ -2244,3 +2244,15 @@ func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err e
return out, nil
}
func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
}
err := p.TransportManager.SendDataPacket(dp, data)
if err == nil {
p.dataChannelStats.AddBytes(uint64(len(data)), true)
}
return err
}

View File

@@ -154,18 +154,6 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, for
})
}
func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
}
err := p.TransportManager.SendDataPacket(dp, data)
if err == nil {
p.dataChannelStats.AddBytes(uint64(len(data)), true)
}
return err
}
func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_RoomUpdate{

View File

@@ -31,7 +31,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/pion/sctp"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -1330,8 +1330,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp
utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) {
err := op.SendDataPacket(dp, dpData)
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, psrpc.Canceled) &&
!errors.Is(err, psrpc.ErrStreamClosed) {
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) {
op.GetLogger().Infow("send data packet error", "error", err)
}
})

View File

@@ -474,9 +474,9 @@ func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) {
// StreamTrackerManagerListener.OnBitrateReport
func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
dt.UpTrackBitrateReport(availableLayers, bitrates)
}
})
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}