From 9616149a88a451a71ee411b078b4d914aff0eec9 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 11 May 2021 00:31:18 -0700 Subject: [PATCH] collect inbound and outbound packet stats --- go.mod | 2 + pkg/rtc/mediatrack.go | 80 ++++++++-------- pkg/rtc/participant.go | 100 ++++++++++++------- pkg/rtc/participant_internal_test.go | 18 ++-- pkg/rtc/room.go | 38 +++++++- pkg/rtc/stats.go | 138 +++++++++++++++++++++++++++ pkg/rtc/transport.go | 36 +++++-- pkg/service/roommanager.go | 27 +++++- test/client/client.go | 10 +- 9 files changed, 345 insertions(+), 104 deletions(-) create mode 100644 pkg/rtc/stats.go diff --git a/go.mod b/go.mod index e8ff5a4da..809dbe4f9 100644 --- a/go.mod +++ b/go.mod @@ -16,12 +16,14 @@ require ( github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 github.com/pion/ice/v2 v2.1.7 + github.com/pion/interceptor v0.0.12 github.com/pion/ion-sfu v1.10.0 github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.6 github.com/pion/rtp v1.6.5 github.com/pion/sdp/v3 v3.0.4 github.com/pion/stun v0.3.5 + github.com/pion/transport v0.12.3 github.com/pion/turn/v2 v2.0.5 github.com/pion/webrtc/v3 v3.0.29 github.com/pkg/errors v0.9.1 diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 03506922b..7ce875797 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -29,44 +29,43 @@ var ( // MediaTrack represents a WebRTC track that needs to be forwarded // Implements the PublishedTrack interface type MediaTrack struct { - id string - participantId string - muted utils.AtomicFlag - - ssrc webrtc.SSRC - name string - streamID string - kind livekit.TrackType - codec webrtc.RTPCodecParameters - bufferFactory *buffer.Factory - receiverConf ReceiverConfig - audioConf config.AudioConfig - onClose func() + params MediaTrackParams + ssrc webrtc.SSRC + name string + streamID string + kind livekit.TrackType + codec webrtc.RTPCodecParameters + muted utils.AtomicFlag // channel to send RTCP packets to the source - rtcpCh chan []rtcp.Packet - lock sync.RWMutex + lock sync.RWMutex // map of target participantId -> *SubscribedTrack subscribedTracks map[string]*SubscribedTrack twcc *twcc.Responder audioLevel *AudioLevel receiver sfu.Receiver - //lastNack int64 - lastPLI time.Time + lastPLI time.Time + + onClose func() } -func NewMediaTrack(trackId string, pId string, rtcpCh chan []rtcp.Packet, track *webrtc.TrackRemote, bufferFactory *buffer.Factory, rc ReceiverConfig, ac config.AudioConfig) *MediaTrack { +type MediaTrackParams struct { + TrackID string + ParticipantID string + RTCPChan chan []rtcp.Packet + BufferFactory *buffer.Factory + ReceiverConfig ReceiverConfig + AudioConfig config.AudioConfig + Stats *StatsReporter +} + +func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTrack { t := &MediaTrack{ - id: trackId, - participantId: pId, + params: params, ssrc: track.SSRC(), streamID: track.StreamID(), kind: ToProtoTrackKind(track.Kind()), codec: track.Codec(), - bufferFactory: bufferFactory, - receiverConf: rc, - audioConf: ac, - rtcpCh: rtcpCh, lock: sync.RWMutex{}, subscribedTracks: make(map[string]*SubscribedTrack), } @@ -78,7 +77,7 @@ func (t *MediaTrack) Start() { } func (t *MediaTrack) ID() string { - return t.id + return t.params.TrackID } func (t *MediaTrack) Kind() livekit.TrackType { @@ -130,11 +129,11 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } // using DownTrack from ion-sfu - streamId := t.participantId + streamId := t.params.ParticipantID if sub.ProtocolVersion().SupportsPackedStreamId() { // when possible, pack both IDs in streamID to allow new streams to be generated // react-native-webrtc still uses stream based APIs and require this - streamId = PackStreamID(t.participantId, t.ID()) + streamId = PackStreamID(t.params.ParticipantID, t.ID()) } receiver := NewWrappedReceiver(t.receiver, t.ID(), streamId) downTrack, err := sfu.NewDownTrack(webrtc.RTPCodecCapability{ @@ -143,7 +142,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { Channels: codec.Channels, SDPFmtpLine: codec.SDPFmtpLine, RTCPFeedback: feedbackTypes, - }, receiver, t.bufferFactory, sub.ID(), t.receiverConf.packetBufferSize) + }, receiver, t.params.BufferFactory, sub.ID(), t.params.ReceiverConfig.packetBufferSize) if err != nil { return err } @@ -179,8 +178,8 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return } logger.Debugw("removing peerconnection track", - "track", t.id, - "participantId", t.participantId, + "track", t.params.TrackID, + "participantId", t.params.ParticipantID, "destParticipant", sub.Identity()) if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { if err == webrtc.ErrConnectionClosed { @@ -194,14 +193,14 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } } - sub.RemoveSubscribedTrack(t.participantId, subTrack) + sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack) sub.Negotiate() }) t.subscribedTracks[sub.ID()] = subTrack t.receiver.AddDownTrack(downTrack, true) - sub.AddSubscribedTrack(t.participantId, subTrack) + sub.AddSubscribedTrack(t.params.ParticipantID, subTrack) sub.Negotiate() return nil @@ -213,14 +212,17 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra defer t.lock.Unlock() //rid := track.RID() - buff, rtcpReader := t.bufferFactory.GetBufferPair(uint32(track.SSRC())) + buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC())) buff.OnFeedback(func(fb []rtcp.Packet) { + if t.params.Stats != nil { + t.params.Stats.incoming.HandleRTCP(fb) + } // feedback for the source RTCP - t.rtcpCh <- fb + t.params.RTCPChan <- fb }) if t.Kind() == livekit.TrackType_AUDIO { - t.audioLevel = NewAudioLevel(t.audioConf.ActiveLevel, t.audioConf.MinPercentile) + t.audioLevel = NewAudioLevel(t.params.AudioConfig.ActiveLevel, t.params.AudioConfig.MinPercentile) buff.OnAudioLevel(func(level uint8) { t.audioLevel.Observe(level) }) @@ -250,8 +252,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra }) if t.receiver == nil { - t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.participantId) - t.receiver.SetRTCPCh(t.rtcpCh) + t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.params.ParticipantID) + t.receiver.SetRTCPCh(t.params.RTCPChan) t.receiver.OnCloseHandler(func() { t.lock.Lock() t.receiver = nil @@ -266,7 +268,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.receiver.AddUpTrack(track, buff, true) buff.Bind(receiver.GetParameters(), buffer.Options{ - MaxBitRate: t.receiverConf.maxBitrate, + MaxBitRate: t.params.ReceiverConfig.maxBitrate, }) } @@ -282,7 +284,7 @@ func (t *MediaTrack) RemoveSubscriber(participantId string) { } func (t *MediaTrack) RemoveAllSubscribers() { - logger.Debugw("removing all subscribers", "track", t.id) + logger.Debugw("removing all subscribers", "track", t.params.TrackID) t.lock.RLock() defer t.lock.RUnlock() for _, subTrack := range t.subscribedTracks { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 9ab64fbb5..1d509da18 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -29,19 +29,25 @@ const ( sdBatchSize = 15 ) +type ParticipantParams struct { + Identity string + Config *WebRTCConfig + Sink routing.MessageSink + AudioConfig config.AudioConfig + ProtocolVersion types.ProtocolVersion + Stats *StatsReporter +} + type ParticipantImpl struct { - id string - publisher *PCTransport - subscriber *PCTransport - responseSink routing.MessageSink - audioConfig config.AudioConfig - isClosed utils.AtomicFlag - conf *WebRTCConfig - identity string - permission *livekit.ParticipantPermission - state atomic.Value // livekit.ParticipantInfo_State - rtcpCh chan []rtcp.Packet - protocolVersion types.ProtocolVersion + params ParticipantParams + id string + publisher *PCTransport + subscriber *PCTransport + isClosed utils.AtomicFlag + permission *livekit.ParticipantPermission + state atomic.Value // livekit.ParticipantInfo_State + rtcpCh chan []rtcp.Packet + // reliable and unreliable data channels reliableDC *webrtc.DataChannel lossyDC *webrtc.DataChannel @@ -74,16 +80,12 @@ type ParticipantImpl struct { onClose func(types.Participant) } -func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, ac config.AudioConfig, pv types.ProtocolVersion) (*ParticipantImpl, error) { +func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { // TODO: check to ensure params are valid, id and identity can't be empty p := &ParticipantImpl{ + params: params, id: utils.NewGuid(utils.ParticipantPrefix), - identity: identity, - responseSink: rs, - audioConfig: ac, - conf: conf, - protocolVersion: pv, rtcpCh: make(chan []rtcp.Packet, 50), subscribedTracks: make(map[string][]types.SubscribedTrack), lock: sync.RWMutex{}, @@ -94,11 +96,19 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, p.state.Store(livekit.ParticipantInfo_JOINING) var err error - p.publisher, err = NewPCTransport(livekit.SignalTarget_PUBLISHER, conf) + p.publisher, err = NewPCTransport(TransportParams{ + Target: livekit.SignalTarget_PUBLISHER, + Config: params.Config, + Stats: p.params.Stats, + }) if err != nil { return nil, err } - p.subscriber, err = NewPCTransport(livekit.SignalTarget_SUBSCRIBER, conf) + p.subscriber, err = NewPCTransport(TransportParams{ + Target: livekit.SignalTarget_SUBSCRIBER, + Config: params.Config, + Stats: p.params.Stats, + }) if err != nil { return nil, err } @@ -130,7 +140,7 @@ func (p *ParticipantImpl) ID() string { } func (p *ParticipantImpl) Identity() string { - return p.identity + return p.params.Identity } func (p *ParticipantImpl) State() livekit.ParticipantInfo_State { @@ -138,7 +148,7 @@ func (p *ParticipantImpl) State() livekit.ParticipantInfo_State { } func (p *ParticipantImpl) ProtocolVersion() types.ProtocolVersion { - return p.protocolVersion + return p.params.ProtocolVersion } func (p *ParticipantImpl) IsReady() bool { @@ -170,7 +180,7 @@ func (p *ParticipantImpl) RTCPChan() chan []rtcp.Packet { func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { info := &livekit.ParticipantInfo{ Sid: p.id, - Identity: p.identity, + Identity: p.params.Identity, Metadata: p.metadata, State: p.State(), JoinedAt: p.ConnectedAt().Unix(), @@ -185,11 +195,11 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { } func (p *ParticipantImpl) GetResponseSink() routing.MessageSink { - return p.responseSink + return p.params.Sink } func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { - p.responseSink = sink + p.params.Sink = sink } func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine { @@ -197,6 +207,7 @@ func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine { } // callbacks for clients + func (p *ParticipantImpl) OnTrackPublished(callback func(types.Participant, types.PublishedTrack)) { p.onTrackPublished = callback } @@ -247,11 +258,14 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web "participant", p.Identity(), //"sdp", sdp.SDP, ) - p.writeMessage(&livekit.SignalResponse{ + err = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ Answer: ToProtoSessionDescription(answer), }, }) + if err != nil { + return + } if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) @@ -259,7 +273,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web return } -// client intends to publish track, this function records track details and schedules negotiation +// AddTrack is called when client intends to publish track. +// records track details and lets client know it's ok to proceed func (p *ParticipantImpl) AddTrack(clientId, name string, trackType livekit.TrackType) { p.lock.Lock() defer p.lock.Unlock() @@ -276,7 +291,7 @@ func (p *ParticipantImpl) AddTrack(clientId, name string, trackType livekit.Trac } p.pendingTracks[clientId] = ti - p.writeMessage(&livekit.SignalResponse{ + _ = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_TrackPublished{ TrackPublished: &livekit.TrackPublishedResponse{ Cid: clientId, @@ -361,7 +376,7 @@ func (p *ParticipantImpl) Close() error { p.publisher.pc.OnICECandidate(nil) // ensure this is synchronized p.lock.RLock() - p.responseSink.Close() + p.params.Sink.Close() onClose := p.onClose p.lock.RUnlock() if onClose != nil { @@ -422,6 +437,7 @@ func (p *ParticipantImpl) RemoveSubscriber(participantId string) { } // signal connection methods + func (p *ParticipantImpl) SendJoinResponse(roomInfo *livekit.Room, otherParticipants []types.Participant, iceServers []*livekit.ICEServer) error { // send Join response return p.writeMessage(&livekit.SignalResponse{ @@ -500,7 +516,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) { if currentMuted != track.IsMuted() && p.onTrackUpdated != nil { logger.Debugw("mute status changed", - "participant", p.identity, + "participant", p.Identity(), "track", trackId, "muted", track.IsMuted()) p.onTrackUpdated(p, track) @@ -552,7 +568,7 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { return subscribed } -// add a track to the participant's subscribed list +// AddSubscribedTrack adds a track to the participant's subscribed list func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.SubscribedTrack) { logger.Debugw("added subscribedTrack", "srcParticipant", pubId, "participant", p.Identity()) @@ -561,7 +577,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.Subscr p.lock.Unlock() } -// remove a track to the participant's subscribed list +// RemoveSubscribedTrack removes a track to the participant's subscribed list func (p *ParticipantImpl) RemoveSubscribedTrack(pubId string, subTrack types.SubscribedTrack) { logger.Debugw("removed subscribedTrack", "srcParticipant", pubId, "participant", p.Identity()) @@ -611,13 +627,13 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { if p.State() == livekit.ParticipantInfo_DISCONNECTED { return nil } - sink := p.responseSink + sink := p.params.Sink err := sink.WriteMessage(msg) if err != nil { logger.Warnw("could not send message to participant", "error", err, "id", p.ID(), - "participant", p.identity, + "participant", p.Identity(), "message", fmt.Sprintf("%T", msg.Message)) return err } @@ -672,7 +688,15 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w if trk, ok := ptrack.(*MediaTrack); ok { mt = trk } else { - mt = NewMediaTrack(ti.Sid, p.id, p.rtcpCh, track, p.conf.BufferFactory, p.conf.Receiver, p.audioConfig) + mt = NewMediaTrack(track, MediaTrackParams{ + TrackID: ti.Sid, + ParticipantID: p.id, + RTCPChan: p.rtcpCh, + BufferFactory: p.params.Config.BufferFactory, + ReceiverConfig: p.params.Config.Receiver, + AudioConfig: p.params.AudioConfig, + Stats: p.params.Stats, + }) mt.name = ti.Name newTrack = true } @@ -680,7 +704,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w if p.twcc == nil { p.twcc = twcc.NewTransportWideCCResponder(uint32(track.SSRC())) p.twcc.OnFeedback(func(pkt rtcp.RawPacket) { - p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt}) + _ = p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt}) }) } mt.AddReceiver(rtpReceiver, track, p.twcc) @@ -806,7 +830,9 @@ func (p *ParticipantImpl) handlePublisherICEStateChange(state webrtc.ICEConnecti if state == webrtc.ICEConnectionStateConnected { p.updateState(livekit.ParticipantInfo_ACTIVE) } else if state == webrtc.ICEConnectionStateDisconnected || state == webrtc.ICEConnectionStateFailed { - go p.Close() + go func() { + _ = p.Close() + }() } } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 75d43c15c..252b3f9a0 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -95,7 +95,7 @@ func TestTrackPublishing(t *testing.T) { p := newParticipantForTest("test") //track := &typesfakes.FakePublishedTrack{} //track.IDReturns("id") - sink := p.responseSink.(*routingfakes.FakeMessageSink) + sink := p.params.Sink.(*routingfakes.FakeMessageSink) p.AddTrack("cid", "webcam", livekit.TrackType_VIDEO) assert.Equal(t, 1, sink.WriteMessageCallCount()) res := sink.WriteMessageArgsForCall(0).(*livekit.SignalResponse) @@ -110,7 +110,7 @@ func TestTrackPublishing(t *testing.T) { p := newParticipantForTest("test") //track := &typesfakes.FakePublishedTrack{} //track.IDReturns("id") - sink := p.responseSink.(*routingfakes.FakeMessageSink) + sink := p.params.Sink.(*routingfakes.FakeMessageSink) p.AddTrack("cid", "webcam", livekit.TrackType_VIDEO) p.AddTrack("cid", "duplicate", livekit.TrackType_AUDIO) @@ -123,7 +123,7 @@ func TestDisconnectTiming(t *testing.T) { t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) { p := newParticipantForTest("test") msg := routing.NewMessageChannel() - p.responseSink = msg + p.params.Sink = msg go func() { for msg := range msg.ReadChan() { t.Log("received message from chan", msg) @@ -153,11 +153,11 @@ func newParticipantForTest(identity string) *ParticipantImpl { if err != nil { panic(err) } - p, _ := NewParticipant( - identity, - rtcConf, - &routingfakes.FakeMessageSink{}, - config.AudioConfig{}, - 0) + p, _ := NewParticipant(ParticipantParams{ + Identity: identity, + Config: rtcConf, + Sink: &routingfakes.FakeMessageSink{}, + ProtocolVersion: 0, + }) return p } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 5f1fb02fc..5ea858f4f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -13,7 +13,8 @@ import ( ) const ( - DefaultEmptyTimeout = 5 * 60 // 5 mins + DefaultEmptyTimeout = 5 * 60 // 5m + DefaultRoomDepartureGrace = 20 ) type Room struct { @@ -33,6 +34,10 @@ type Room struct { audioUpdateInterval uint32 lastActiveSpeakers []*livekit.SpeakerInfo + // aggregate stats + incomingStats PacketStats + outgoingStats PacketStats + onParticipantChanged func(p types.Participant) onClose func() } @@ -199,6 +204,12 @@ func (r *Room) RemoveParticipant(identity string) { // close participant as well _ = p.Close() + // add participant stats + if pi, ok := p.(*ParticipantImpl); ok && pi.params.Stats != nil { + r.incomingStats.addFrom(pi.params.Stats.incoming) + r.outgoingStats.addFrom(pi.params.Stats.outgoing) + } + if len(r.participants) == 0 { r.leftAt.Store(time.Now().Unix()) } @@ -252,25 +263,42 @@ func (r *Room) CloseIfEmpty() { return } + timeout := r.EmptyTimeout var elapsed int64 if r.FirstJoinedAt() > 0 { - // compute elasped from last departure + // exit 20s after elapsed = time.Now().Unix() - r.LastLeftAt() + if timeout > DefaultRoomDepartureGrace { + timeout = DefaultRoomDepartureGrace + } } else { elapsed = time.Now().Unix() - r.CreationTime } - if elapsed >= int64(r.EmptyTimeout) { + + if elapsed >= int64(timeout) { r.Close() } } func (r *Room) Close() { + if !r.isClosed.TrySet(true) { + return + } logger.Infow("closing room", "room", r.Sid, "name", r.Name) - if r.isClosed.TrySet(true) && r.onClose != nil { + + if r.onClose != nil { r.onClose() } } +func (r *Room) GetIncomingStats() PacketStats { + return r.incomingStats +} + +func (r *Room) GetOutgoingStats() PacketStats { + return r.outgoingStats +} + func (r *Room) OnClose(f func()) { r.onClose = f } @@ -316,7 +344,7 @@ func (r *Room) onTrackAdded(participant types.Participant, track types.Published } } -func (r *Room) onTrackUpdated(p types.Participant, track types.PublishedTrack) { +func (r *Room) onTrackUpdated(p types.Participant, _ types.PublishedTrack) { // send track updates to everyone, especially if track was updated by admin r.broadcastParticipantState(p, false) if r.onParticipantChanged != nil { diff --git a/pkg/rtc/stats.go b/pkg/rtc/stats.go new file mode 100644 index 000000000..5623129d3 --- /dev/null +++ b/pkg/rtc/stats.go @@ -0,0 +1,138 @@ +package rtc + +import ( + "io" + "sync/atomic" + + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/pion/transport/packetio" +) + +type PacketStats struct { + roomName string + identity string + kind string // incoming or outgoing + + PacketCount uint64 `json:"packetCount"` + NackCount uint64 `json:"nackCount"` + PLICount uint64 `json:"pliCount"` + FIRCount uint64 `json:"firCount"` +} + +func newPacketStats(room, identity, kind string) *PacketStats { + return &PacketStats{ + roomName: room, + identity: identity, + kind: kind, + } +} + +func (s *PacketStats) IncrementPackets(count uint64) { + atomic.AddUint64(&s.PacketCount, count) +} + +func (s *PacketStats) IncrementNack(count uint64) { + atomic.AddUint64(&s.NackCount, count) +} + +func (s *PacketStats) IncrementPLI(count uint64) { + atomic.AddUint64(&s.PLICount, count) +} + +func (s *PacketStats) IncrementFIR(count uint64) { + atomic.AddUint64(&s.FIRCount, count) +} + +func (s *PacketStats) HandleRTCP(pkts []rtcp.Packet) { + for _, rtcpPacket := range pkts { + switch rtcpPacket.(type) { + case *rtcp.TransportLayerNack: + s.IncrementNack(1) + case *rtcp.PictureLossIndication: + s.IncrementPLI(1) + case *rtcp.FullIntraRequest: + s.IncrementFIR(1) + } + } +} + +func (s *PacketStats) addFrom(o *PacketStats) { + s.PacketCount += atomic.LoadUint64(&o.PacketCount) + s.NackCount += atomic.LoadUint64(&o.NackCount) + s.PLICount += atomic.LoadUint64(&o.PLICount) + s.FIRCount += atomic.LoadUint64(&o.FIRCount) +} + +type StatsReporter struct { + incoming *PacketStats + outgoing *PacketStats +} + +func NewStatsReporter(roomName, identity string) *StatsReporter { + return &StatsReporter{ + incoming: newPacketStats(roomName, identity, "incoming"), + outgoing: newPacketStats(roomName, identity, "outgoing"), + } +} + +// StatsBufferWrapper wraps a buffer factory so we could get information on +// incoming packets +type StatsBufferWrapper struct { + createBufferFunc func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser + stats *PacketStats +} + +func (w *StatsBufferWrapper) CreateBuffer(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser { + writer := w.createBufferFunc(packetType, ssrc) + if packetType == packetio.RTPBufferPacket { + // wrap this in a counter class + return &rtpReporterWriter{ + ReadWriteCloser: writer, + stats: w.stats, + } + } + return writer +} + +type rtpReporterWriter struct { + io.ReadWriteCloser + stats *PacketStats +} + +func (w *rtpReporterWriter) Write(p []byte) (n int, err error) { + w.stats.IncrementPackets(1) + return w.ReadWriteCloser.Write(p) +} + +// StatsInterceptor is created for each participant to keep of track of outgoing stats +// it adheres to Pion interceptor interface +type StatsInterceptor struct { + interceptor.NoOp + reporter *StatsReporter +} + +func NewStatsInterceptor(reporter *StatsReporter) *StatsInterceptor { + return &StatsInterceptor{ + reporter: reporter, + } +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + s.reporter.outgoing.HandleRTCP(pkts) + return writer.Write(pkts, attributes) + }) +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + s.reporter.outgoing.IncrementPackets(1) + return writer.Write(header, payload, attributes) + }) +} diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 530084bc2..709595df5 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -7,6 +7,7 @@ import ( "github.com/bep/debounce" "github.com/livekit/livekit-server/pkg/logger" + "github.com/pion/interceptor" "github.com/pion/webrtc/v3" livekit "github.com/livekit/livekit-server/proto" @@ -37,10 +38,16 @@ type PCTransport struct { negotiationState atomic.Value } -func newPeerConnection(target livekit.SignalTarget, conf *WebRTCConfig) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { +type TransportParams struct { + Target livekit.SignalTarget + Config *WebRTCConfig + Stats *StatsReporter +} + +func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { var me *webrtc.MediaEngine var err error - if target == livekit.SignalTarget_PUBLISHER { + if params.Target == livekit.SignalTarget_PUBLISHER { me, err = createPubMediaEngine() } else { me, err = createSubMediaEngine() @@ -48,16 +55,31 @@ func newPeerConnection(target livekit.SignalTarget, conf *WebRTCConfig) (*webrtc if err != nil { return nil, nil, err } - se := conf.SettingEngine + se := params.Config.SettingEngine se.DisableMediaEngineCopy(true) + if params.Stats != nil && se.BufferFactory != nil { + wrapper := &StatsBufferWrapper{ + createBufferFunc: se.BufferFactory, + stats: params.Stats.incoming, + } + se.BufferFactory = wrapper.CreateBuffer + } - api := webrtc.NewAPI(webrtc.WithMediaEngine(me), webrtc.WithSettingEngine(se)) - pc, err := api.NewPeerConnection(conf.Configuration) + ir := &interceptor.Registry{} + if params.Stats != nil { + ir.Add(NewStatsInterceptor(params.Stats)) + } + api := webrtc.NewAPI( + webrtc.WithMediaEngine(me), + webrtc.WithSettingEngine(se), + webrtc.WithInterceptorRegistry(ir), + ) + pc, err := api.NewPeerConnection(params.Config.Configuration) return pc, me, err } -func NewPCTransport(target livekit.SignalTarget, conf *WebRTCConfig) (*PCTransport, error) { - pc, me, err := newPeerConnection(target, conf) +func NewPCTransport(params TransportParams) (*PCTransport, error) { + pc, me, err := newPeerConnection(params) if err != nil { return nil, err } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index fe6dcc70d..c302d5347 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -233,7 +233,14 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, if pi.UsePlanB { rtcConf.Configuration.SDPSemantics = webrtc.SDPSemanticsPlanB } - participant, err = rtc.NewParticipant(pi.Identity, &rtcConf, responseSink, r.config.Audio, pv) + participant, err = rtc.NewParticipant(rtc.ParticipantParams{ + Identity: pi.Identity, + Config: &rtcConf, + Sink: responseSink, + AudioConfig: r.config.Audio, + ProtocolVersion: pv, + Stats: rtc.NewStatsReporter(roomName, pi.Identity), + }) if err != nil { logger.Errorw("could not create participant", "error", err) return @@ -277,6 +284,11 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { if err := r.DeleteRoom(roomName); err != nil { logger.Errorw("could not delete room", "error", err) } + // print stats + logger.Infow("room closed", + "incomingStats", room.GetIncomingStats(), + "outgoingStats", room.GetOutgoingStats(), + ) }) room.OnParticipantChanged(func(p types.Participant) { var err error @@ -303,7 +315,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici "participant", participant.Identity(), "room", room.Name, ) - participant.Close() + _ = participant.Close() }() defer rtc.Recover() @@ -356,9 +368,14 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted) case *livekit.SignalRequest_Subscription: if participant.CanSubscribe() { - room.UpdateSubscriptions(participant, msg.Subscription) + if err := room.UpdateSubscriptions(participant, msg.Subscription); err != nil { + logger.Warnw("could not update subscription", + "participant", participant.Identity(), + "tracks", msg.Subscription.TrackSids, + "subscribe", msg.Subscription.Subscribe) + } } else { - logger.Warnw("rejected participant subscription", + logger.Infow("rejected participant subscription", "participant", participant.Identity(), "tracks", msg.Subscription.TrackSids) } @@ -373,7 +390,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici } } case *livekit.SignalRequest_Leave: - participant.Close() + _ = participant.Close() } } } diff --git a/test/client/client.go b/test/client/client.go index 73ebe335f..ee85aa1f8 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -103,12 +103,18 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { conf := rtc.WebRTCConfig{ Configuration: rtcConf, } - c.publisher, err = rtc.NewPCTransport(livekit.SignalTarget_PUBLISHER, &conf) + c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{ + Target: livekit.SignalTarget_PUBLISHER, + Config: &conf, + }) if err != nil { return nil, err } // intentionally use publisher transport to have codecs pre-registered - c.subscriber, err = rtc.NewPCTransport(livekit.SignalTarget_PUBLISHER, &conf) + c.subscriber, err = rtc.NewPCTransport(rtc.TransportParams{ + Target: livekit.SignalTarget_PUBLISHER, + Config: &conf, + }) if err != nil { return nil, err }