Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-09-01 08:19:04 +05:30
6 changed files with 46 additions and 33 deletions
+12
View File
@@ -2244,3 +2244,15 @@ func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err e
return out, nil
}
func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
}
err := p.TransportManager.SendDataPacket(dp, data)
if err == nil {
p.dataChannelStats.AddBytes(uint64(len(data)), true)
}
return err
}
-12
View File
@@ -154,18 +154,6 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, for
})
}
func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
}
err := p.TransportManager.SendDataPacket(dp, data)
if err == nil {
p.dataChannelStats.AddBytes(uint64(len(data)), true)
}
return err
}
func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_RoomUpdate{
+2 -3
View File
@@ -31,7 +31,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/pion/sctp"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -1330,8 +1330,7 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp
utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) {
err := op.SendDataPacket(dp, dpData)
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, psrpc.Canceled) &&
!errors.Is(err, psrpc.ErrStreamClosed) {
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) {
op.GetLogger().Infow("send data packet error", "error", err)
}
})
+25 -16
View File
@@ -704,9 +704,9 @@ func (t *PCTransport) maybeNotifyFullyEstablished() {
func (t *PCTransport) isFullyEstablished() bool {
t.lock.RLock()
fullyEstablished := t.reliableDCOpened && t.lossyDCOpened && !t.connectedAt.IsZero()
t.lock.RUnlock()
return fullyEstablished
defer t.lock.RUnlock()
return t.reliableDCOpened && t.lossyDCOpened && !t.connectedAt.IsZero()
}
func (t *PCTransport) SetPreferTCP(preferTCP bool) {
@@ -813,22 +813,27 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni
return err
}
reliableDCReadyHandler := func() {
t.params.Logger.Debugw("reliable data channel open")
dcReadyHandler := func() {
t.lock.Lock()
t.reliableDCOpened = true
switch dc.Label() {
case ReliableDataChannel:
t.reliableDCOpened = true
case LossyDataChannel:
t.lossyDCOpened = true
}
t.lock.Unlock()
t.params.Logger.Debugw(dc.Label() + " data channel open")
t.maybeNotifyFullyEstablished()
}
lossyDCReadyHanlder := func() {
t.params.Logger.Debugw("lossy data channel open")
t.lock.Lock()
t.lossyDCOpened = true
t.lock.Unlock()
dcCloseHandler := func() {
t.params.Logger.Infow(dc.Label() + " data channel close")
}
t.maybeNotifyFullyEstablished()
dcErrorHandler := func(err error) {
t.params.Logger.Errorw(dc.Label()+" data channel close", err)
}
t.lock.Lock()
@@ -836,17 +841,21 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni
case ReliableDataChannel:
t.reliableDC = dc
if t.params.DirectionConfig.StrictACKs {
t.reliableDC.OnOpen(reliableDCReadyHandler)
t.reliableDC.OnOpen(dcReadyHandler)
} else {
t.reliableDC.OnDial(reliableDCReadyHandler)
t.reliableDC.OnDial(dcReadyHandler)
}
t.reliableDC.OnClose(dcCloseHandler)
t.reliableDC.OnError(dcErrorHandler)
case LossyDataChannel:
t.lossyDC = dc
if t.params.DirectionConfig.StrictACKs {
t.lossyDC.OnOpen(lossyDCReadyHanlder)
t.lossyDC.OnOpen(dcReadyHandler)
} else {
t.lossyDC.OnDial(lossyDCReadyHanlder)
t.lossyDC.OnDial(dcReadyHandler)
}
t.lossyDC.OnClose(dcCloseHandler)
t.lossyDC.OnError(dcErrorHandler)
default:
t.params.Logger.Errorw("unknown data channel label", nil, "label", dc.Label())
}
+5
View File
@@ -34,6 +34,7 @@ import (
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -418,6 +419,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := cr.RequestSink.WriteMessage(req); err != nil {
pLogger.Warnw("error writing to request sink", err, "connID", cr.ConnectionID)
if errors.Is(err, psrpc.ErrStreamClosed) {
// disconnect the participant WS since the signal proxy has been broken
return
}
}
}
}
+2 -2
View File
@@ -474,9 +474,9 @@ func (w *WebRTCReceiver) OnMaxAvailableLayerChanged(maxAvailableLayer int32) {
// StreamTrackerManagerListener.OnBitrateReport
func (w *WebRTCReceiver) OnBitrateReport(availableLayers []int32, bitrates Bitrates) {
for _, dt := range w.downTrackSpreader.GetDownTracks() {
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
dt.UpTrackBitrateReport(availableLayers, bitrates)
}
})
w.connectionStats.AddLayerTransition(w.streamTrackerManager.DistanceToDesired())
}