diff --git a/pkg/config/config.go b/pkg/config/config.go index bf2a59be0..77592ea5d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -326,7 +326,6 @@ var DefaultConfig = Config{ UseSendSideBWE: false, SendSideBWE: sendsidebwe.DefaultSendSideBWEConfig, }, - DatachannelSlowThreshold: 1000000, }, Audio: sfu.DefaultAudioConfig, Video: VideoConfig{ diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 21990f6db..85478418f 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -777,7 +777,7 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) { for { n, _, err := rawDC.ReadDataChannel(buffer) if err != nil { - if !errors.Is(err, io.EOF) { + if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") { t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label()) } return diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 68889d70e..a8800a060 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -38,6 +38,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/transport" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/datachannel" "github.com/livekit/livekit-server/pkg/sfu/pacer" "github.com/livekit/livekit-server/pkg/telemetry" ) @@ -299,7 +300,11 @@ func (t *TransportManager) SendDataPacket(kind livekit.DataPacket_Kind, encoded err := t.getTransport(true).SendDataPacket(kind, encoded) if err != nil { if !utils.ErrorIsOneOf(err, io.ErrClosedPipe, sctp.ErrStreamClosed, ErrTransportFailure, ErrDataChannelBufferFull, context.DeadlineExceeded) { - t.params.Logger.Warnw("send data packet error", err) + if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) { + t.params.Logger.Debugw("slow data reader", "error", err) + } else { + t.params.Logger.Warnw("send data packet error", err) + } } if utils.ErrorIsOneOf(err, sctp.ErrStreamClosed, io.ErrClosedPipe) { t.params.SubscriberHandler.OnDataSendError(err) diff --git a/pkg/sfu/datachannel/datachannel_writer.go b/pkg/sfu/datachannel/datachannel_writer.go index 2ac05b351..9e9ee6767 100644 --- a/pkg/sfu/datachannel/datachannel_writer.go +++ b/pkg/sfu/datachannel/datachannel_writer.go @@ -3,6 +3,7 @@ package datachannel import ( "context" "errors" + "fmt" "time" "github.com/pion/datachannel" @@ -62,8 +63,12 @@ func (w *DataChannelWriter[T]) Write(p []byte) (n int, err error) { now := mono.Now() w.rate.AddBytes(n, int(w.bufferGetter.BufferedAmount()), now) // retry if the write timed out on a non-slow receiver - if errors.Is(err, context.DeadlineExceeded) && w.rate.Bitrate(now) > w.slowThreshold { - continue + if errors.Is(err, context.DeadlineExceeded) { + if bitrate := w.rate.Bitrate(now); bitrate >= w.slowThreshold { + continue + } else { + err = fmt.Errorf("%w: bitrate %d, threshold %d", ErrDataDroppedBySlowReader, bitrate, w.slowThreshold) + } } return diff --git a/pkg/sfu/datachannel/datachannel_writer_test.go b/pkg/sfu/datachannel/datachannel_writer_test.go index b07f8a3b6..45459cee6 100644 --- a/pkg/sfu/datachannel/datachannel_writer_test.go +++ b/pkg/sfu/datachannel/datachannel_writer_test.go @@ -32,7 +32,7 @@ func TestDataChannelWriter(t *testing.T) { // bitrate below slow threshold(2000bytes/3sec), should drop by timeout mockDC.SetNextWriteCompleteAt(t0.Add(3 * time.Second)) n, err = w.Write(buf[:1000]) - require.ErrorIs(t, err, context.DeadlineExceeded, err) + require.ErrorIs(t, err, ErrDataDroppedBySlowReader, err) require.Equal(t, 0, n) } diff --git a/test/singlenode_test.go b/test/singlenode_test.go index 6f993eab1..32946c963 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -37,6 +37,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/sfu/datachannel" "github.com/livekit/livekit-server/pkg/testutils" testclient "github.com/livekit/livekit-server/test/client" ) @@ -746,34 +747,6 @@ func TestDataPublishSlowSubscriber(t *testing.T) { slowSubDrop.Stop() }() - // publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold - var ( - blocked atomic.Bool - stopWrite atomic.Bool - writeIdx atomic.Uint64 - ) - writeStopped := make(chan struct{}) - go func() { - defer close(writeStopped) - var i int - buf := make([]byte, 100) - for !stopWrite.Load() { - i++ - binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i)) - if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil { - if errors.Is(err, context.DeadlineExceeded) { - blocked.Store(true) - i-- - continue - } else { - t.Log("error writing", err) - break - } - } - writeIdx.Store(uint64(i)) - } - }() - // no data should be dropped for fast subscriber var fastDataIndex atomic.Uint64 fastSub.OnDataReceived = func(data []byte, sid string) { @@ -813,6 +786,34 @@ func TestDataPublishSlowSubscriber(t *testing.T) { slowDropReader.Read(data, sid) } + // publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold + var ( + blocked atomic.Bool + stopWrite atomic.Bool + writeIdx atomic.Uint64 + ) + writeStopped := make(chan struct{}) + go func() { + defer close(writeStopped) + var i int + buf := make([]byte, 100) + for !stopWrite.Load() { + i++ + binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i)) + if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil { + if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) { + blocked.Store(true) + i-- + continue + } else { + t.Log("error writing", err) + break + } + } + writeIdx.Store(uint64(i)) + } + }() + <-dropped time.Sleep(time.Second)