diff --git a/pkg/rtc/forwarder.go b/pkg/rtc/forwarder.go index de223be74..6cba872c5 100644 --- a/pkg/rtc/forwarder.go +++ b/pkg/rtc/forwarder.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/sfu" "github.com/pion/ion-sfu/pkg/log" "github.com/pion/rtcp" @@ -16,11 +17,8 @@ import ( // a forwarder publishes data to a target track or datachannel // manages the RTCP loop with the target peer type Forwarder interface { - ID() string ChannelType() ChannelType - PeerID() string WriteRTP(*rtp.Packet) error - WriteRTCP(pkts []rtcp.Packet) error Start() Close() CreatedAt() time.Time @@ -28,6 +26,10 @@ type Forwarder interface { OnClose(func(Forwarder)) } +type RTCPWriter interface { + WriteRTCP(pkts []rtcp.Packet) error +} + type ChannelType int const ( @@ -37,10 +39,9 @@ const ( ) type SimpleForwarder struct { - id string ctx context.Context cancel context.CancelFunc - peer *WebRTCPeer // destination peer + rtcpWriter RTCPWriter // write RTCP to source peer sender *webrtc.RTPSender // destination sender track *webrtc.Track // sender track buffer *sfu.Buffer @@ -56,18 +57,17 @@ type SimpleForwarder struct { onClose func(forwarder Forwarder) } -func NewSimpleForwarder(ctx context.Context, peer *WebRTCPeer, sender *webrtc.RTPSender, buffer *sfu.Buffer) *SimpleForwarder { +func NewSimpleForwarder(ctx context.Context, rtcpWriter RTCPWriter, sender *webrtc.RTPSender, buffer *sfu.Buffer) *SimpleForwarder { ctx, cancel := context.WithCancel(ctx) f := &SimpleForwarder{ - id: peer.ID(), - ctx: ctx, - cancel: cancel, - peer: peer, - sender: sender, - buffer: buffer, - track: sender.Track(), - payload: sender.Track().PayloadType(), - createdAt: time.Now(), + ctx: ctx, + cancel: cancel, + rtcpWriter: rtcpWriter, + sender: sender, + buffer: buffer, + track: sender.Track(), + payload: sender.Track().PayloadType(), + createdAt: time.Now(), } if f.track.Kind() == webrtc.RTPCodecTypeAudio { @@ -79,15 +79,6 @@ func NewSimpleForwarder(ctx context.Context, peer *WebRTCPeer, sender *webrtc.RT return f } -// should be identical to peer id -func (f *SimpleForwarder) ID() string { - return f.id -} - -func (f *SimpleForwarder) PeerID() string { - return f.peer.ID() -} - func (f *SimpleForwarder) ChannelType() ChannelType { return f.channelType } @@ -127,10 +118,6 @@ func (f *SimpleForwarder) WriteRTP(pkt *rtp.Packet) error { return nil } -func (f *SimpleForwarder) WriteRTCP(pkts []rtcp.Packet) error { - return f.peer.conn.WriteRTCP(pkts) -} - func (f *SimpleForwarder) OnClose(closeFunc func(Forwarder)) { f.onClose = closeFunc } @@ -145,10 +132,6 @@ func (f *SimpleForwarder) rtcpWorker() { for { pkts, err := f.sender.ReadRTCP() if err == io.ErrClosedPipe { - // TODO: handle peer closed - //if recv := s.router.GetReceiver(0); recv != nil { - // recv.DeleteSender(s.id) - //} f.Close() return } @@ -186,9 +169,9 @@ func (f *SimpleForwarder) rtcpWorker() { } if len(fwdPkts) > 0 { - if err := f.WriteRTCP(fwdPkts); err != nil { - // TODO: log error - //log.Errorf("Forwarding rtcp from sender err: %v", err) + if err := f.rtcpWriter.WriteRTCP(fwdPkts); err != nil { + logger.GetLogger().Warnw("could not forward RTCP to peer", + "err", err) } } } diff --git a/pkg/rtc/peer.go b/pkg/rtc/peer.go index f260f9ebb..7094e8381 100644 --- a/pkg/rtc/peer.go +++ b/pkg/rtc/peer.go @@ -8,6 +8,8 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pkg/errors" + + "github.com/livekit/livekit-server/pkg/logger" ) type WebRTCPeer struct { @@ -169,8 +171,8 @@ func (p *WebRTCPeer) RemoveSubscriber(peerId string) { func (p *WebRTCPeer) onTrack(track *webrtc.Track, rtpReceiver *webrtc.RTPReceiver) { // create Receiver - receiver := NewReceiver(p.ctx, rtpReceiver, p.receiverConfig, p.mediaEngine) - pt := NewPeerTrack(p.ctx, p.id, track, receiver) + receiver := NewReceiver(p.ctx, p.id, rtpReceiver, p.receiverConfig, p.mediaEngine) + pt := NewPeerTrack(p.ctx, p.id, p.conn, track, receiver) p.lock.Lock() p.tracks = append(p.tracks, pt) @@ -201,8 +203,10 @@ func (p *WebRTCPeer) rtcpSendWorker() { p.lock.RUnlock() if len(pkts) > 0 { if err := p.conn.WriteRTCP(pkts); err != nil { - // TODO: log error - //log.Errorf("write rtcp err: %v", err) + logger.GetLogger().Errorw("error writing RTCP to peer", + "peer", p.id, + "err", err, + ) } } case <-p.ctx.Done(): diff --git a/pkg/rtc/receiver.go b/pkg/rtc/receiver.go index c0da73c23..15fbcd9c9 100644 --- a/pkg/rtc/receiver.go +++ b/pkg/rtc/receiver.go @@ -6,6 +6,7 @@ import ( "github.com/pion/rtp" + "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/sfu" "context" @@ -21,6 +22,7 @@ const ( // A receiver is responsible for pulling from a track type Receiver struct { + peerId string ctx context.Context cancel context.CancelFunc rtpReceiver *webrtc.RTPReceiver @@ -32,12 +34,13 @@ type Receiver struct { onCloseHandler func(r *Receiver) } -func NewReceiver(ctx context.Context, rtpReceiver *webrtc.RTPReceiver, conf ReceiverConfig, me *MediaEngine) *Receiver { +func NewReceiver(ctx context.Context, peerId string, rtpReceiver *webrtc.RTPReceiver, conf ReceiverConfig, me *MediaEngine) *Receiver { ctx, cancel := context.WithCancel(ctx) track := rtpReceiver.Track() return &Receiver{ ctx: ctx, cancel: cancel, + peerId: peerId, rtpReceiver: rtpReceiver, track: track, rtpChan: make(chan *rtp.Packet, maxChanSize), @@ -102,6 +105,11 @@ func (r *Receiver) rtpWorker() { if err != nil { // log and continue + logger.GetLogger().Warnw("receiver error reading RTP", + "peer", r.peerId, + "track", r.track.SSRC(), + "err", err, + ) continue } @@ -124,7 +132,11 @@ func (r *Receiver) rtcpWorker() { return } if err != nil { - // TODO: log + logger.GetLogger().Warnw("receiver error reading RTCP", + "peer", r.peerId, + "track", r.track.SSRC(), + "err", err, + ) continue } diff --git a/pkg/rtc/track.go b/pkg/rtc/track.go index 817518405..efa0f4281 100644 --- a/pkg/rtc/track.go +++ b/pkg/rtc/track.go @@ -21,19 +21,22 @@ type PeerTrack struct { peerId string // source track track *webrtc.Track - lock sync.RWMutex + // source RTCPWriter to forward RTCP requests + rtcpWriter RTCPWriter + lock sync.RWMutex // map of target peerId -> forwarder forwarders map[string]Forwarder receiver *Receiver lastNack int64 } -func NewPeerTrack(ctx context.Context, peerId string, track *webrtc.Track, receiver *Receiver) *PeerTrack { +func NewPeerTrack(ctx context.Context, peerId string, rtcpWriter RTCPWriter, track *webrtc.Track, receiver *Receiver) *PeerTrack { return &PeerTrack{ id: track.SSRC(), ctx: ctx, peerId: peerId, track: track, + rtcpWriter: rtcpWriter, lock: sync.RWMutex{}, forwarders: make(map[string]Forwarder), receiver: receiver, @@ -60,7 +63,7 @@ func (t *PeerTrack) AddSubscriber(peer *WebRTCPeer) error { return err } - forwarder := NewSimpleForwarder(t.ctx, peer, rtpSender, t.receiver.buffer) + forwarder := NewSimpleForwarder(t.ctx, t.rtcpWriter, rtpSender, t.receiver.buffer) forwarder.OnClose(func(f Forwarder) { t.lock.Lock() delete(t.forwarders, peer.ID()) @@ -68,7 +71,8 @@ func (t *PeerTrack) AddSubscriber(peer *WebRTCPeer) error { if err := peer.conn.RemoveTrack(rtpSender); err != nil { logger.GetLogger().Warnw("could not remove track from forwarder", - "peer", peer.ID()) + "peer", peer.ID(), + "err", err) } }) @@ -98,7 +102,7 @@ func (t *PeerTrack) forwardWorker() { for pkt := range t.receiver.RTPChan() { now := time.Now() t.lock.RLock() - for _, forwarder := range t.forwarders { + for dstPeerId, forwarder := range t.forwarders { // There exists a bug in chrome where setLocalDescription // fails if track RTP arrives before the sfu offer is set. // We delay sending RTP here to avoid the issue. @@ -109,8 +113,9 @@ func (t *PeerTrack) forwardWorker() { if err := forwarder.WriteRTP(pkt); err != nil { logger.GetLogger().Warnw("could not forward packet to peer", "srcPeer", t.peerId, - "dstPeer", forwarder.PeerID(), - "track", forwarder.ID()) + "dstPeer", dstPeerId, + "track", t.track.SSRC(), + "err", err) } } t.lock.RUnlock()