Add stats for data channel and signal (#1198)

* Add stats for data channel and signal

* Solve comment
This commit is contained in:
cnderrauber
2022-11-30 14:53:19 +08:00
committed by GitHub
parent caae389717
commit 3c907ed460
13 changed files with 217 additions and 46 deletions
+11 -1
View File
@@ -129,6 +129,8 @@ type ParticipantImpl struct {
subscribedTo map[livekit.ParticipantID]struct{}
unpublishedTracks []*livekit.TrackInfo
dataChannelStats *telemetry.BytesTrackStats
rttUpdatedAt time.Time
lastRTT uint32
@@ -184,7 +186,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
subscriptionInProgress: make(map[livekit.TrackID]bool),
subscriptionRequestsQueue: make(map[livekit.TrackID][]SubscribeRequest),
trackPublisherVersion: make(map[livekit.TrackID]uint32),
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
dataChannelStats: telemetry.NewBytesTrackStats(
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID),
params.SID,
params.Telemetry),
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
}
p.version.Store(params.InitialVersion)
p.migrateState.Store(types.MigrateStateInit)
@@ -601,6 +607,8 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.TransportManager.Close()
}()
p.dataChannelStats.Report()
return nil
}
@@ -1187,6 +1195,8 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
return
}
p.dataChannelStats.AddBytes(uint64(len(data)), false)
dp := livekit.DataPacket{}
if err := proto.Unmarshal(data, &dp); err != nil {
p.params.Logger.Warnw("could not parse data packet", err)
+6 -2
View File
@@ -104,12 +104,16 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) err
})
}
func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket) error {
func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
if p.State() != livekit.ParticipantInfo_ACTIVE {
return ErrDataChannelUnavailable
}
return p.TransportManager.SendDataPacket(dp)
err := p.TransportManager.SendDataPacket(dp, data)
if err == nil {
p.dataChannelStats.AddBytes(uint64(len(data)), true)
}
return err
}
func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error {
+21 -2
View File
@@ -772,6 +772,7 @@ func (r *Room) onParticipantUpdate(p types.LocalParticipant) {
func (r *Room) onDataPacket(source types.LocalParticipant, dp *livekit.DataPacket) {
dest := dp.GetUser().GetDestinationSids()
var dpData []byte
for _, op := range r.GetParticipants() {
if op.State() != livekit.ParticipantInfo_ACTIVE {
@@ -792,7 +793,16 @@ func (r *Room) onDataPacket(source types.LocalParticipant, dp *livekit.DataPacke
continue
}
}
err := op.SendDataPacket(dp)
if dpData == nil {
var err error
dpData, err = proto.Marshal(dp)
if err != nil {
r.Logger.Errorw("failed to marshal data packet", err)
return
}
}
err := op.SendDataPacket(dp, dpData)
if err != nil {
r.Logger.Infow("send data packet error", "error", err, "participant", op.Identity())
}
@@ -879,9 +889,18 @@ func (r *Room) sendActiveSpeakers(speakers []*livekit.SpeakerInfo) {
},
}
var dpData []byte
for _, p := range r.GetParticipants() {
if p.ProtocolVersion().HandlesDataPackets() && !p.ProtocolVersion().SupportsSpeakerChanged() {
_ = p.SendDataPacket(dp)
if dpData == nil {
var err error
dpData, err = proto.Marshal(dp)
if err != nil {
r.Logger.Errorw("failed to marshal ActiveSpeaker data packet", err)
return
}
}
_ = p.SendDataPacket(dp, dpData)
}
}
}
+4 -2
View File
@@ -554,7 +554,8 @@ func TestDataChannel(t *testing.T) {
continue
}
require.Equal(t, 1, fp.SendDataPacketCallCount())
require.Equal(t, packet.Value, fp.SendDataPacketArgsForCall(0).Value)
dp, _ := fp.SendDataPacketArgsForCall(0)
require.Equal(t, packet.Value, dp.Value)
}
})
@@ -585,7 +586,8 @@ func TestDataChannel(t *testing.T) {
}
}
require.Equal(t, 1, p1.SendDataPacketCallCount())
require.Equal(t, packet.Value, p1.SendDataPacketArgsForCall(0).Value)
dp, _ := p1.SendDataPacketArgsForCall(0)
require.Equal(t, packet.Value, dp.Value)
})
t.Run("publishing disallowed", func(t *testing.T) {
+1 -7
View File
@@ -18,7 +18,6 @@ import (
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -726,12 +725,7 @@ func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error {
return t.pc.WriteRTCP(pkts)
}
func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket) error {
data, err := proto.Marshal(dp)
if err != nil {
return err
}
func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
var dc *webrtc.DataChannel
t.lock.RLock()
if dp.Kind == livekit.DataPacket_RELIABLE {
+2 -2
View File
@@ -284,9 +284,9 @@ func (t *TransportManager) OnDataMessage(f func(kind livekit.DataPacket_Kind, da
t.publisher.OnDataPacket(f)
}
func (t *TransportManager) SendDataPacket(dp *livekit.DataPacket) error {
func (t *TransportManager) SendDataPacket(dp *livekit.DataPacket, data []byte) error {
// downstream data is sent via primary peer connection
return t.getTransport(true).SendDataPacket(dp)
return t.getTransport(true).SendDataPacket(dp, data)
}
func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels []*livekit.DataChannelInfo) error {
+1 -1
View File
@@ -289,7 +289,7 @@ type LocalParticipant interface {
SendJoinResponse(joinResponse *livekit.JoinResponse) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo) error
SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error
SendDataPacket(packet *livekit.DataPacket) error
SendDataPacket(packet *livekit.DataPacket, data []byte) error
SendRoomUpdate(room *livekit.Room) error
SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error
SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool)
@@ -540,10 +540,11 @@ type FakeLocalParticipant struct {
sendConnectionQualityUpdateReturnsOnCall map[int]struct {
result1 error
}
SendDataPacketStub func(*livekit.DataPacket) error
SendDataPacketStub func(*livekit.DataPacket, []byte) error
sendDataPacketMutex sync.RWMutex
sendDataPacketArgsForCall []struct {
arg1 *livekit.DataPacket
arg2 []byte
}
sendDataPacketReturns struct {
result1 error
@@ -3591,18 +3592,24 @@ func (fake *FakeLocalParticipant) SendConnectionQualityUpdateReturnsOnCall(i int
}{result1}
}
func (fake *FakeLocalParticipant) SendDataPacket(arg1 *livekit.DataPacket) error {
func (fake *FakeLocalParticipant) SendDataPacket(arg1 *livekit.DataPacket, arg2 []byte) error {
var arg2Copy []byte
if arg2 != nil {
arg2Copy = make([]byte, len(arg2))
copy(arg2Copy, arg2)
}
fake.sendDataPacketMutex.Lock()
ret, specificReturn := fake.sendDataPacketReturnsOnCall[len(fake.sendDataPacketArgsForCall)]
fake.sendDataPacketArgsForCall = append(fake.sendDataPacketArgsForCall, struct {
arg1 *livekit.DataPacket
}{arg1})
arg2 []byte
}{arg1, arg2Copy})
stub := fake.SendDataPacketStub
fakeReturns := fake.sendDataPacketReturns
fake.recordInvocation("SendDataPacket", []interface{}{arg1})
fake.recordInvocation("SendDataPacket", []interface{}{arg1, arg2Copy})
fake.sendDataPacketMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
@@ -3616,17 +3623,17 @@ func (fake *FakeLocalParticipant) SendDataPacketCallCount() int {
return len(fake.sendDataPacketArgsForCall)
}
func (fake *FakeLocalParticipant) SendDataPacketCalls(stub func(*livekit.DataPacket) error) {
func (fake *FakeLocalParticipant) SendDataPacketCalls(stub func(*livekit.DataPacket, []byte) error) {
fake.sendDataPacketMutex.Lock()
defer fake.sendDataPacketMutex.Unlock()
fake.SendDataPacketStub = stub
}
func (fake *FakeLocalParticipant) SendDataPacketArgsForCall(i int) *livekit.DataPacket {
func (fake *FakeLocalParticipant) SendDataPacketArgsForCall(i int) (*livekit.DataPacket, []byte) {
fake.sendDataPacketMutex.RLock()
defer fake.sendDataPacketMutex.RUnlock()
argsForCall := fake.sendDataPacketArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) SendDataPacketReturns(result1 error) {
+51 -11
View File
@@ -21,6 +21,7 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/routing/selector"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
@@ -38,6 +39,7 @@ type RTCService struct {
isDev bool
limits config.LimitConfig
parser *uaparser.Parser
telemetry telemetry.TelemetryService
}
func NewRTCService(
@@ -46,6 +48,7 @@ func NewRTCService(
store ServiceStore,
router routing.MessageRouter,
currentNode routing.LocalNode,
telemetry telemetry.TelemetryService,
) *RTCService {
s := &RTCService{
router: router,
@@ -57,6 +60,7 @@ func NewRTCService(
isDev: conf.Development,
limits: conf.Limit,
parser: uaparser.NewFromSaved(),
telemetry: telemetry,
}
// allow connections from any origin, since script may be hosted anywhere
@@ -197,13 +201,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetDefaultLogger(), roomName, livekit.RoomID(rm.Sid)),
pi.Identity,
"",
false,
)
// wait for the first message before upgrading to websocket. If no one is
// responding to our connection attempt, we should terminate the connection
// instead of waiting forever on the WebSocket
@@ -214,6 +211,25 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if !pi.Reconnect && initialResponse.GetJoin() != nil {
pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid())
}
var signalStats *telemetry.BytesTrackStats
if pi.ID != "" {
signalStats = telemetry.NewBytesTrackStats(
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID),
pi.ID,
s.telemetry)
}
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetDefaultLogger(), roomName, livekit.RoomID(rm.Sid)),
pi.Identity,
pi.ID,
false,
)
done := make(chan struct{})
// function exits when websocket terminates, it'll close the event reading off of response sink as well
defer func() {
@@ -221,6 +237,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resSource.Close()
reqSink.Close()
close(done)
if signalStats != nil {
signalStats.Report()
}
}()
// upgrade only once the basics are good to go
@@ -233,9 +253,13 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// websocket established
sigConn := NewWSSignalConnection(conn)
if err := sigConn.WriteResponse(initialResponse); err != nil {
if count, err := sigConn.WriteResponse(initialResponse); err != nil {
pLogger.Warnw("could not write initial response", err)
return
} else {
if signalStats != nil {
signalStats.AddBytes(uint64(count), true)
}
}
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
@@ -265,9 +289,19 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
continue
}
if err = sigConn.WriteResponse(res); err != nil {
if pi.ID == "" && initialResponse.GetJoin() != nil {
pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid())
signalStats = telemetry.NewBytesTrackStats(
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID),
pi.ID,
s.telemetry)
}
if count, err := sigConn.WriteResponse(res); err != nil {
pLogger.Warnw("error writing to websocket", err)
return
} else if signalStats != nil {
signalStats.AddBytes(uint64(count), true)
}
}
}
@@ -275,7 +309,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// handle incoming requests from websocket
for {
req, err := sigConn.ReadRequest()
req, count, err := sigConn.ReadRequest()
// normal closure
if err != nil {
if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") ||
@@ -287,8 +321,11 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
}
if signalStats != nil {
signalStats.AddBytes(uint64(count), false)
}
if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok {
_ = sigConn.WriteResponse(&livekit.SignalResponse{
count, perr := sigConn.WriteResponse(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Pong{
//
// Although this field is int64, some clients (like JS) cause overflow if nanosecond granularity is used.
@@ -297,6 +334,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Pong: time.Now().UnixMilli(),
},
})
if perr == nil && signalStats != nil {
signalStats.AddBytes(uint64(count), true)
}
continue
}
if err := reqSink.WriteMessage(req); err != nil {
+1 -1
View File
@@ -67,7 +67,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
ingressRPCClient := getIngressRPCClient(rpc)
ingressStore := getIngressStore(objectStore)
ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
clientConfigurationManager := createClientConfiguration()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher)
if err != nil {
+8 -8
View File
@@ -35,12 +35,12 @@ func NewWSSignalConnection(conn types.WebsocketClient) *WSSignalConnection {
return wsc
}
func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, error) {
func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, int, error) {
for {
// handle special messages and pass on the rest
messageType, payload, err := c.conn.ReadMessage()
if err != nil {
return nil, err
return nil, 0, err
}
msg := &livekit.SignalRequest{}
@@ -54,22 +54,22 @@ func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, error) {
}
// protobuf encoded
err := proto.Unmarshal(payload, msg)
return msg, err
return msg, len(payload), err
case websocket.TextMessage:
c.mu.Lock()
// json encoded, also write back JSON
c.useJSON = true
c.mu.Unlock()
err := protojson.Unmarshal(payload, msg)
return msg, err
return msg, len(payload), err
default:
logger.Debugw("unsupported message", "message", messageType)
return nil, nil
return nil, len(payload), nil
}
}
}
func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error {
func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) (int, error) {
var msgType int
var payload []byte
var err error
@@ -85,10 +85,10 @@ func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error {
payload, err = proto.Marshal(msg)
}
if err != nil {
return err
return 0, err
}
return c.conn.WriteMessage(msgType, payload)
return len(payload), c.conn.WriteMessage(msgType, payload)
}
func (c *WSSignalConnection) pingWorker() {
+90
View File
@@ -0,0 +1,90 @@
package telemetry
import (
"fmt"
"time"
"go.uber.org/atomic"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
)
// statsReportInterval is the minimum time that must elapse since the last
// report before a non-forced report (triggered from AddBytes) is emitted.
const statsReportInterval = 10 * time.Second
// BytesTrackType is a short tag identifying which byte stream a
// BytesTrackStats instance accounts for. The tag is embedded in the track ID
// built by BytesTrackIDForParticipantID.
type BytesTrackType string
const (
// BytesTrackTypeData tags byte counters for data channel traffic.
BytesTrackTypeData BytesTrackType = "DT"
// BytesTrackTypeSignal tags byte counters for signal connection traffic.
BytesTrackTypeSignal BytesTrackType = "SG"
)
// BytesTrackStats accumulates sent and received byte counts for a signal
// connection or data channel and forwards them to a TelemetryService as
// track stats.
type BytesTrackStats struct {
// trackID is the track identifier passed along with every emitted stat.
trackID livekit.TrackID
// pID is the participant identifier passed along with every emitted stat.
pID livekit.ParticipantID
// send and recv hold the bytes counted since the previous report; report
// swaps each back to zero when it reads it.
send, recv atomic.Uint64
// lastStatsReport holds a pointer to the time of the most recent report
// (initialized to the creation time by NewBytesTrackStats).
lastStatsReport atomic.Value // *time.Time
// telemetry receives the TrackStats calls made by report.
telemetry TelemetryService
}
// NewBytesTrackStats creates a byte counter that reports to telemetry under
// the given track and participant IDs. The last-report timestamp is seeded
// with the creation time, so the first throttled report can only happen after
// statsReportInterval has elapsed.
func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats {
	stats := new(BytesTrackStats)
	stats.trackID = trackID
	stats.pID = pID
	stats.telemetry = telemetry

	createdAt := time.Now()
	stats.lastStatsReport.Store(&createdAt)

	return stats
}
// AddBytes adds bytes to the send counter when isSend is true, otherwise to
// the receive counter, and then gives the throttled reporter a chance to
// flush the accumulated totals.
func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) {
	// Pick the counter for the direction; receive is the default.
	counter := &s.recv
	if isSend {
		counter = &s.send
	}
	counter.Add(bytes)

	s.report(false)
}
// Report flushes the accumulated counters immediately, bypassing the
// statsReportInterval throttle that AddBytes is subject to.
func (s *BytesTrackStats) Report() {
	const force = true
	s.report(force)
}
// report emits the bytes accumulated since the previous report to the
// telemetry service and resets the counters.
//
// When force is false the call is throttled: it returns without reporting if
// less than statsReportInterval has passed since the last report, or if
// another caller updated the last-report timestamp first. When force is true
// the timestamp is overwritten and the counters are always flushed.
//
// Received bytes are emitted as an UPSTREAM stat and sent bytes as a
// DOWNSTREAM stat; a direction whose counter is zero emits nothing.
func (s *BytesTrackStats) report(force bool) {
	now := time.Now()
	if force {
		s.lastStatsReport.Store(&now)
	} else {
		last := s.lastStatsReport.Load().(*time.Time)
		// Compare against the same instant that will be stored below, rather
		// than reading the clock a second time.
		if now.Sub(*last) < statsReportInterval {
			return
		}
		// The swap compares the stored pointer, so only the caller that
		// replaces the timestamp it loaded goes on to flush the counters.
		if !s.lastStatsReport.CompareAndSwap(last, &now) {
			return
		}
	}

	// Swap(0) reads and resets each counter in a single atomic step.
	if recv := s.recv.Swap(0); recv > 0 {
		s.telemetry.TrackStats(livekit.StreamType_UPSTREAM, s.pID, s.trackID, &livekit.AnalyticsStat{
			Streams: []*livekit.AnalyticsStream{
				{PrimaryBytes: recv},
			},
		})
	}

	if send := s.send.Swap(0); send > 0 {
		s.telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, s.pID, s.trackID, &livekit.AnalyticsStat{
			Streams: []*livekit.AnalyticsStream{
				{PrimaryBytes: send},
			},
		})
	}
}
// BytesTrackIDForParticipantID builds the track ID used for byte stats of the
// given type and participant: utils.TrackPrefix, an underscore, the type tag,
// and the participant ID, concatenated in that order.
func BytesTrackIDForParticipantID(typ BytesTrackType, participantID livekit.ParticipantID) livekit.TrackID {
	id := fmt.Sprintf("%s_%s%s", utils.TrackPrefix, string(typ), participantID)
	return livekit.TrackID(id)
}
+6 -1
View File
@@ -589,7 +589,12 @@ func (c *RTCClient) PublishData(data []byte, kind livekit.DataPacket_Kind) error
},
}
return c.publisher.SendDataPacket(dp)
dpData, err := proto.Marshal(dp)
if err != nil {
return err
}
return c.publisher.SendDataPacket(dp, dpData)
}
func (c *RTCClient) GetPublishedTrackIDs() []string {