From 3c907ed46030fa27960a3e49ef8034fb0f9644ff Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 30 Nov 2022 14:53:19 +0800 Subject: [PATCH] Add stats for data channel and signal (#1198) * Add stats for data channel and signal * Solve comment --- pkg/rtc/participant.go | 12 ++- pkg/rtc/participant_signal.go | 8 +- pkg/rtc/room.go | 23 ++++- pkg/rtc/room_test.go | 6 +- pkg/rtc/transport.go | 8 +- pkg/rtc/transportmanager.go | 4 +- pkg/rtc/types/interfaces.go | 2 +- .../typesfakes/fake_local_participant.go | 23 +++-- pkg/service/rtcservice.go | 62 ++++++++++--- pkg/service/wire_gen.go | 2 +- pkg/service/wsprotocol.go | 16 ++-- pkg/telemetry/signalanddatastats.go | 90 +++++++++++++++++++ test/client/client.go | 7 +- 13 files changed, 217 insertions(+), 46 deletions(-) create mode 100644 pkg/telemetry/signalanddatastats.go diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index bbd1113bd..8756e959f 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -129,6 +129,8 @@ type ParticipantImpl struct { subscribedTo map[livekit.ParticipantID]struct{} unpublishedTracks []*livekit.TrackInfo + dataChannelStats *telemetry.BytesTrackStats + rttUpdatedAt time.Time lastRTT uint32 @@ -184,7 +186,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { subscriptionInProgress: make(map[livekit.TrackID]bool), subscriptionRequestsQueue: make(map[livekit.TrackID][]SubscribeRequest), trackPublisherVersion: make(map[livekit.TrackID]uint32), - supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), + dataChannelStats: telemetry.NewBytesTrackStats( + telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID), + params.SID, + params.Telemetry), + supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), } p.version.Store(params.InitialVersion) p.migrateState.Store(types.MigrateStateInit) @@ -601,6 +607,8 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.TransportManager.Close() }() + + p.dataChannelStats.Report() return nil } @@ -1187,6 +1195,8 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt return } + p.dataChannelStats.AddBytes(uint64(len(data)), false) + dp := livekit.DataPacket{} if err := proto.Unmarshal(data, &dp); err != nil { p.params.Logger.Warnw("could not parse data packet", err) diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index fa61ed5e8..e2a4bf576 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -104,12 +104,16 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) err }) } -func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket) error { +func (p *ParticipantImpl) SendDataPacket(dp *livekit.DataPacket, data []byte) error { if p.State() != livekit.ParticipantInfo_ACTIVE { return ErrDataChannelUnavailable } - return p.TransportManager.SendDataPacket(dp) + err := p.TransportManager.SendDataPacket(dp, data) + if err == nil { + p.dataChannelStats.AddBytes(uint64(len(data)), true) + } + return err } func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index a55c716f4..f8bebf29a 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -772,6 +772,7 @@ func (r *Room) onParticipantUpdate(p types.LocalParticipant) { func (r *Room) onDataPacket(source types.LocalParticipant, dp *livekit.DataPacket) { dest := dp.GetUser().GetDestinationSids() + var dpData []byte for _, op := range r.GetParticipants() { if op.State() != livekit.ParticipantInfo_ACTIVE { @@ -792,7 +793,16 @@ func (r *Room) onDataPacket(source types.LocalParticipant, dp *livekit.DataPacke continue } } - err := op.SendDataPacket(dp) + if dpData == nil { + var err error + dpData, err = proto.Marshal(dp) + if err != nil { + r.Logger.Errorw("failed to marshal data packet", err) + return + } + } + + err := op.SendDataPacket(dp, dpData) if err != nil { r.Logger.Infow("send data packet error", "error", err, "participant", op.Identity()) } @@ -879,9 +889,18 @@ func (r *Room) sendActiveSpeakers(speakers []*livekit.SpeakerInfo) { }, } + var dpData []byte for _, p := range r.GetParticipants() { if p.ProtocolVersion().HandlesDataPackets() && !p.ProtocolVersion().SupportsSpeakerChanged() { - _ = p.SendDataPacket(dp) + if dpData == nil { + var err error + dpData, err = proto.Marshal(dp) + if err != nil { + r.Logger.Errorw("failed to marshal ActiveSpeaker data packet", err) + return + } + } + _ = p.SendDataPacket(dp, dpData) } } } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index cf7528c88..7be24d82c 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -554,7 +554,8 @@ func TestDataChannel(t *testing.T) { continue } require.Equal(t, 1, fp.SendDataPacketCallCount()) - require.Equal(t, packet.Value, fp.SendDataPacketArgsForCall(0).Value) + dp, _ := fp.SendDataPacketArgsForCall(0) + require.Equal(t, packet.Value, dp.Value) } }) @@ -585,7 +586,8 @@ func TestDataChannel(t *testing.T) { } } require.Equal(t, 1, p1.SendDataPacketCallCount()) - require.Equal(t, packet.Value, p1.SendDataPacketArgsForCall(0).Value) + dp, _ := p1.SendDataPacketArgsForCall(0) + require.Equal(t, packet.Value, dp.Value) }) t.Run("publishing disallowed", func(t *testing.T) { diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 9508fd52f..951cf143e 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -18,7 +18,6 @@ import ( "github.com/pion/webrtc/v3" "github.com/pkg/errors" "go.uber.org/atomic" - "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -726,12 +725,7 @@ func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error { return t.pc.WriteRTCP(pkts) } -func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket) error { - data, err := proto.Marshal(dp) - if err != nil { - return err - } - +func (t *PCTransport) SendDataPacket(dp *livekit.DataPacket, data []byte) error { var dc *webrtc.DataChannel t.lock.RLock() if dp.Kind == livekit.DataPacket_RELIABLE { diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index ee8042d1c..7f961d952 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -284,9 +284,9 @@ func (t *TransportManager) OnDataMessage(f func(kind livekit.DataPacket_Kind, da t.publisher.OnDataPacket(f) } -func (t *TransportManager) SendDataPacket(dp *livekit.DataPacket) error { +func (t *TransportManager) SendDataPacket(dp *livekit.DataPacket, data []byte) error { // downstream data is sent via primary peer connection - return t.getTransport(true).SendDataPacket(dp) + return t.getTransport(true).SendDataPacket(dp, data) } func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels []*livekit.DataChannelInfo) error { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 8a74d4513..21538a7e1 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -289,7 +289,7 @@ type LocalParticipant interface { SendJoinResponse(joinResponse *livekit.JoinResponse) error SendParticipantUpdate(participants []*livekit.ParticipantInfo) error SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error - SendDataPacket(packet *livekit.DataPacket) error + SendDataPacket(packet *livekit.DataPacket, data []byte) error SendRoomUpdate(room *livekit.Room) error SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 8370ca176..557931f97 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -540,10 +540,11 @@ type FakeLocalParticipant struct { sendConnectionQualityUpdateReturnsOnCall map[int]struct { result1 error } - SendDataPacketStub func(*livekit.DataPacket) error + SendDataPacketStub func(*livekit.DataPacket, []byte) error sendDataPacketMutex sync.RWMutex sendDataPacketArgsForCall []struct { arg1 *livekit.DataPacket + arg2 []byte } sendDataPacketReturns struct { result1 error @@ -3591,18 +3592,24 @@ func (fake *FakeLocalParticipant) SendConnectionQualityUpdateReturnsOnCall(i int }{result1} } -func (fake *FakeLocalParticipant) SendDataPacket(arg1 *livekit.DataPacket) error { +func (fake *FakeLocalParticipant) SendDataPacket(arg1 *livekit.DataPacket, arg2 []byte) error { + var arg2Copy []byte + if arg2 != nil { + arg2Copy = make([]byte, len(arg2)) + copy(arg2Copy, arg2) + } fake.sendDataPacketMutex.Lock() ret, specificReturn := fake.sendDataPacketReturnsOnCall[len(fake.sendDataPacketArgsForCall)] fake.sendDataPacketArgsForCall = append(fake.sendDataPacketArgsForCall, struct { arg1 *livekit.DataPacket - }{arg1}) + arg2 []byte + }{arg1, arg2Copy}) stub := fake.SendDataPacketStub fakeReturns := fake.sendDataPacketReturns - fake.recordInvocation("SendDataPacket", []interface{}{arg1}) + fake.recordInvocation("SendDataPacket", []interface{}{arg1, arg2Copy}) fake.sendDataPacketMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -3616,17 +3623,17 @@ func (fake *FakeLocalParticipant) SendDataPacketCallCount() int { return len(fake.sendDataPacketArgsForCall) } -func (fake *FakeLocalParticipant) SendDataPacketCalls(stub func(*livekit.DataPacket) error) { +func (fake *FakeLocalParticipant) SendDataPacketCalls(stub func(*livekit.DataPacket, []byte) error) { fake.sendDataPacketMutex.Lock() defer fake.sendDataPacketMutex.Unlock() fake.SendDataPacketStub = stub } -func (fake *FakeLocalParticipant) SendDataPacketArgsForCall(i int) *livekit.DataPacket { +func (fake *FakeLocalParticipant) SendDataPacketArgsForCall(i int) (*livekit.DataPacket, []byte) { fake.sendDataPacketMutex.RLock() defer fake.sendDataPacketMutex.RUnlock() argsForCall := fake.sendDataPacketArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeLocalParticipant) SendDataPacketReturns(result1 error) { diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 76c421aaa..0879e6ec3 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -21,6 +21,7 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/routing/selector" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) @@ -38,6 +39,7 @@ type RTCService struct { isDev bool limits config.LimitConfig parser *uaparser.Parser + telemetry telemetry.TelemetryService } func NewRTCService( @@ -46,6 +48,7 @@ func NewRTCService( store ServiceStore, router routing.MessageRouter, currentNode routing.LocalNode, + telemetry telemetry.TelemetryService, ) *RTCService { s := &RTCService{ router: router, @@ -57,6 +60,7 @@ func NewRTCService( isDev: conf.Development, limits: conf.Limit, parser: uaparser.NewFromSaved(), + telemetry: telemetry, } // allow connections from any origin, since script may be hosted anywhere @@ -197,13 +201,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - pLogger := rtc.LoggerWithParticipant( - rtc.LoggerWithRoom(logger.GetDefaultLogger(), roomName, livekit.RoomID(rm.Sid)), - pi.Identity, - "", - false, - ) - // wait for the first message before upgrading to websocket. If no one is // responding to our connection attempt, we should terminate the connection // instead of waiting forever on the WebSocket @@ -214,6 +211,25 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + if !pi.Reconnect && initialResponse.GetJoin() != nil { + pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid()) + } + + var signalStats *telemetry.BytesTrackStats + if pi.ID != "" { + signalStats = telemetry.NewBytesTrackStats( + telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID), + pi.ID, + s.telemetry) + } + + pLogger := rtc.LoggerWithParticipant( + rtc.LoggerWithRoom(logger.GetDefaultLogger(), roomName, livekit.RoomID(rm.Sid)), + pi.Identity, + pi.ID, + false, + ) + done := make(chan struct{}) // function exits when websocket terminates, it'll close the event reading off of response sink as well defer func() { @@ -221,6 +237,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { resSource.Close() reqSink.Close() close(done) + + if signalStats != nil { + signalStats.Report() + } }() // upgrade only once the basics are good to go @@ -233,9 +253,13 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // websocket established sigConn := NewWSSignalConnection(conn) - if err := sigConn.WriteResponse(initialResponse); err != nil { + if count, err := sigConn.WriteResponse(initialResponse); err != nil { pLogger.Warnw("could not write initial response", err) return + } else { + if signalStats != nil { + signalStats.AddBytes(uint64(count), true) + } } prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1) @@ -265,9 +289,19 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { continue } - if err = sigConn.WriteResponse(res); err != nil { + if pi.ID == "" && initialResponse.GetJoin() != nil { + pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid()) + signalStats = telemetry.NewBytesTrackStats( + telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID), + pi.ID, + s.telemetry) + } + + if count, err := sigConn.WriteResponse(res); err != nil { pLogger.Warnw("error writing to websocket", err) return + } else if signalStats != nil { + signalStats.AddBytes(uint64(count), true) } } } @@ -275,7 +309,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // handle incoming requests from websocket for { - req, err := sigConn.ReadRequest() + req, count, err := sigConn.ReadRequest() // normal closure if err != nil { if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") || @@ -287,8 +321,11 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } + if signalStats != nil { + signalStats.AddBytes(uint64(count), false) + } if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok { - _ = sigConn.WriteResponse(&livekit.SignalResponse{ + count, perr := sigConn.WriteResponse(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Pong{ // // Although this field is int64, some clients (like JS) cause overflow if nanosecond granularity is used. @@ -297,6 +334,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { Pong: time.Now().UnixMilli(), }, }) + if perr == nil && signalStats != nil { + signalStats.AddBytes(uint64(count), true) + } continue } if err := reqSink.WriteMessage(req); err != nil { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index dd00814b6..af7110598 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -67,7 +67,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live ingressRPCClient := getIngressRPCClient(rpc) ingressStore := getIngressStore(objectStore) ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) - rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode) + rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService) clientConfigurationManager := createClientConfiguration() roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher) if err != nil { diff --git a/pkg/service/wsprotocol.go b/pkg/service/wsprotocol.go index 4b7ca3279..06e971719 100644 --- a/pkg/service/wsprotocol.go +++ b/pkg/service/wsprotocol.go @@ -35,12 +35,12 @@ func NewWSSignalConnection(conn types.WebsocketClient) *WSSignalConnection { return wsc } -func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, error) { +func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, int, error) { for { // handle special messages and pass on the rest messageType, payload, err := c.conn.ReadMessage() if err != nil { - return nil, err + return nil, 0, err } msg := &livekit.SignalRequest{} @@ -54,22 +54,22 @@ func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, error) { } // protobuf encoded err := proto.Unmarshal(payload, msg) - return msg, err + return msg, len(payload), err case websocket.TextMessage: c.mu.Lock() // json encoded, also write back JSON c.useJSON = true c.mu.Unlock() err := protojson.Unmarshal(payload, msg) - return msg, err + return msg, len(payload), err default: logger.Debugw("unsupported message", "message", messageType) - return nil, nil + return nil, len(payload), nil } } } -func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error { +func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) (int, error) { var msgType int var payload []byte var err error @@ -85,10 +85,10 @@ func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error { payload, err = proto.Marshal(msg) } if err != nil { - return err + return 0, err } - return c.conn.WriteMessage(msgType, payload) + return len(payload), c.conn.WriteMessage(msgType, payload) } func (c *WSSignalConnection) pingWorker() { diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go new file mode 100644 index 000000000..ae6ac2aad --- /dev/null +++ b/pkg/telemetry/signalanddatastats.go @@ -0,0 +1,90 @@ +package telemetry + +import ( + "fmt" + "time" + + "go.uber.org/atomic" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" +) + +const statsReportInterval = 10 * time.Second + +type BytesTrackType string + +const ( + BytesTrackTypeData BytesTrackType = "DT" + BytesTrackTypeSignal BytesTrackType = "SG" +) + +// stats for signal and data channel +type BytesTrackStats struct { + trackID livekit.TrackID + pID livekit.ParticipantID + send, recv atomic.Uint64 + lastStatsReport atomic.Value // *time.Time + telemetry TelemetryService +} + +func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats { + s := &BytesTrackStats{ + trackID: trackID, + pID: pID, + telemetry: telemetry, + } + now := time.Now() + s.lastStatsReport.Store(&now) + return s +} + +func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { + if isSend { + s.send.Add(bytes) + } else { + s.recv.Add(bytes) + } + + s.report(false) +} + +func (s *BytesTrackStats) Report() { + s.report(true) +} + +func (p *BytesTrackStats) report(force bool) { + now := time.Now() + if !force { + lr := p.lastStatsReport.Load().(*time.Time) + if time.Since(*lr) < statsReportInterval { + return + } + + if !p.lastStatsReport.CompareAndSwap(lr, &now) { + return + } + } else { + p.lastStatsReport.Store(&now) + } + + if recv := p.recv.Swap(0); recv > 0 { + p.telemetry.TrackStats(livekit.StreamType_UPSTREAM, p.pID, p.trackID, &livekit.AnalyticsStat{ + Streams: []*livekit.AnalyticsStream{ + {PrimaryBytes: recv}, + }, + }) + } + + if send := p.send.Swap(0); send > 0 { + p.telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, p.pID, p.trackID, &livekit.AnalyticsStat{ + Streams: []*livekit.AnalyticsStream{ + {PrimaryBytes: send}, + }, + }) + } +} + +func BytesTrackIDForParticipantID(typ BytesTrackType, participantID livekit.ParticipantID) livekit.TrackID { + return livekit.TrackID(fmt.Sprintf("%s_%s%s", utils.TrackPrefix, string(typ), participantID)) +} diff --git a/test/client/client.go b/test/client/client.go index 0923b81f2..b886d90f8 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -589,7 +589,12 @@ func (c *RTCClient) PublishData(data []byte, kind livekit.DataPacket_Kind) error }, } - return c.publisher.SendDataPacket(dp) + dpData, err := proto.Marshal(dp) + if err != nil { + return err + } + + return c.publisher.SendDataPacket(dp, dpData) } func (c *RTCClient) GetPublishedTrackIDs() []string {