From 608da4ba4bc1c1cdda5e02e3e1ae59d55e96aa7b Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 7 Mar 2022 14:59:01 +0800 Subject: [PATCH] stats for pli and bitrate (#487) * stats for pli and bitrate * solve comments * return 0 if no pli sent --- pkg/rtc/datatrack.go | 4 ++ pkg/rtc/mediatracksubscriptions.go | 11 ++++ pkg/rtc/participant.go | 11 ++++ pkg/rtc/types/interfaces.go | 2 + .../typesfakes/fake_local_media_track.go | 65 +++++++++++++++++++ .../typesfakes/fake_local_participant.go | 65 +++++++++++++++++++ pkg/rtc/types/typesfakes/fake_media_track.go | 65 +++++++++++++++++++ pkg/sfu/buffer/buffer.go | 5 ++ pkg/sfu/buffer/streamstats.go | 1 + pkg/sfu/downtrack.go | 40 +++++++++++- pkg/sfu/receiver.go | 17 ++++- 11 files changed, 282 insertions(+), 4 deletions(-) diff --git a/pkg/rtc/datatrack.go b/pkg/rtc/datatrack.go index 818ae2e1b..6646abc95 100644 --- a/pkg/rtc/datatrack.go +++ b/pkg/rtc/datatrack.go @@ -176,6 +176,10 @@ func (t *DataTrack) GetBitrateTemporalCumulative() sfu.Bitrates { func (t *DataTrack) SendPLI(layer int32) { } +func (t *DataTrack) LastPLI() int64 { + return 0 +} + func (t *DataTrack) SetUpTrackPaused(paused bool) { } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 3fd0dfde0..7f54e09d7 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -331,6 +331,17 @@ func (t *MediaTrackSubscriptions) RevokeDisallowedSubscribers(allowedSubscriberI return revokedSubscriberIDs } +func (t *MediaTrackSubscriptions) GetAllSubscribers() []livekit.ParticipantID { + t.subscribedTracksMu.RLock() + defer t.subscribedTracksMu.RUnlock() + + subs := make([]livekit.ParticipantID, 0, len(t.subscribedTracks)) + for id := range t.subscribedTracks { + subs = append(subs, id) + } + return subs +} + func (t *MediaTrackSubscriptions) UpdateVideoLayers() { for _, st := range t.getAllSubscribedTracks() { st.UpdateVideoLayer() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 99f4a21c1..ec9ebed20 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1650,3 +1650,14 @@ func (p *ParticipantImpl) handlePendingDataChannels() { } p.pendingDataChannels = nil } + +func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { + p.lock.RLock() + defer p.lock.RUnlock() + + tracks := make([]types.SubscribedTrack, 0, len(p.subscribedTracks)) + for _, t := range p.subscribedTracks { + tracks = append(tracks, t) + } + return tracks +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index f76dd894e..134fab23c 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -121,6 +121,7 @@ type LocalParticipant interface { AddSubscribedTrack(st SubscribedTrack) RemoveSubscribedTrack(st SubscribedTrack) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error + GetSubscribedTracks() []SubscribedTrack // returns list of participant identities that the current participant is subscribed to GetSubscribedParticipants() []livekit.ParticipantID @@ -201,6 +202,7 @@ type MediaTrack interface { IsSubscriber(subID livekit.ParticipantID) bool RemoveAllSubscribers() RevokeDisallowedSubscribers(allowedSubscriberIDs []livekit.ParticipantID) []livekit.ParticipantID + GetAllSubscribers() []livekit.ParticipantID // returns quality information that's appropriate for width & height GetQualityForDimension(width, height uint32) livekit.VideoQuality diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 86d083a2c..0371852eb 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -26,6 +26,16 @@ type FakeLocalMediaTrack struct { addSubscriberReturnsOnCall map[int]struct { result1 error } + GetAllSubscribersStub func() []livekit.ParticipantID + getAllSubscribersMutex sync.RWMutex + getAllSubscribersArgsForCall []struct { + } + getAllSubscribersReturns struct { + result1 []livekit.ParticipantID + } + getAllSubscribersReturnsOnCall map[int]struct { + result1 []livekit.ParticipantID + } GetAudioLevelStub func() (uint8, bool) getAudioLevelMutex sync.RWMutex getAudioLevelArgsForCall []struct { @@ -336,6 +346,59 @@ func (fake *FakeLocalMediaTrack) AddSubscriberReturnsOnCall(i int, result1 error }{result1} } +func (fake *FakeLocalMediaTrack) GetAllSubscribers() []livekit.ParticipantID { + fake.getAllSubscribersMutex.Lock() + ret, specificReturn := fake.getAllSubscribersReturnsOnCall[len(fake.getAllSubscribersArgsForCall)] + fake.getAllSubscribersArgsForCall = append(fake.getAllSubscribersArgsForCall, struct { + }{}) + stub := fake.GetAllSubscribersStub + fakeReturns := fake.getAllSubscribersReturns + fake.recordInvocation("GetAllSubscribers", []interface{}{}) + fake.getAllSubscribersMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalMediaTrack) GetAllSubscribersCallCount() int { + fake.getAllSubscribersMutex.RLock() + defer fake.getAllSubscribersMutex.RUnlock() + return len(fake.getAllSubscribersArgsForCall) +} + +func (fake *FakeLocalMediaTrack) GetAllSubscribersCalls(stub func() []livekit.ParticipantID) { + fake.getAllSubscribersMutex.Lock() + defer fake.getAllSubscribersMutex.Unlock() + fake.GetAllSubscribersStub = stub +} + +func (fake *FakeLocalMediaTrack) GetAllSubscribersReturns(result1 []livekit.ParticipantID) { + fake.getAllSubscribersMutex.Lock() + defer fake.getAllSubscribersMutex.Unlock() + fake.GetAllSubscribersStub = nil + fake.getAllSubscribersReturns = struct { + result1 []livekit.ParticipantID + }{result1} +} + +func (fake *FakeLocalMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []livekit.ParticipantID) { + fake.getAllSubscribersMutex.Lock() + defer fake.getAllSubscribersMutex.Unlock() + fake.GetAllSubscribersStub = nil + if fake.getAllSubscribersReturnsOnCall == nil { + fake.getAllSubscribersReturnsOnCall = make(map[int]struct { + result1 []livekit.ParticipantID + }) + } + fake.getAllSubscribersReturnsOnCall[i] = struct { + result1 []livekit.ParticipantID + }{result1} +} + func (fake *FakeLocalMediaTrack) GetAudioLevel() (uint8, bool) { fake.getAudioLevelMutex.Lock() ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] @@ -1501,6 +1564,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} { defer fake.addOnCloseMutex.RUnlock() fake.addSubscriberMutex.RLock() defer fake.addSubscriberMutex.RUnlock() + fake.getAllSubscribersMutex.RLock() + defer fake.getAllSubscribersMutex.RUnlock() fake.getAudioLevelMutex.RLock() defer fake.getAudioLevelMutex.RUnlock() fake.getConnectionScoreMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index e25dba25d..1fc661037 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -204,6 +204,16 @@ type FakeLocalParticipant struct { getSubscribedParticipantsReturnsOnCall map[int]struct { result1 []livekit.ParticipantID } + GetSubscribedTracksStub func() []types.SubscribedTrack + getSubscribedTracksMutex sync.RWMutex + getSubscribedTracksArgsForCall []struct { + } + getSubscribedTracksReturns struct { + result1 []types.SubscribedTrack + } + getSubscribedTracksReturnsOnCall map[int]struct { + result1 []types.SubscribedTrack + } HandleAnswerStub func(webrtc.SessionDescription) error handleAnswerMutex sync.RWMutex handleAnswerArgsForCall []struct { @@ -1623,6 +1633,59 @@ func (fake *FakeLocalParticipant) GetSubscribedParticipantsReturnsOnCall(i int, }{result1} } +func (fake *FakeLocalParticipant) GetSubscribedTracks() []types.SubscribedTrack { + fake.getSubscribedTracksMutex.Lock() + ret, specificReturn := fake.getSubscribedTracksReturnsOnCall[len(fake.getSubscribedTracksArgsForCall)] + fake.getSubscribedTracksArgsForCall = append(fake.getSubscribedTracksArgsForCall, struct { + }{}) + stub := fake.GetSubscribedTracksStub + fakeReturns := fake.getSubscribedTracksReturns + fake.recordInvocation("GetSubscribedTracks", []interface{}{}) + fake.getSubscribedTracksMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetSubscribedTracksCallCount() int { + fake.getSubscribedTracksMutex.RLock() + defer fake.getSubscribedTracksMutex.RUnlock() + return len(fake.getSubscribedTracksArgsForCall) +} + +func (fake *FakeLocalParticipant) GetSubscribedTracksCalls(stub func() []types.SubscribedTrack) { + fake.getSubscribedTracksMutex.Lock() + defer fake.getSubscribedTracksMutex.Unlock() + fake.GetSubscribedTracksStub = stub +} + +func (fake *FakeLocalParticipant) GetSubscribedTracksReturns(result1 []types.SubscribedTrack) { + fake.getSubscribedTracksMutex.Lock() + defer fake.getSubscribedTracksMutex.Unlock() + fake.GetSubscribedTracksStub = nil + fake.getSubscribedTracksReturns = struct { + result1 []types.SubscribedTrack + }{result1} +} + +func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result1 []types.SubscribedTrack) { + fake.getSubscribedTracksMutex.Lock() + defer fake.getSubscribedTracksMutex.Unlock() + fake.GetSubscribedTracksStub = nil + if fake.getSubscribedTracksReturnsOnCall == nil { + fake.getSubscribedTracksReturnsOnCall = make(map[int]struct { + result1 []types.SubscribedTrack + }) + } + fake.getSubscribedTracksReturnsOnCall[i] = struct { + result1 []types.SubscribedTrack + }{result1} +} + func (fake *FakeLocalParticipant) HandleAnswer(arg1 webrtc.SessionDescription) error { fake.handleAnswerMutex.Lock() ret, specificReturn := fake.handleAnswerReturnsOnCall[len(fake.handleAnswerArgsForCall)] @@ -3933,6 +3996,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getResponseSinkMutex.RUnlock() fake.getSubscribedParticipantsMutex.RLock() defer fake.getSubscribedParticipantsMutex.RUnlock() + fake.getSubscribedTracksMutex.RLock() + defer fake.getSubscribedTracksMutex.RUnlock() fake.handleAnswerMutex.RLock() defer fake.handleAnswerMutex.RUnlock() fake.handleOfferMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index 5172162fd..64583dc07 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -26,6 +26,16 @@ type FakeMediaTrack struct { addSubscriberReturnsOnCall map[int]struct { result1 error } + GetAllSubscribersStub func() []livekit.ParticipantID + getAllSubscribersMutex sync.RWMutex + getAllSubscribersArgsForCall []struct { + } + getAllSubscribersReturns struct { + result1 []livekit.ParticipantID + } + getAllSubscribersReturnsOnCall map[int]struct { + result1 []livekit.ParticipantID + } GetQualityForDimensionStub func(uint32, uint32) livekit.VideoQuality getQualityForDimensionMutex sync.RWMutex getQualityForDimensionArgsForCall []struct { @@ -289,6 +299,59 @@ func (fake *FakeMediaTrack) AddSubscriberReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeMediaTrack) GetAllSubscribers() []livekit.ParticipantID { + fake.getAllSubscribersMutex.Lock() + ret, specificReturn := fake.getAllSubscribersReturnsOnCall[len(fake.getAllSubscribersArgsForCall)] + fake.getAllSubscribersArgsForCall = append(fake.getAllSubscribersArgsForCall, struct { + }{}) + stub := fake.GetAllSubscribersStub + fakeReturns := fake.getAllSubscribersReturns + fake.recordInvocation("GetAllSubscribers", []interface{}{}) + fake.getAllSubscribersMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeMediaTrack) GetAllSubscribersCallCount() int { + fake.getAllSubscribersMutex.RLock() + defer fake.getAllSubscribersMutex.RUnlock() + return len(fake.getAllSubscribersArgsForCall) +} + +func (fake *FakeMediaTrack) GetAllSubscribersCalls(stub func() []livekit.ParticipantID) { + fake.getAllSubscribersMutex.Lock() + defer fake.getAllSubscribersMutex.Unlock() + fake.GetAllSubscribersStub = stub +} + +func (fake *FakeMediaTrack) GetAllSubscribersReturns(result1 []livekit.ParticipantID) { + fake.getAllSubscribersMutex.Lock() + defer fake.getAllSubscribersMutex.Unlock() + fake.GetAllSubscribersStub = nil + fake.getAllSubscribersReturns = struct { + result1 []livekit.ParticipantID + }{result1} +} + +func (fake *FakeMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []livekit.ParticipantID) { + fake.getAllSubscribersMutex.Lock() + defer fake.getAllSubscribersMutex.Unlock() + fake.GetAllSubscribersStub = nil + if fake.getAllSubscribersReturnsOnCall == nil { + fake.getAllSubscribersReturnsOnCall = make(map[int]struct { + result1 []livekit.ParticipantID + }) + } + fake.getAllSubscribersReturnsOnCall[i] = struct { + result1 []livekit.ParticipantID + }{result1} +} + func (fake *FakeMediaTrack) GetQualityForDimension(arg1 uint32, arg2 uint32) livekit.VideoQuality { fake.getQualityForDimensionMutex.Lock() ret, specificReturn := fake.getQualityForDimensionReturnsOnCall[len(fake.getQualityForDimensionArgsForCall)] @@ -1207,6 +1270,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} { defer fake.addOnCloseMutex.RUnlock() fake.addSubscriberMutex.RLock() defer fake.addSubscriberMutex.RUnlock() + fake.getAllSubscribersMutex.RLock() + defer fake.getAllSubscribersMutex.RUnlock() fake.getQualityForDimensionMutex.RLock() defer fake.getQualityForDimensionMutex.RUnlock() fake.iDMutex.RLock() diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 9af90a916..3f254ef81 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -321,6 +321,10 @@ func (b *Buffer) SetRTT(rtt uint32) { } } +func (b *Buffer) LastPLI() int64 { + return b.lastPli +} + func (b *Buffer) calc(pkt []byte, arrivalTime int64) { isRTX := false @@ -638,6 +642,7 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { packetsLost: b.stats.TotalPacketsLost, lastLossRate: lossRate, } + b.stats.LostRate = lossRate return &rtcp.ReceptionReport{ SSRC: b.mediaSSRC, diff --git a/pkg/sfu/buffer/streamstats.go b/pkg/sfu/buffer/streamstats.go index ca717d11b..6b3126243 100644 --- a/pkg/sfu/buffer/streamstats.go +++ b/pkg/sfu/buffer/streamstats.go @@ -20,6 +20,7 @@ type StreamStats struct { TotalNACKs uint32 TotalPLIs uint32 TotalFIRs uint32 + LostRate float32 } type StreamStatsWithLayers struct { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 2a516e5cb..c3746e2d7 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -104,6 +104,10 @@ type DownTrack struct { connectionStats *connectionquality.ConnectionStats + bitrateHelper uint64 + bitrate uint64 + lastBitrateReport time.Time + // Debug info lastPli atomic.Time lastRTP atomic.Time @@ -181,7 +185,7 @@ func NewDownTrack( d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ CodecType: kind, ClockRate: c.ClockRate, - GetTrackStats: d.getTrackStats, + GetTrackStats: d.GetTrackStats, GetIsReducedQuality: func() bool { return d.GetForwardingStatus() != ForwardingStatusOptimal }, @@ -229,6 +233,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.callbacksQueue.Enqueue(d.onBind) } d.bound.Store(true) + d.lastBitrateReport = time.Now() go d.requestFirstKeyframe() @@ -377,6 +382,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { if extPkt.KeyFrame { d.isNACKThrottled.Store(false) } + d.updateBitrate() } else { d.logger.Errorw("writing rtp packet err", err) d.pktsDropped.Inc() @@ -467,6 +473,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { for _, f := range d.onPaddingSentUnsafe { f(d, size) } + d.updateBitrate() // // Register with sequencer with invalid layer so that NACKs for these can be filtered out. @@ -483,6 +490,26 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { return bytesSent } +func (d *DownTrack) updateBitrate() { + lastRtp := d.lastRTP.Load() + d.statsLock.RLock() + timeDiff := lastRtp.Sub(d.lastBitrateReport).Seconds() + d.statsLock.RUnlock() + if timeDiff < 1 { + return + } + octets, _ := d.getSRStats() + d.statsLock.Lock() + d.bitrate = uint64(float64(octets*8-d.bitrateHelper) / timeDiff) + d.bitrateHelper = octets * 8 + d.lastBitrateReport = lastRtp + d.statsLock.Unlock() +} + +func (d *DownTrack) Bitrate() uint64 { + return d.bitrate +} + // Mute enables or disables media forwarding func (d *DownTrack) Mute(muted bool) { changed, maxLayers := d.forwarder.Mute(muted) @@ -985,6 +1012,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { d.stats.RTT = rtt d.stats.Jitter = float64(r.Jitter) + d.stats.LostRate = float32(r.FractionLost) / 256 d.statsLock.Unlock() d.connectionStats.UpdateWindow(r.SSRC, r.LastSequenceNumber, r.TotalLost, rtt, r.Jitter) @@ -1251,7 +1279,7 @@ func (d *DownTrack) GetConnectionScore() float32 { return d.connectionStats.GetScore() } -func (d *DownTrack) getTrackStats() map[uint32]*buffer.StreamStatsWithLayers { +func (d *DownTrack) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayers { d.statsLock.RLock() defer d.statsLock.RUnlock() @@ -1280,3 +1308,11 @@ func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint totalRepeatedNACKs = d.totalRepeatedNACKs return } + +func (d *DownTrack) LastPLI() int64 { + t := d.lastPli.Load() + if t.IsZero() { + return 0 + } + return t.UnixNano() +} diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 0dfdf7eaa..f7bca9070 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -39,6 +39,7 @@ type TrackReceiver interface { GetBitrateTemporalCumulative() Bitrates SendPLI(layer int32) + LastPLI() int64 SetUpTrackPaused(paused bool) SetMaxExpectedSpatialLayer(layer int32) @@ -169,7 +170,7 @@ func NewWebRTCReceiver( w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ CodecType: w.kind, ClockRate: w.codec.ClockRate, - GetTrackStats: w.getTrackStats, + GetTrackStats: w.GetTrackStats, GetIsReducedQuality: func() bool { return w.streamTrackerManager.IsReducedQuality() }, @@ -402,6 +403,18 @@ func (w *WebRTCReceiver) SendPLI(layer int32) { buff.SendPLI() } +func (w *WebRTCReceiver) LastPLI() int64 { + var lastPLI int64 + w.bufferMu.RLock() + for _, b := range w.buffers { + if b != nil && b.LastPLI() > lastPLI { + lastPLI = b.LastPLI() + } + } + w.bufferMu.RUnlock() + return lastPLI +} + func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) { w.rtcpCh = ch } @@ -422,7 +435,7 @@ func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error return buff.GetPacket(buf, sn) } -func (w *WebRTCReceiver) getTrackStats() map[uint32]*buffer.StreamStatsWithLayers { +func (w *WebRTCReceiver) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayers { w.bufferMu.RLock() defer w.bufferMu.RUnlock()