Remove unused traffic load (#2734)

This commit is contained in:
Raja Subramanian
2024-05-27 10:53:23 +05:30
committed by GitHub
parent 721c36fbc0
commit 991ad5a0ea
5 changed files with 0 additions and 503 deletions
-16
View File
@@ -140,7 +140,6 @@ type ParticipantParams struct {
SubscriptionLimitVideo int32
PlayoutDelay *livekit.PlayoutDelay
SyncStreams bool
EnableTrafficLoadTracking bool
ForwardStats *sfu.ForwardStats
}
@@ -187,7 +186,6 @@ type ParticipantImpl struct {
*TransportManager
*UpTrackManager
*SubscriptionManager
*ParticipantTrafficLoad
// keeps track of unpublished tracks in order to reuse trackID
unpublishedTracks []*livekit.TrackInfo
@@ -297,7 +295,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.setupUpTrackManager()
p.setupSubscriptionManager()
p.setupParticipantTrafficLoad()
return p, nil
}
@@ -865,9 +862,6 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
go func() {
p.SubscriptionManager.Close(isExpectedToResume)
p.TransportManager.Close()
if p.ParticipantTrafficLoad != nil {
p.ParticipantTrafficLoad.Close()
}
}()
p.dataChannelStats.Stop()
@@ -1373,16 +1367,6 @@ func (p *ParticipantImpl) setupSubscriptionManager() {
})
}
func (p *ParticipantImpl) setupParticipantTrafficLoad() {
if p.params.EnableTrafficLoadTracking {
p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{
Participant: p,
DataChannelStats: p.dataChannelStats,
Logger: p.params.Logger,
})
}
}
func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
oldState := p.state.Swap(state).(livekit.ParticipantInfo_State)
if oldState == state {
-219
View File
@@ -1,219 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry"
)
const (
reportInterval = 10 * time.Second
)
type ParticipantTrafficLoadParams struct {
Participant *ParticipantImpl
DataChannelStats *telemetry.BytesTrackStats
Logger logger.Logger
}
type ParticipantTrafficLoad struct {
params ParticipantTrafficLoadParams
lock sync.RWMutex
onTrafficLoad func(trafficLoad *types.TrafficLoad)
tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats
dataChannelTraffic *telemetry.TrafficTotals
trafficLoad *types.TrafficLoad
closed core.Fuse
}
func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad {
p := &ParticipantTrafficLoad{
params: params,
tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats),
}
go p.reporter()
return p
}
func (p *ParticipantTrafficLoad) Close() {
p.closed.Break()
}
func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) {
if p == nil {
return
}
p.lock.Lock()
p.onTrafficLoad = f
p.lock.Unlock()
}
func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onTrafficLoad
}
func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad {
if p == nil {
return nil
}
p.lock.RLock()
defer p.lock.RUnlock()
return p.trafficLoad
}
func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad {
publishedTracks := p.params.Participant.GetPublishedTracks()
subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks()
availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks))
upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks))
upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks))
downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks))
downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks))
p.lock.Lock()
defer p.lock.Unlock()
for _, pt := range publishedTracks {
lmt, ok := pt.(types.LocalMediaTrack)
if !ok {
continue
}
trackID := lmt.ID()
stats := lmt.GetTrackStats()
trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats)
if stats != nil {
p.tracksStatsMedia[trackID] = stats
availableTracks[trackID] = true
}
if trafficStats != nil {
switch lmt.Kind() {
case livekit.TrackType_AUDIO:
upstreamAudioStats = append(upstreamAudioStats, trafficStats)
case livekit.TrackType_VIDEO:
upstreamVideoStats = append(upstreamVideoStats, trafficStats)
}
}
}
for _, st := range subscribedTracks {
trackID := st.ID()
stats := st.DownTrack().GetTrackStats()
trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats)
if stats != nil {
p.tracksStatsMedia[trackID] = stats
availableTracks[trackID] = true
}
if trafficStats != nil {
switch st.MediaTrack().Kind() {
case livekit.TrackType_AUDIO:
downstreamAudioStats = append(downstreamAudioStats, trafficStats)
case livekit.TrackType_VIDEO:
downstreamVideoStats = append(downstreamVideoStats, trafficStats)
}
}
}
// remove unavailable tracks from track stats cache
for trackID := range p.tracksStatsMedia {
if !availableTracks[trackID] {
delete(p.tracksStatsMedia, trackID)
}
}
trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6)
addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) {
agg := types.AggregateTrafficStats(statsList...)
if agg != nil {
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: trackType,
StreamType: streamType,
TrafficStats: agg,
})
}
}
addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM)
addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM)
addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM)
addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM)
if p.params.DataChannelStats != nil {
dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals()
if p.dataChannelTraffic != nil {
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: livekit.TrackType_DATA,
StreamType: livekit.StreamType_UPSTREAM,
TrafficStats: &types.TrafficStats{
StartTime: p.dataChannelTraffic.At,
EndTime: dataChannelTraffic.At,
Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages,
Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes,
},
})
trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{
TrackType: livekit.TrackType_DATA,
StreamType: livekit.StreamType_DOWNSTREAM,
TrafficStats: &types.TrafficStats{
StartTime: p.dataChannelTraffic.At,
EndTime: dataChannelTraffic.At,
Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages,
Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes,
},
})
}
p.dataChannelTraffic = dataChannelTraffic
}
p.trafficLoad = &types.TrafficLoad{
TrafficTypeStats: trafficTypeStats,
}
return p.trafficLoad
}
func (p *ParticipantTrafficLoad) reporter() {
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case <-p.closed.Watch():
return
case <-ticker.C:
trafficLoad := p.updateTrafficLoad()
if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil {
onTrafficLoad(trafficLoad)
}
}
}
}
-3
View File
@@ -394,7 +394,6 @@ type LocalParticipant interface {
OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool))
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnTrafficLoad(callback func(trafficLoad *TrafficLoad))
HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
@@ -423,8 +422,6 @@ type LocalParticipant interface {
SetSubscriberChannelCapacity(channelCapacity int64)
GetPacer() pacer.Pacer
GetTrafficLoad() *TrafficLoad
}
// Room is a container of participants, and can provide room-level actions
-161
View File
@@ -1,161 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"time"
"github.com/livekit/protocol/livekit"
)
type TrafficStats struct {
StartTime time.Time
EndTime time.Time
Packets uint32
PacketsLost uint32
PacketsPadding uint32
PacketsOutOfOrder uint32
Bytes uint64
}
type TrafficTypeStats struct {
TrackType livekit.TrackType
StreamType livekit.StreamType
TrafficStats *TrafficStats
}
type TrafficLoad struct {
TrafficTypeStats []*TrafficTypeStats
}
func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats {
if after == nil {
return nil
}
startTime := after.StartTime
if before != nil {
startTime = before.EndTime
}
getAfter := func() *TrafficStats {
return &TrafficStats{
StartTime: startTime.AsTime(),
EndTime: after.EndTime.AsTime(),
Packets: after.Packets,
PacketsLost: after.PacketsLost,
PacketsPadding: after.PacketsPadding,
PacketsOutOfOrder: after.PacketsOutOfOrder,
Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding,
}
}
if before == nil {
return getAfter()
}
if (after.Packets - before.Packets) > (1 << 31) {
// after packets < before packets, probably got reset, just return after
return getAfter()
}
if ((after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding)) > (1 << 63) {
// after bytes < before bytes, probably got reset, just return after
return getAfter()
}
packetsLost := uint32(0)
if after.PacketsLost >= before.PacketsLost {
packetsLost = after.PacketsLost - before.PacketsLost
}
return &TrafficStats{
StartTime: startTime.AsTime(),
EndTime: after.EndTime.AsTime(),
Packets: after.Packets - before.Packets,
PacketsLost: packetsLost,
PacketsPadding: after.PacketsPadding - before.PacketsPadding,
PacketsOutOfOrder: after.PacketsOutOfOrder - before.PacketsOutOfOrder,
Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding),
}
}
func AggregateTrafficStats(statsList ...*TrafficStats) *TrafficStats {
if len(statsList) == 0 {
return nil
}
startTime := time.Time{}
endTime := time.Time{}
packets := uint32(0)
packetsLost := uint32(0)
packetsPadding := uint32(0)
packetsOutOfOrder := uint32(0)
bytes := uint64(0)
for _, stats := range statsList {
if startTime.IsZero() || startTime.After(stats.StartTime) {
startTime = stats.StartTime
}
if endTime.IsZero() || endTime.Before(stats.EndTime) {
endTime = stats.EndTime
}
packets += stats.Packets
packetsLost += stats.PacketsLost
packetsPadding += stats.PacketsPadding
packetsOutOfOrder += stats.PacketsOutOfOrder
bytes += stats.Bytes
}
if endTime.IsZero() {
endTime = time.Now()
}
return &TrafficStats{
StartTime: startTime,
EndTime: endTime,
Packets: packets,
PacketsLost: packetsLost,
PacketsPadding: packetsPadding,
PacketsOutOfOrder: packetsOutOfOrder,
Bytes: bytes,
}
}
func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) (
packetRateIn float64,
byteRateIn float64,
packetRateOut float64,
byteRateOut float64,
) {
if trafficLoad == nil {
return
}
for _, trafficTypeStat := range trafficLoad.TrafficTypeStats {
elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds()
packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed
byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed
switch trafficTypeStat.StreamType {
case livekit.StreamType_UPSTREAM:
packetRateIn += packetRate
byteRateIn += byteRate
case livekit.StreamType_DOWNSTREAM:
packetRateOut += packetRate
byteRateOut += byteRate
}
}
return
}
@@ -345,16 +345,6 @@ type FakeLocalParticipant struct {
getSubscribedTracksReturnsOnCall map[int]struct {
result1 []types.SubscribedTrack
}
GetTrafficLoadStub func() *types.TrafficLoad
getTrafficLoadMutex sync.RWMutex
getTrafficLoadArgsForCall []struct {
}
getTrafficLoadReturns struct {
result1 *types.TrafficLoad
}
getTrafficLoadReturnsOnCall map[int]struct {
result1 *types.TrafficLoad
}
GetTrailerStub func() []byte
getTrailerMutex sync.RWMutex
getTrailerArgsForCall []struct {
@@ -636,11 +626,6 @@ type FakeLocalParticipant struct {
onTrackUpdatedArgsForCall []struct {
arg1 func(types.LocalParticipant, types.MediaTrack)
}
OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad))
onTrafficLoadMutex sync.RWMutex
onTrafficLoadArgsForCall []struct {
arg1 func(trafficLoad *types.TrafficLoad)
}
ProtocolVersionStub func() types.ProtocolVersion
protocolVersionMutex sync.RWMutex
protocolVersionArgsForCall []struct {
@@ -2722,59 +2707,6 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result
}{result1}
}
func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad {
fake.getTrafficLoadMutex.Lock()
ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)]
fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct {
}{})
stub := fake.GetTrafficLoadStub
fakeReturns := fake.getTrafficLoadReturns
fake.recordInvocation("GetTrafficLoad", []interface{}{})
fake.getTrafficLoadMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int {
fake.getTrafficLoadMutex.RLock()
defer fake.getTrafficLoadMutex.RUnlock()
return len(fake.getTrafficLoadArgsForCall)
}
func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = stub
}
func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = nil
fake.getTrafficLoadReturns = struct {
result1 *types.TrafficLoad
}{result1}
}
func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) {
fake.getTrafficLoadMutex.Lock()
defer fake.getTrafficLoadMutex.Unlock()
fake.GetTrafficLoadStub = nil
if fake.getTrafficLoadReturnsOnCall == nil {
fake.getTrafficLoadReturnsOnCall = make(map[int]struct {
result1 *types.TrafficLoad
})
}
fake.getTrafficLoadReturnsOnCall[i] = struct {
result1 *types.TrafficLoad
}{result1}
}
func (fake *FakeLocalParticipant) GetTrailer() []byte {
fake.getTrailerMutex.Lock()
ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)]
@@ -4357,38 +4289,6 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) {
fake.onTrafficLoadMutex.Lock()
fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct {
arg1 func(trafficLoad *types.TrafficLoad)
}{arg1})
stub := fake.OnTrafficLoadStub
fake.recordInvocation("OnTrafficLoad", []interface{}{arg1})
fake.onTrafficLoadMutex.Unlock()
if stub != nil {
fake.OnTrafficLoadStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int {
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
return len(fake.onTrafficLoadArgsForCall)
}
func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) {
fake.onTrafficLoadMutex.Lock()
defer fake.onTrafficLoadMutex.Unlock()
fake.OnTrafficLoadStub = stub
}
func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) {
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
argsForCall := fake.onTrafficLoadArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion {
fake.protocolVersionMutex.Lock()
ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)]
@@ -6577,8 +6477,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getSubscribedParticipantsMutex.RUnlock()
fake.getSubscribedTracksMutex.RLock()
defer fake.getSubscribedTracksMutex.RUnlock()
fake.getTrafficLoadMutex.RLock()
defer fake.getTrafficLoadMutex.RUnlock()
fake.getTrailerMutex.RLock()
defer fake.getTrailerMutex.RUnlock()
fake.handleAnswerMutex.RLock()
@@ -6653,8 +6551,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onTrackUnpublishedMutex.RUnlock()
fake.onTrackUpdatedMutex.RLock()
defer fake.onTrackUpdatedMutex.RUnlock()
fake.onTrafficLoadMutex.RLock()
defer fake.onTrafficLoadMutex.RUnlock()
fake.protocolVersionMutex.RLock()
defer fake.protocolVersionMutex.RUnlock()
fake.removePublishedTrackMutex.RLock()