diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 334a320dc..efe7a4000 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -154,6 +154,7 @@ type DownTrack struct { // Debug info pktsDropped atomic.Uint32 + writeErrors atomic.Uint32 isNACKThrottled atomic.Bool @@ -434,6 +435,9 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { if tp.isDroppingRelevant { d.pktsDropped.Inc() } + if err != nil { + d.logger.Errorw("write rtp packet failed", err) + } return err } @@ -444,6 +448,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { payload, err = d.translateVP8PacketTo(extPkt.Packet, &incomingVP8, tp.vp8.Header, pool) if err != nil { d.pktsDropped.Inc() + d.logger.Errorw("write rtp packet failed", err) return err } } @@ -459,6 +464,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { hdr, err := d.getTranslatedRTPHeader(extPkt, tp) if err != nil { d.pktsDropped.Inc() + d.logger.Errorw("write rtp packet failed", err) return err } @@ -467,37 +473,40 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } _, err = d.writeStream.WriteRTP(hdr, payload) - if err == nil { - pktSize := hdr.MarshalSize() + len(payload) - for _, f := range d.onPacketSent { - f(d, pktSize) - } - - if tp.isSwitchingToMaxLayer && d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { - d.onMaxLayerChanged(d, layer) - } - - if extPkt.KeyFrame || tp.switchingToTargetLayer { - d.isNACKThrottled.Store(false) - d.rtpStats.UpdateKeyFrame(1) - - locked, _ := d.forwarder.CheckSync() - if locked { - d.stopKeyFrameRequester() - } - - if !tp.switchingToTargetLayer { - d.logger.Debugw("forwarding key frame", "layer", layer) - } - } - - d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) - } else { - d.logger.Errorw("writing rtp packet err", err) + if err != nil { d.pktsDropped.Inc() + writeErrors := d.writeErrors.Inc() + if (writeErrors % 100) == 1 { + d.logger.Errorw("write rtp packet failed", err, "count", writeErrors) + } + return err } - return err + pktSize := hdr.MarshalSize() + len(payload) + for _, f := range d.onPacketSent { + f(d, pktSize) + } + + if tp.isSwitchingToMaxLayer && d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { + d.onMaxLayerChanged(d, layer) + } + + if extPkt.KeyFrame || tp.switchingToTargetLayer { + d.isNACKThrottled.Store(false) + d.rtpStats.UpdateKeyFrame(1) + + locked, _ := d.forwarder.CheckSync() + if locked { + d.stopKeyFrameRequester() + } + + if !tp.switchingToTargetLayer { + d.logger.Debugw("forwarding key frame", "layer", layer) + } + } + + d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) + return nil } // WritePaddingRTP tries to write as many padding only RTP packets as necessary diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 65d7d3861..7acd2e9cf 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -563,9 +563,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } w.downTrackSpreader.Broadcast(func(dt TrackSender) { - if err := dt.WriteRTP(pkt, spatialLayer); err != nil { - w.logger.Errorw("failed writing to down track", err) - } + _ = dt.WriteRTP(pkt, spatialLayer) }) } }