mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 16:15:25 +00:00
Disable data channel throttle by default (#3281)
* Disable data channel throttle by default * data race * err type
This commit is contained in:
@@ -326,7 +326,6 @@ var DefaultConfig = Config{
|
||||
UseSendSideBWE: false,
|
||||
SendSideBWE: sendsidebwe.DefaultSendSideBWEConfig,
|
||||
},
|
||||
DatachannelSlowThreshold: 1000000,
|
||||
},
|
||||
Audio: sfu.DefaultAudioConfig,
|
||||
Video: VideoConfig{
|
||||
|
||||
@@ -777,7 +777,7 @@ func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
|
||||
for {
|
||||
n, _, err := rawDC.ReadDataChannel(buffer)
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
|
||||
t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
|
||||
}
|
||||
return
|
||||
|
||||
@@ -38,6 +38,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/rtc/transport"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/sfu"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/pacer"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
)
|
||||
@@ -299,7 +300,11 @@ func (t *TransportManager) SendDataPacket(kind livekit.DataPacket_Kind, encoded
|
||||
err := t.getTransport(true).SendDataPacket(kind, encoded)
|
||||
if err != nil {
|
||||
if !utils.ErrorIsOneOf(err, io.ErrClosedPipe, sctp.ErrStreamClosed, ErrTransportFailure, ErrDataChannelBufferFull, context.DeadlineExceeded) {
|
||||
t.params.Logger.Warnw("send data packet error", err)
|
||||
if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
|
||||
t.params.Logger.Debugw("slow data reader", "error", err)
|
||||
} else {
|
||||
t.params.Logger.Warnw("send data packet error", err)
|
||||
}
|
||||
}
|
||||
if utils.ErrorIsOneOf(err, sctp.ErrStreamClosed, io.ErrClosedPipe) {
|
||||
t.params.SubscriberHandler.OnDataSendError(err)
|
||||
|
||||
@@ -3,6 +3,7 @@ package datachannel
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pion/datachannel"
|
||||
@@ -62,8 +63,12 @@ func (w *DataChannelWriter[T]) Write(p []byte) (n int, err error) {
|
||||
now := mono.Now()
|
||||
w.rate.AddBytes(n, int(w.bufferGetter.BufferedAmount()), now)
|
||||
// retry if the write timed out on a non-slow receiver
|
||||
if errors.Is(err, context.DeadlineExceeded) && w.rate.Bitrate(now) > w.slowThreshold {
|
||||
continue
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
if bitrate := w.rate.Bitrate(now); bitrate >= w.slowThreshold {
|
||||
continue
|
||||
} else {
|
||||
err = fmt.Errorf("%w: bitrate %d, threshold %d", ErrDataDroppedBySlowReader, bitrate, w.slowThreshold)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestDataChannelWriter(t *testing.T) {
|
||||
// bitrate below slow threshold(2000bytes/3sec), should drop by timeout
|
||||
mockDC.SetNextWriteCompleteAt(t0.Add(3 * time.Second))
|
||||
n, err = w.Write(buf[:1000])
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded, err)
|
||||
require.ErrorIs(t, err, ErrDataDroppedBySlowReader, err)
|
||||
require.Equal(t, 0, n)
|
||||
}
|
||||
|
||||
|
||||
+29
-28
@@ -37,6 +37,7 @@ import (
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/datachannel"
|
||||
"github.com/livekit/livekit-server/pkg/testutils"
|
||||
testclient "github.com/livekit/livekit-server/test/client"
|
||||
)
|
||||
@@ -746,34 +747,6 @@ func TestDataPublishSlowSubscriber(t *testing.T) {
|
||||
slowSubDrop.Stop()
|
||||
}()
|
||||
|
||||
// publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold
|
||||
var (
|
||||
blocked atomic.Bool
|
||||
stopWrite atomic.Bool
|
||||
writeIdx atomic.Uint64
|
||||
)
|
||||
writeStopped := make(chan struct{})
|
||||
go func() {
|
||||
defer close(writeStopped)
|
||||
var i int
|
||||
buf := make([]byte, 100)
|
||||
for !stopWrite.Load() {
|
||||
i++
|
||||
binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i))
|
||||
if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
blocked.Store(true)
|
||||
i--
|
||||
continue
|
||||
} else {
|
||||
t.Log("error writing", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
writeIdx.Store(uint64(i))
|
||||
}
|
||||
}()
|
||||
|
||||
// no data should be dropped for fast subscriber
|
||||
var fastDataIndex atomic.Uint64
|
||||
fastSub.OnDataReceived = func(data []byte, sid string) {
|
||||
@@ -813,6 +786,34 @@ func TestDataPublishSlowSubscriber(t *testing.T) {
|
||||
slowDropReader.Read(data, sid)
|
||||
}
|
||||
|
||||
// publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold
|
||||
var (
|
||||
blocked atomic.Bool
|
||||
stopWrite atomic.Bool
|
||||
writeIdx atomic.Uint64
|
||||
)
|
||||
writeStopped := make(chan struct{})
|
||||
go func() {
|
||||
defer close(writeStopped)
|
||||
var i int
|
||||
buf := make([]byte, 100)
|
||||
for !stopWrite.Load() {
|
||||
i++
|
||||
binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i))
|
||||
if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil {
|
||||
if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
|
||||
blocked.Store(true)
|
||||
i--
|
||||
continue
|
||||
} else {
|
||||
t.Log("error writing", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
writeIdx.Store(uint64(i))
|
||||
}
|
||||
}()
|
||||
|
||||
<-dropped
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
Reference in New Issue
Block a user