fixed forwarder's RTCP target

This commit is contained in:
David Zhao
2020-11-01 23:01:24 -08:00
parent 312a4f1def
commit a2efaccdde
4 changed files with 52 additions and 48 deletions

View File

@@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/pion/ion-sfu/pkg/log"
"github.com/pion/rtcp"
@@ -16,11 +17,8 @@ import (
// a forwarder publishes data to a target track or datachannel
// manages the RTCP loop with the target peer
type Forwarder interface {
ID() string
ChannelType() ChannelType
PeerID() string
WriteRTP(*rtp.Packet) error
WriteRTCP(pkts []rtcp.Packet) error
Start()
Close()
CreatedAt() time.Time
@@ -28,6 +26,10 @@ type Forwarder interface {
OnClose(func(Forwarder))
}
type RTCPWriter interface {
WriteRTCP(pkts []rtcp.Packet) error
}
type ChannelType int
const (
@@ -37,10 +39,9 @@ const (
)
type SimpleForwarder struct {
id string
ctx context.Context
cancel context.CancelFunc
peer *WebRTCPeer // destination peer
rtcpWriter RTCPWriter // write RTCP to source peer
sender *webrtc.RTPSender // destination sender
track *webrtc.Track // sender track
buffer *sfu.Buffer
@@ -56,18 +57,17 @@ type SimpleForwarder struct {
onClose func(forwarder Forwarder)
}
func NewSimpleForwarder(ctx context.Context, peer *WebRTCPeer, sender *webrtc.RTPSender, buffer *sfu.Buffer) *SimpleForwarder {
func NewSimpleForwarder(ctx context.Context, rtcpWriter RTCPWriter, sender *webrtc.RTPSender, buffer *sfu.Buffer) *SimpleForwarder {
ctx, cancel := context.WithCancel(ctx)
f := &SimpleForwarder{
id: peer.ID(),
ctx: ctx,
cancel: cancel,
peer: peer,
sender: sender,
buffer: buffer,
track: sender.Track(),
payload: sender.Track().PayloadType(),
createdAt: time.Now(),
ctx: ctx,
cancel: cancel,
rtcpWriter: rtcpWriter,
sender: sender,
buffer: buffer,
track: sender.Track(),
payload: sender.Track().PayloadType(),
createdAt: time.Now(),
}
if f.track.Kind() == webrtc.RTPCodecTypeAudio {
@@ -79,15 +79,6 @@ func NewSimpleForwarder(ctx context.Context, peer *WebRTCPeer, sender *webrtc.RT
return f
}
// should be identical to peer id
func (f *SimpleForwarder) ID() string {
return f.id
}
func (f *SimpleForwarder) PeerID() string {
return f.peer.ID()
}
func (f *SimpleForwarder) ChannelType() ChannelType {
return f.channelType
}
@@ -127,10 +118,6 @@ func (f *SimpleForwarder) WriteRTP(pkt *rtp.Packet) error {
return nil
}
func (f *SimpleForwarder) WriteRTCP(pkts []rtcp.Packet) error {
return f.peer.conn.WriteRTCP(pkts)
}
func (f *SimpleForwarder) OnClose(closeFunc func(Forwarder)) {
f.onClose = closeFunc
}
@@ -145,10 +132,6 @@ func (f *SimpleForwarder) rtcpWorker() {
for {
pkts, err := f.sender.ReadRTCP()
if err == io.ErrClosedPipe {
// TODO: handle peer closed
//if recv := s.router.GetReceiver(0); recv != nil {
// recv.DeleteSender(s.id)
//}
f.Close()
return
}
@@ -186,9 +169,9 @@ func (f *SimpleForwarder) rtcpWorker() {
}
if len(fwdPkts) > 0 {
if err := f.WriteRTCP(fwdPkts); err != nil {
// TODO: log error
//log.Errorf("Forwarding rtcp from sender err: %v", err)
if err := f.rtcpWriter.WriteRTCP(fwdPkts); err != nil {
logger.GetLogger().Warnw("could not forward RTCP to peer",
"err", err)
}
}
}

View File

@@ -8,6 +8,8 @@ import (
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
"github.com/livekit/livekit-server/pkg/logger"
)
type WebRTCPeer struct {
@@ -169,8 +171,8 @@ func (p *WebRTCPeer) RemoveSubscriber(peerId string) {
func (p *WebRTCPeer) onTrack(track *webrtc.Track, rtpReceiver *webrtc.RTPReceiver) {
// create Receiver
receiver := NewReceiver(p.ctx, rtpReceiver, p.receiverConfig, p.mediaEngine)
pt := NewPeerTrack(p.ctx, p.id, track, receiver)
receiver := NewReceiver(p.ctx, p.id, rtpReceiver, p.receiverConfig, p.mediaEngine)
pt := NewPeerTrack(p.ctx, p.id, p.conn, track, receiver)
p.lock.Lock()
p.tracks = append(p.tracks, pt)
@@ -201,8 +203,10 @@ func (p *WebRTCPeer) rtcpSendWorker() {
p.lock.RUnlock()
if len(pkts) > 0 {
if err := p.conn.WriteRTCP(pkts); err != nil {
// TODO: log error
//log.Errorf("write rtcp err: %v", err)
logger.GetLogger().Errorw("error writing RTCP to peer",
"peer", p.id,
"err", err,
)
}
}
case <-p.ctx.Done():

View File

@@ -6,6 +6,7 @@ import (
"github.com/pion/rtp"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/sfu"
"context"
@@ -21,6 +22,7 @@ const (
// A receiver is responsible for pulling from a track
type Receiver struct {
peerId string
ctx context.Context
cancel context.CancelFunc
rtpReceiver *webrtc.RTPReceiver
@@ -32,12 +34,13 @@ type Receiver struct {
onCloseHandler func(r *Receiver)
}
func NewReceiver(ctx context.Context, rtpReceiver *webrtc.RTPReceiver, conf ReceiverConfig, me *MediaEngine) *Receiver {
func NewReceiver(ctx context.Context, peerId string, rtpReceiver *webrtc.RTPReceiver, conf ReceiverConfig, me *MediaEngine) *Receiver {
ctx, cancel := context.WithCancel(ctx)
track := rtpReceiver.Track()
return &Receiver{
ctx: ctx,
cancel: cancel,
peerId: peerId,
rtpReceiver: rtpReceiver,
track: track,
rtpChan: make(chan *rtp.Packet, maxChanSize),
@@ -102,6 +105,11 @@ func (r *Receiver) rtpWorker() {
if err != nil {
// log and continue
logger.GetLogger().Warnw("receiver error reading RTP",
"peer", r.peerId,
"track", r.track.SSRC(),
"err", err,
)
continue
}
@@ -124,7 +132,11 @@ func (r *Receiver) rtcpWorker() {
return
}
if err != nil {
// TODO: log
logger.GetLogger().Warnw("receiver error reading RTCP",
"peer", r.peerId,
"track", r.track.SSRC(),
"err", err,
)
continue
}

View File

@@ -21,19 +21,22 @@ type PeerTrack struct {
peerId string
// source track
track *webrtc.Track
lock sync.RWMutex
// source RTCPWriter to forward RTCP requests
rtcpWriter RTCPWriter
lock sync.RWMutex
// map of target peerId -> forwarder
forwarders map[string]Forwarder
receiver *Receiver
lastNack int64
}
func NewPeerTrack(ctx context.Context, peerId string, track *webrtc.Track, receiver *Receiver) *PeerTrack {
func NewPeerTrack(ctx context.Context, peerId string, rtcpWriter RTCPWriter, track *webrtc.Track, receiver *Receiver) *PeerTrack {
return &PeerTrack{
id: track.SSRC(),
ctx: ctx,
peerId: peerId,
track: track,
rtcpWriter: rtcpWriter,
lock: sync.RWMutex{},
forwarders: make(map[string]Forwarder),
receiver: receiver,
@@ -60,7 +63,7 @@ func (t *PeerTrack) AddSubscriber(peer *WebRTCPeer) error {
return err
}
forwarder := NewSimpleForwarder(t.ctx, peer, rtpSender, t.receiver.buffer)
forwarder := NewSimpleForwarder(t.ctx, t.rtcpWriter, rtpSender, t.receiver.buffer)
forwarder.OnClose(func(f Forwarder) {
t.lock.Lock()
delete(t.forwarders, peer.ID())
@@ -68,7 +71,8 @@ func (t *PeerTrack) AddSubscriber(peer *WebRTCPeer) error {
if err := peer.conn.RemoveTrack(rtpSender); err != nil {
logger.GetLogger().Warnw("could not remove track from forwarder",
"peer", peer.ID())
"peer", peer.ID(),
"err", err)
}
})
@@ -98,7 +102,7 @@ func (t *PeerTrack) forwardWorker() {
for pkt := range t.receiver.RTPChan() {
now := time.Now()
t.lock.RLock()
for _, forwarder := range t.forwarders {
for dstPeerId, forwarder := range t.forwarders {
// There exists a bug in chrome where setLocalDescription
// fails if track RTP arrives before the sfu offer is set.
// We delay sending RTP here to avoid the issue.
@@ -109,8 +113,9 @@ func (t *PeerTrack) forwardWorker() {
if err := forwarder.WriteRTP(pkt); err != nil {
logger.GetLogger().Warnw("could not forward packet to peer",
"srcPeer", t.peerId,
"dstPeer", forwarder.PeerID(),
"track", forwarder.ID())
"dstPeer", dstPeerId,
"track", t.track.SSRC(),
"err", err)
}
}
t.lock.RUnlock()