stats for pli and bitrate (#487)

* stats for pli and bitrate

* solve comments

* return 0 if no pli sent
This commit is contained in:
cnderrauber
2022-03-07 14:59:01 +08:00
committed by GitHub
parent e5ffd38054
commit 608da4ba4b
11 changed files with 282 additions and 4 deletions
+4
View File
@@ -176,6 +176,10 @@ func (t *DataTrack) GetBitrateTemporalCumulative() sfu.Bitrates {
func (t *DataTrack) SendPLI(layer int32) {
}
func (t *DataTrack) LastPLI() int64 {
return 0
}
func (t *DataTrack) SetUpTrackPaused(paused bool) {
}
+11
View File
@@ -331,6 +331,17 @@ func (t *MediaTrackSubscriptions) RevokeDisallowedSubscribers(allowedSubscriberI
return revokedSubscriberIDs
}
func (t *MediaTrackSubscriptions) GetAllSubscribers() []livekit.ParticipantID {
t.subscribedTracksMu.RLock()
defer t.subscribedTracksMu.RUnlock()
subs := make([]livekit.ParticipantID, 0, len(t.subscribedTracks))
for id := range t.subscribedTracks {
subs = append(subs, id)
}
return subs
}
func (t *MediaTrackSubscriptions) UpdateVideoLayers() {
for _, st := range t.getAllSubscribedTracks() {
st.UpdateVideoLayer()
+11
View File
@@ -1650,3 +1650,14 @@ func (p *ParticipantImpl) handlePendingDataChannels() {
}
p.pendingDataChannels = nil
}
func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack {
p.lock.RLock()
defer p.lock.RUnlock()
tracks := make([]types.SubscribedTrack, 0, len(p.subscribedTracks))
for _, t := range p.subscribedTracks {
tracks = append(tracks, t)
}
return tracks
}
+2
View File
@@ -121,6 +121,7 @@ type LocalParticipant interface {
AddSubscribedTrack(st SubscribedTrack)
RemoveSubscribedTrack(st SubscribedTrack)
UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error
GetSubscribedTracks() []SubscribedTrack
// returns list of participant identities that the current participant is subscribed to
GetSubscribedParticipants() []livekit.ParticipantID
@@ -201,6 +202,7 @@ type MediaTrack interface {
IsSubscriber(subID livekit.ParticipantID) bool
RemoveAllSubscribers()
RevokeDisallowedSubscribers(allowedSubscriberIDs []livekit.ParticipantID) []livekit.ParticipantID
GetAllSubscribers() []livekit.ParticipantID
// returns quality information that's appropriate for width & height
GetQualityForDimension(width, height uint32) livekit.VideoQuality
@@ -26,6 +26,16 @@ type FakeLocalMediaTrack struct {
addSubscriberReturnsOnCall map[int]struct {
result1 error
}
GetAllSubscribersStub func() []livekit.ParticipantID
getAllSubscribersMutex sync.RWMutex
getAllSubscribersArgsForCall []struct {
}
getAllSubscribersReturns struct {
result1 []livekit.ParticipantID
}
getAllSubscribersReturnsOnCall map[int]struct {
result1 []livekit.ParticipantID
}
GetAudioLevelStub func() (uint8, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
@@ -336,6 +346,59 @@ func (fake *FakeLocalMediaTrack) AddSubscriberReturnsOnCall(i int, result1 error
}{result1}
}
func (fake *FakeLocalMediaTrack) GetAllSubscribers() []livekit.ParticipantID {
fake.getAllSubscribersMutex.Lock()
ret, specificReturn := fake.getAllSubscribersReturnsOnCall[len(fake.getAllSubscribersArgsForCall)]
fake.getAllSubscribersArgsForCall = append(fake.getAllSubscribersArgsForCall, struct {
}{})
stub := fake.GetAllSubscribersStub
fakeReturns := fake.getAllSubscribersReturns
fake.recordInvocation("GetAllSubscribers", []interface{}{})
fake.getAllSubscribersMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalMediaTrack) GetAllSubscribersCallCount() int {
fake.getAllSubscribersMutex.RLock()
defer fake.getAllSubscribersMutex.RUnlock()
return len(fake.getAllSubscribersArgsForCall)
}
func (fake *FakeLocalMediaTrack) GetAllSubscribersCalls(stub func() []livekit.ParticipantID) {
fake.getAllSubscribersMutex.Lock()
defer fake.getAllSubscribersMutex.Unlock()
fake.GetAllSubscribersStub = stub
}
func (fake *FakeLocalMediaTrack) GetAllSubscribersReturns(result1 []livekit.ParticipantID) {
fake.getAllSubscribersMutex.Lock()
defer fake.getAllSubscribersMutex.Unlock()
fake.GetAllSubscribersStub = nil
fake.getAllSubscribersReturns = struct {
result1 []livekit.ParticipantID
}{result1}
}
func (fake *FakeLocalMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []livekit.ParticipantID) {
fake.getAllSubscribersMutex.Lock()
defer fake.getAllSubscribersMutex.Unlock()
fake.GetAllSubscribersStub = nil
if fake.getAllSubscribersReturnsOnCall == nil {
fake.getAllSubscribersReturnsOnCall = make(map[int]struct {
result1 []livekit.ParticipantID
})
}
fake.getAllSubscribersReturnsOnCall[i] = struct {
result1 []livekit.ParticipantID
}{result1}
}
func (fake *FakeLocalMediaTrack) GetAudioLevel() (uint8, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
@@ -1501,6 +1564,8 @@ func (fake *FakeLocalMediaTrack) Invocations() map[string][][]interface{} {
defer fake.addOnCloseMutex.RUnlock()
fake.addSubscriberMutex.RLock()
defer fake.addSubscriberMutex.RUnlock()
fake.getAllSubscribersMutex.RLock()
defer fake.getAllSubscribersMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getConnectionScoreMutex.RLock()
@@ -204,6 +204,16 @@ type FakeLocalParticipant struct {
getSubscribedParticipantsReturnsOnCall map[int]struct {
result1 []livekit.ParticipantID
}
GetSubscribedTracksStub func() []types.SubscribedTrack
getSubscribedTracksMutex sync.RWMutex
getSubscribedTracksArgsForCall []struct {
}
getSubscribedTracksReturns struct {
result1 []types.SubscribedTrack
}
getSubscribedTracksReturnsOnCall map[int]struct {
result1 []types.SubscribedTrack
}
HandleAnswerStub func(webrtc.SessionDescription) error
handleAnswerMutex sync.RWMutex
handleAnswerArgsForCall []struct {
@@ -1623,6 +1633,59 @@ func (fake *FakeLocalParticipant) GetSubscribedParticipantsReturnsOnCall(i int,
}{result1}
}
func (fake *FakeLocalParticipant) GetSubscribedTracks() []types.SubscribedTrack {
fake.getSubscribedTracksMutex.Lock()
ret, specificReturn := fake.getSubscribedTracksReturnsOnCall[len(fake.getSubscribedTracksArgsForCall)]
fake.getSubscribedTracksArgsForCall = append(fake.getSubscribedTracksArgsForCall, struct {
}{})
stub := fake.GetSubscribedTracksStub
fakeReturns := fake.getSubscribedTracksReturns
fake.recordInvocation("GetSubscribedTracks", []interface{}{})
fake.getSubscribedTracksMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetSubscribedTracksCallCount() int {
fake.getSubscribedTracksMutex.RLock()
defer fake.getSubscribedTracksMutex.RUnlock()
return len(fake.getSubscribedTracksArgsForCall)
}
func (fake *FakeLocalParticipant) GetSubscribedTracksCalls(stub func() []types.SubscribedTrack) {
fake.getSubscribedTracksMutex.Lock()
defer fake.getSubscribedTracksMutex.Unlock()
fake.GetSubscribedTracksStub = stub
}
func (fake *FakeLocalParticipant) GetSubscribedTracksReturns(result1 []types.SubscribedTrack) {
fake.getSubscribedTracksMutex.Lock()
defer fake.getSubscribedTracksMutex.Unlock()
fake.GetSubscribedTracksStub = nil
fake.getSubscribedTracksReturns = struct {
result1 []types.SubscribedTrack
}{result1}
}
func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result1 []types.SubscribedTrack) {
fake.getSubscribedTracksMutex.Lock()
defer fake.getSubscribedTracksMutex.Unlock()
fake.GetSubscribedTracksStub = nil
if fake.getSubscribedTracksReturnsOnCall == nil {
fake.getSubscribedTracksReturnsOnCall = make(map[int]struct {
result1 []types.SubscribedTrack
})
}
fake.getSubscribedTracksReturnsOnCall[i] = struct {
result1 []types.SubscribedTrack
}{result1}
}
func (fake *FakeLocalParticipant) HandleAnswer(arg1 webrtc.SessionDescription) error {
fake.handleAnswerMutex.Lock()
ret, specificReturn := fake.handleAnswerReturnsOnCall[len(fake.handleAnswerArgsForCall)]
@@ -3933,6 +3996,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getResponseSinkMutex.RUnlock()
fake.getSubscribedParticipantsMutex.RLock()
defer fake.getSubscribedParticipantsMutex.RUnlock()
fake.getSubscribedTracksMutex.RLock()
defer fake.getSubscribedTracksMutex.RUnlock()
fake.handleAnswerMutex.RLock()
defer fake.handleAnswerMutex.RUnlock()
fake.handleOfferMutex.RLock()
@@ -26,6 +26,16 @@ type FakeMediaTrack struct {
addSubscriberReturnsOnCall map[int]struct {
result1 error
}
GetAllSubscribersStub func() []livekit.ParticipantID
getAllSubscribersMutex sync.RWMutex
getAllSubscribersArgsForCall []struct {
}
getAllSubscribersReturns struct {
result1 []livekit.ParticipantID
}
getAllSubscribersReturnsOnCall map[int]struct {
result1 []livekit.ParticipantID
}
GetQualityForDimensionStub func(uint32, uint32) livekit.VideoQuality
getQualityForDimensionMutex sync.RWMutex
getQualityForDimensionArgsForCall []struct {
@@ -289,6 +299,59 @@ func (fake *FakeMediaTrack) AddSubscriberReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeMediaTrack) GetAllSubscribers() []livekit.ParticipantID {
fake.getAllSubscribersMutex.Lock()
ret, specificReturn := fake.getAllSubscribersReturnsOnCall[len(fake.getAllSubscribersArgsForCall)]
fake.getAllSubscribersArgsForCall = append(fake.getAllSubscribersArgsForCall, struct {
}{})
stub := fake.GetAllSubscribersStub
fakeReturns := fake.getAllSubscribersReturns
fake.recordInvocation("GetAllSubscribers", []interface{}{})
fake.getAllSubscribersMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeMediaTrack) GetAllSubscribersCallCount() int {
fake.getAllSubscribersMutex.RLock()
defer fake.getAllSubscribersMutex.RUnlock()
return len(fake.getAllSubscribersArgsForCall)
}
func (fake *FakeMediaTrack) GetAllSubscribersCalls(stub func() []livekit.ParticipantID) {
fake.getAllSubscribersMutex.Lock()
defer fake.getAllSubscribersMutex.Unlock()
fake.GetAllSubscribersStub = stub
}
func (fake *FakeMediaTrack) GetAllSubscribersReturns(result1 []livekit.ParticipantID) {
fake.getAllSubscribersMutex.Lock()
defer fake.getAllSubscribersMutex.Unlock()
fake.GetAllSubscribersStub = nil
fake.getAllSubscribersReturns = struct {
result1 []livekit.ParticipantID
}{result1}
}
func (fake *FakeMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []livekit.ParticipantID) {
fake.getAllSubscribersMutex.Lock()
defer fake.getAllSubscribersMutex.Unlock()
fake.GetAllSubscribersStub = nil
if fake.getAllSubscribersReturnsOnCall == nil {
fake.getAllSubscribersReturnsOnCall = make(map[int]struct {
result1 []livekit.ParticipantID
})
}
fake.getAllSubscribersReturnsOnCall[i] = struct {
result1 []livekit.ParticipantID
}{result1}
}
func (fake *FakeMediaTrack) GetQualityForDimension(arg1 uint32, arg2 uint32) livekit.VideoQuality {
fake.getQualityForDimensionMutex.Lock()
ret, specificReturn := fake.getQualityForDimensionReturnsOnCall[len(fake.getQualityForDimensionArgsForCall)]
@@ -1207,6 +1270,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
defer fake.addOnCloseMutex.RUnlock()
fake.addSubscriberMutex.RLock()
defer fake.addSubscriberMutex.RUnlock()
fake.getAllSubscribersMutex.RLock()
defer fake.getAllSubscribersMutex.RUnlock()
fake.getQualityForDimensionMutex.RLock()
defer fake.getQualityForDimensionMutex.RUnlock()
fake.iDMutex.RLock()
+5
View File
@@ -321,6 +321,10 @@ func (b *Buffer) SetRTT(rtt uint32) {
}
}
func (b *Buffer) LastPLI() int64 {
return b.lastPli
}
func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
isRTX := false
@@ -638,6 +642,7 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
packetsLost: b.stats.TotalPacketsLost,
lastLossRate: lossRate,
}
b.stats.LostRate = lossRate
return &rtcp.ReceptionReport{
SSRC: b.mediaSSRC,
+1
View File
@@ -20,6 +20,7 @@ type StreamStats struct {
TotalNACKs uint32
TotalPLIs uint32
TotalFIRs uint32
LostRate float32
}
type StreamStatsWithLayers struct {
+38 -2
View File
@@ -104,6 +104,10 @@ type DownTrack struct {
connectionStats *connectionquality.ConnectionStats
bitrateHelper uint64
bitrate uint64
lastBitrateReport time.Time
// Debug info
lastPli atomic.Time
lastRTP atomic.Time
@@ -181,7 +185,7 @@ func NewDownTrack(
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
CodecType: kind,
ClockRate: c.ClockRate,
GetTrackStats: d.getTrackStats,
GetTrackStats: d.GetTrackStats,
GetIsReducedQuality: func() bool {
return d.GetForwardingStatus() != ForwardingStatusOptimal
},
@@ -229,6 +233,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.callbacksQueue.Enqueue(d.onBind)
}
d.bound.Store(true)
d.lastBitrateReport = time.Now()
go d.requestFirstKeyframe()
@@ -377,6 +382,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
if extPkt.KeyFrame {
d.isNACKThrottled.Store(false)
}
d.updateBitrate()
} else {
d.logger.Errorw("writing rtp packet err", err)
d.pktsDropped.Inc()
@@ -467,6 +473,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
for _, f := range d.onPaddingSentUnsafe {
f(d, size)
}
d.updateBitrate()
//
// Register with sequencer with invalid layer so that NACKs for these can be filtered out.
@@ -483,6 +490,26 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
return bytesSent
}
func (d *DownTrack) updateBitrate() {
lastRtp := d.lastRTP.Load()
d.statsLock.RLock()
timeDiff := lastRtp.Sub(d.lastBitrateReport).Seconds()
d.statsLock.RUnlock()
if timeDiff < 1 {
return
}
octets, _ := d.getSRStats()
d.statsLock.Lock()
d.bitrate = uint64(float64(octets*8-d.bitrateHelper) / timeDiff)
d.bitrateHelper = octets * 8
d.lastBitrateReport = lastRtp
d.statsLock.Unlock()
}
func (d *DownTrack) Bitrate() uint64 {
return d.bitrate
}
// Mute enables or disables media forwarding
func (d *DownTrack) Mute(muted bool) {
changed, maxLayers := d.forwarder.Mute(muted)
@@ -985,6 +1012,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
d.stats.RTT = rtt
d.stats.Jitter = float64(r.Jitter)
d.stats.LostRate = float32(r.FractionLost) / 256
d.statsLock.Unlock()
d.connectionStats.UpdateWindow(r.SSRC, r.LastSequenceNumber, r.TotalLost, rtt, r.Jitter)
@@ -1251,7 +1279,7 @@ func (d *DownTrack) GetConnectionScore() float32 {
return d.connectionStats.GetScore()
}
func (d *DownTrack) getTrackStats() map[uint32]*buffer.StreamStatsWithLayers {
func (d *DownTrack) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayers {
d.statsLock.RLock()
defer d.statsLock.RUnlock()
@@ -1280,3 +1308,11 @@ func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint
totalRepeatedNACKs = d.totalRepeatedNACKs
return
}
func (d *DownTrack) LastPLI() int64 {
t := d.lastPli.Load()
if t.IsZero() {
return 0
}
return t.UnixNano()
}
+15 -2
View File
@@ -39,6 +39,7 @@ type TrackReceiver interface {
GetBitrateTemporalCumulative() Bitrates
SendPLI(layer int32)
LastPLI() int64
SetUpTrackPaused(paused bool)
SetMaxExpectedSpatialLayer(layer int32)
@@ -169,7 +170,7 @@ func NewWebRTCReceiver(
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
CodecType: w.kind,
ClockRate: w.codec.ClockRate,
GetTrackStats: w.getTrackStats,
GetTrackStats: w.GetTrackStats,
GetIsReducedQuality: func() bool {
return w.streamTrackerManager.IsReducedQuality()
},
@@ -402,6 +403,18 @@ func (w *WebRTCReceiver) SendPLI(layer int32) {
buff.SendPLI()
}
func (w *WebRTCReceiver) LastPLI() int64 {
var lastPLI int64
w.bufferMu.RLock()
for _, b := range w.buffers {
if b != nil && b.LastPLI() > lastPLI {
lastPLI = b.LastPLI()
}
}
w.bufferMu.RUnlock()
return lastPLI
}
func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) {
w.rtcpCh = ch
}
@@ -422,7 +435,7 @@ func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error
return buff.GetPacket(buf, sn)
}
func (w *WebRTCReceiver) getTrackStats() map[uint32]*buffer.StreamStatsWithLayers {
func (w *WebRTCReceiver) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayers {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()