From ef2e5efe14eb1fc32d625552d8bc9d66a0607c65 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 13 May 2026 16:04:53 +0530 Subject: [PATCH] Log large packets receive/send. (#4521) * Log large packets receive/send. Seeing cases of servers reporting need for segmentation/re-assembly of packets. So, logging packet receive/send for RTP/RTCP to check if anything is seeing more than 1400 byte packets. * log downtrack RTCP too --- pkg/rtc/mediatrack.go | 12 +++++++++++- pkg/rtc/transport.go | 5 +++++ pkg/sfu/buffer/buffer.go | 6 ++++-- pkg/sfu/downtrack.go | 6 ++++++ pkg/sfu/pacer/base.go | 4 ++++ 5 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index bc4238aa9..f938f364c 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -301,16 +301,23 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe var lastRR uint32 rtcpReader.OnPacket(func(bytes []byte) { + isLargePacket := len(bytes) > 1400 pkts, err := rtcp.Unmarshal(bytes) if err != nil { - t.params.Logger.Errorw("could not unmarshal RTCP", err) + t.params.Logger.Errorw("could not unmarshal RTCP", err, "size", len(bytes)) return } for _, pkt := range pkts { switch pkt := pkt.(type) { case *rtcp.SourceDescription: + if isLargePacket { + t.params.Logger.Infow("large RTCP packet received with SDES", "size", len(bytes)) + } case *rtcp.SenderReport: + if isLargePacket { + t.params.Logger.Infow("large RTCP packet received with sender report", "size", len(bytes), "SSRC", pkt.SSRC) + } if pkt.SSRC == uint32(track.SSRC()) { buff.SetSenderReportData(&livekit.RTCPSenderReportState{ RtpTimestamp: pkt.RTPTime, @@ -321,6 +328,9 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track sfu.TrackRe }) } case *rtcp.ExtendedReport: + if isLargePacket { + t.params.Logger.Infow("large RTCP packet received with extendedreport", "size", len(bytes)) + } rttFromXR: for _, report := range pkt.Reports { if rr, ok := report.(*rtcp.DLRRReportBlock); ok { diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 13b3276ad..21a620f09 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1432,6 +1432,11 @@ func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType { } func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error { + // TODO-CLEANUP-PACKET-SIZE: remove after checking for large packets + raw, _ := rtcp.Marshal(pkts) + if len(raw) > 1400 { + t.params.Logger.Infow("large RTCP packet send", "size", len(raw)) + } return t.pc.WriteRTCP(pkts) } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index e3663eb44..41d76fd26 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -138,9 +138,11 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili } // Write adds an RTP Packet, ordering is not guaranteed, newer packets may arrive later -// -//go:noinline func (b *Buffer) Write(pkt []byte) (n int, err error) { + if len(pkt) > 1400 { + b.logger.Infow("large RTP packet received", "size", len(pkt)) + } + var rtpPacket rtp.Packet err = rtpPacket.Unmarshal(pkt) if err != nil { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 38cf31569..92cdb8633 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -633,6 +633,9 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.writeStream = t.WriteStream() if rr := d.params.BufferFactory.GetOrNew(packetio.RTCPBufferPacket, d.ssrc).(*buffer.RTCPReader); rr != nil { rr.OnPacket(func(pkt []byte) { + if len(pkt) > 1400 { + d.params.Logger.Infow("large RTCP packet received primary", "size", len(pkt)) + } d.handleRTCP(pkt) }) d.rtcpReader = rr @@ -640,6 +643,9 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, if d.ssrcRTX != 0 { if rr := d.params.BufferFactory.GetOrNew(packetio.RTCPBufferPacket, d.ssrcRTX).(*buffer.RTCPReader); rr != nil { rr.OnPacket(func(pkt []byte) { + if len(pkt) > 1400 { + d.params.Logger.Infow("large RTCP packet received rtx", "size", len(pkt)) + } d.handleRTCPRTX(pkt) }) d.rtcpReaderRTX = rr diff --git a/pkg/sfu/pacer/base.go b/pkg/sfu/pacer/base.go index 96e3be250..bf8ac7cc3 100644 --- a/pkg/sfu/pacer/base.go +++ b/pkg/sfu/pacer/base.go @@ -76,6 +76,10 @@ func (b *Base) SendPacket(p *Packet) (int, error) { return 0, err } + if (p.HeaderSize + len(p.Payload)) > 1400 { + b.logger.Infow("large RTP packet send", "size", p.HeaderSize+len(p.Payload)) + } + var written int written, err = p.WriteStream.WriteRTP(p.Header, p.Payload) if err != nil {