From fa490dd51031b007b2f2fdccbf77db1b39efa461 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 25 Jun 2024 14:55:42 +0530 Subject: [PATCH] Log rtp stats more consistently. (#2816) * Log rtp stats more consistently. Thank you Paul for the logging tip. Also update deps. * remove duplicate logging field * nil check --- go.mod | 14 ++-- go.sum | 39 ++++++---- pkg/sfu/buffer/rtpstats_base.go | 28 +++---- pkg/sfu/buffer/rtpstats_receiver.go | 74 ++++++++++-------- pkg/sfu/buffer/rtpstats_sender.go | 116 +++++++++++----------------- 5 files changed, 127 insertions(+), 144 deletions(-) diff --git a/go.mod b/go.mod index e26f6c4d8..a0f0370e2 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,9 @@ require ( github.com/jellydator/ttlcache/v3 v3.2.0 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240622055623-ce8d272f389e - github.com/livekit/protocol v1.19.0 - github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 + github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 + github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9 + github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 @@ -85,10 +85,10 @@ require ( github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-retryablehttp v0.7.5 // indirect + github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -97,7 +97,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect - github.com/nats-io/nats.go v1.35.0 // indirect + github.com/nats-io/nats.go v1.36.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -128,7 +128,7 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.22.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect google.golang.org/grpc v1.64.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 3883f60c8..18e1c4eb7 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/florianl/go-tc v0.4.3 h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw= github.com/florianl/go-tc v0.4.3/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= @@ -101,10 +103,10 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= -github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= -github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M= -github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= @@ -131,8 +133,8 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -150,16 +152,20 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240622055623-ce8d272f389e h1:ZKA07UcpsdMmLUAA/GHJiFbyZ/QHpggIk7npkjUx9H4= -github.com/livekit/mediatransportutil v0.0.0-20240622055623-ce8d272f389e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.0 h1:EPcFQAa6ymVknKn21NbSeFuUsHTA8r3DH+RmP72yRrU= -github.com/livekit/protocol v1.19.0/go.mod h1:cN8WmGQR+kWz1+UWcAQdFFUcbW76PnfZDdkLAbYIqd4= -github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto= -github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= +github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= +github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9 h1:dG/rhtgXL3Clh/ywRH2g2g5pnssPr98OP6VmbOCSB2A= +github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9/go.mod h1:zUHHcMbhBRDe0+OBTHbQT1c7njLKEki+xe1qPpR7LhU= +github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= +github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM= @@ -191,8 +197,8 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= -github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk= -github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -286,7 +292,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -472,8 +477,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e h1:Elxv5MwEkCI9f5SkoL6afed6NTdxaGoAo39eANBwHL8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index a3ab05126..07c906e70 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -15,6 +15,7 @@ package buffer import ( + "errors" "fmt" "sync" "time" @@ -573,10 +574,10 @@ func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) u return packetsSeen - r.packetsPadding } -func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) *RTPDeltaInfo { +func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) { then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN) if now == nil || then == nil { - return nil + return } startTime := then.startTime @@ -587,22 +588,23 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes packetsExpected = 0 } if packetsExpected > cNumSequenceNumbers { - r.logger.Infow( - "too many packets expected in delta", + loggingFields = []interface{}{ "startSN", then.extStartSN, "endSN", now.extStartSN, "packetsExpected", packetsExpected, "startTime", startTime, "endTime", endTime, "duration", endTime.Sub(startTime).String(), - ) - return nil + } + err = errors.New("too many packets expected in delta") + return } if packetsExpected == 0 { - return &RTPDeltaInfo{ + deltaInfo = &RTPDeltaInfo{ StartTime: startTime, EndTime: endTime, } + return } packetsLost := uint32(now.packetsLost - then.packetsLost) @@ -613,21 +615,20 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes // padding packets delta could be higher than expected due to out-of-order padding packets packetsPadding := now.packetsPadding - then.packetsPadding if packetsExpected < packetsPadding { - r.logger.Infow( - "padding packets more than expected", + loggingFields = []interface{}{ "packetsExpected", packetsExpected, "packetsPadding", packetsPadding, "packetsLost", packetsLost, "startSequenceNumber", then.extStartSN, - "endSequenceNumber", now.extStartSN-1, - "rtpStats", r, - ) + "endSequenceNumber", now.extStartSN - 1, + } + err = errors.New("padding packets more than expected") packetsExpected = 0 } else { packetsExpected -= packetsPadding } - return &RTPDeltaInfo{ + deltaInfo = &RTPDeltaInfo{ StartTime: startTime, EndTime: endTime, Packets: uint32(packetsExpected), @@ -648,6 +649,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes Plis: now.plis - then.plis, Firs: now.firs - then.firs, } + return } func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index b67a50a00..eb3804fbe 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -176,10 +176,7 @@ func (r *RTPStatsReceiver) Update( r.logger.Debugw( "rtp receiver stream start", - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "startSN", r.sequenceNumber.GetExtendedStart(), - "startTS", r.timestamp.GetExtendedStart(), + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) } else { resSN = r.sequenceNumber.Update(sequenceNumber) @@ -194,13 +191,6 @@ func (r *RTPStatsReceiver) Update( gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) getLoggingFields := func() []interface{} { return []interface{}{ - "extStartSN", r.sequenceNumber.GetExtendedStart(), - "extHighestSN", r.sequenceNumber.GetExtendedHighest(), - "extStartTS", r.timestamp.GetExtendedStart(), - "extHighestTS", r.timestamp.GetExtendedHighest(), - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "highestTime", r.highestTime.String(), "prevSN", resSN.PreExtendedHighest, "currSN", resSN.ExtendedVal, "gapSN", gapSN, @@ -214,8 +204,6 @@ func (r *RTPStatsReceiver) Update( "hdrSize", hdrSize, "payloadSize", payloadSize, "paddingSize", paddingSize, - "first", r.srFirst, - "last", r.srNewest, } } if gapSN <= 0 { // duplicate OR out-of-order @@ -314,9 +302,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received sender report, anachronous, dropping", - "first", r.srFirst, - "last", r.srNewest, "current", srData, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) return false } @@ -370,10 +357,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.outOfOrderSenderReportCount%10 == 0 { r.logger.Infow( "received sender report, out-of-order, skipping", - "first", r.srFirst, - "last", r.srNewest, "current", &srDataCopy, "count", r.outOfOrderSenderReportCount, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) } r.outOfOrderSenderReportCount++ @@ -394,8 +380,6 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.clockSkewCount%100 == 0 { r.logger.Infow( "received sender report, clock skew", - "first", r.srFirst, - "last", r.srNewest, "current", &srDataCopy, "timeSinceFirst", timeSinceFirst, "rtpDiffSinceFirst", rtpDiffSinceFirst, @@ -404,6 +388,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) "rtpDiffSinceLast", rtpDiffSinceLast, "calculatedLast", calculatedClockRateFromLast, "count", r.clockSkewCount, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) } r.clockSkewCount++ @@ -414,16 +399,13 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) var deltaPropagationDelay time.Duration getPropagationFields := func() []interface{} { return []interface{}{ - "propagationDelay", r.propagationDelay.String(), "receivedPropagationDelay", propagationDelay.String(), - "longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay.String(), "receivedDeltaPropagationDelay", deltaPropagationDelay.String(), "deltaHighCount", r.propagationDelayDeltaHighCount, "sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(), "propagationDelaySpike", r.propagationDelaySpike.String(), - "first", r.srFirst, - "last", r.srNewest, "current", &srDataCopy, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, } } resetDelta := func() { @@ -537,6 +519,7 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin r.logger.Warnw( "too many packets expected in receiver report", fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected), + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) return nil } @@ -584,7 +567,16 @@ func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo { r.lock.Lock() defer r.lock.Unlock() - return r.deltaInfo(snapshotID, r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()) + deltaInfo, err, loggingFields := r.deltaInfo( + snapshotID, + r.sequenceNumber.GetExtendedStart(), + r.sequenceNumber.GetExtendedHighest(), + ) + if err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})) + } + + return deltaInfo } func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -595,16 +587,7 @@ func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error { r.lock.RLock() defer r.lock.RUnlock() - e.AddObject("base", r.rtpStatsBase) - - e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) - e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) - e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) - e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) - - e.AddDuration("propagationDelay", r.propagationDelay) - e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay) - return nil + return lockedRTPStatsReceiverLogEncoder{r}.MarshalLogObject(e) } func (r *RTPStatsReceiver) String() string { @@ -635,3 +618,26 @@ func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool { } // ---------------------------------- + +type lockedRTPStatsReceiverLogEncoder struct { + *RTPStatsReceiver +} + +func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r.RTPStatsReceiver == nil { + return nil + } + + e.AddObject("base", r.rtpStatsBase) + + e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) + e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) + e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) + e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) + + e.AddDuration("propagationDelay", r.propagationDelay) + e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay) + return nil +} + +// ---------------------------------- diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 9740a7fba..e438ad395 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -277,10 +277,7 @@ func (r *RTPStatsSender) Update( r.logger.Debugw( "rtp sender stream start", - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "startSN", r.extStartSN, - "startTS", r.extStartTS, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) } @@ -289,26 +286,16 @@ func (r *RTPStatsSender) Update( gapSN := int64(extSequenceNumber - r.extHighestSN) getLoggingFields := func() []interface{} { return []interface{}{ - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "highestTime", r.highestTime.String(), - "highestSN", r.extHighestSN, "currSN", extSequenceNumber, "gapSN", gapSN, - "highestTS", r.extHighestTS, "currTS", extTimestamp, "gapTS", int64(extTimestamp - r.extHighestTS), - "packetTime", packetTime.String(), + "packetTime", packetTime, "marker", marker, "hdrSize", hdrSize, "payloadSize", payloadSize, "paddingSize", paddingSize, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, } } if gapSN <= 0 { // duplicate OR out-of-order @@ -340,9 +327,7 @@ func (r *RTPStatsSender) Update( r.logger.Infow( "adjusting start sequence number", append(getLoggingFields(), - "snBefore", r.extStartSN, "snAfter", extSequenceNumber, - "tsBefore", r.extStartTS, "tsAfter", extTimestamp, )..., ) @@ -399,9 +384,7 @@ func (r *RTPStatsSender) Update( r.logger.Infow( "adjusting start timestamp", append(getLoggingFields(), - "snBefore", r.extStartSN, "snAfter", extSequenceNumber, - "tsBefore", r.extStartTS, "tsAfter", extTimestamp, )..., ) @@ -472,12 +455,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR { r.logger.Debugw( fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, "sinceLastRR", time.Since(r.lastRRTime).String(), "receivedRR", rr, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) return } @@ -539,24 +519,11 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt } r.logger.Infow( "rr interval too big, skipping", - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, "timeSinceLastRR", timeSinceLastRR.String(), "receivedRR", rr, - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "extLastRRSN", s.extLastRRSN, - "firstTime", r.firstTime.String(), - "startTime", r.startTime.String(), - "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, "packetsInInterval", extReceivedRRSN-s.extLastRRSN, - "extHighestSNFromRR", r.extHighestSNFromRR, - "packetsLostFromRR", r.packetsLostFromRR, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) continue } @@ -573,27 +540,14 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if r.metadataCacheOverflowCount%10 == 0 { r.logger.Infow( "metadata cache overflow", - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, "timeSinceLastRR", timeSinceLastRR.String(), "receivedRR", rr, - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "extLastRRSN", s.extLastRRSN, - "firstTime", r.firstTime.String(), - "startTime", r.startTime.String(), - "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, "packetsInInterval", extReceivedRRSN-s.extLastRRSN, "intervalStats", is.ToString(), "aggregateIntervalStats", eis.ToString(), - "extHighestSNFromRR", r.extHighestSNFromRR, - "packetsLostFromRR", r.packetsLostFromRR, "count", r.metadataCacheOverflowCount, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) } r.metadataCacheOverflowCount++ @@ -671,22 +625,17 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS getFields := func() []interface{} { return []interface{}{ - "first", r.srFirst, - "last", r.srNewest, "curr", srData, "feed", publisherSRData, "tsOffset", tsOffset, "timeNow", time.Now().String(), "now", now.String(), - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), "timeSinceHighest", now.Sub(r.highestTime).String(), - "firstTime", r.firstTime.String(), "timeSinceFirst", now.Sub(r.firstTime).String(), "timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(), "timeSincePublisherSR", time.Since(publisherSRData.At).String(), "nowRTPExt", nowRTPExt, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, } } if r.srNewest != nil && nowRTPExt >= r.srNewest.RTPTimestampExt { @@ -733,7 +682,16 @@ func (r *RTPStatsSender) DeltaInfo(snapshotID uint32) *RTPDeltaInfo { r.lock.Lock() defer r.lock.Unlock() - return r.deltaInfo(snapshotID, r.extStartSN, r.extHighestSN) + deltaInfo, err, loggingFields := r.deltaInfo( + snapshotID, + r.extStartSN, + r.extHighestSN, + ) + if err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})) + } + + return deltaInfo } func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo { @@ -761,8 +719,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo "startTime", startTime.String(), "endTime", endTime.String(), "duration", endTime.Sub(startTime).String(), - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) return nil } @@ -790,6 +747,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo packetsLost, packetsLostFeed, ), + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) packetsLost = packetsExpected } @@ -833,18 +791,7 @@ func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error { r.lock.RLock() defer r.lock.RUnlock() - e.AddObject("base", r.rtpStatsBase) - e.AddUint64("extStartSN", r.extStartSN) - e.AddUint64("extHighestSN", r.extHighestSN) - e.AddUint64("extStartTS", r.extStartTS) - e.AddUint64("extHighestTS", r.extHighestTS) - e.AddTime("lastRRTime", r.lastRRTime) - e.AddReflected("lastRR", r.lastRR) - e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR) - e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) - e.AddFloat64("jitterFromRR", r.jitterFromRR) - e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) - return nil + return lockedRTPStatsSenderLogEncoder{r}.MarshalLogObject(e) } func (r *RTPStatsSender) String() string { @@ -1027,3 +974,26 @@ func (r *RTPStatsSender) getIntervalStats( } // ------------------------------------------------------------------- + +type lockedRTPStatsSenderLogEncoder struct { + *RTPStatsSender +} + +func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r.RTPStatsSender == nil { + return nil + } + + e.AddObject("base", r.rtpStatsBase) + e.AddUint64("extStartSN", r.extStartSN) + e.AddUint64("extHighestSN", r.extHighestSN) + e.AddUint64("extStartTS", r.extStartTS) + e.AddUint64("extHighestTS", r.extHighestTS) + e.AddTime("lastRRTime", r.lastRRTime) + e.AddReflected("lastRR", r.lastRR) + e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR) + e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) + e.AddFloat64("jitterFromRR", r.jitterFromRR) + e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) + return nil +}