From af0c9dfdf089b246aa6c4fdec6909c0f7fe08f2e Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 12 Dec 2020 23:05:53 -0800 Subject: [PATCH] server back to working, tho only works when client joins first --- go.mod | 6 ------ pkg/rtc/errors.go | 5 +---- pkg/rtc/forwarder.go | 5 +++++ pkg/rtc/participant.go | 11 ++++++----- pkg/rtc/track.go | 23 +++++++++++++++++------ pkg/sfu/downtrack.go | 5 +---- 6 files changed, 30 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 292751cef..c7888b8e1 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/livekit/livekit-server go 1.15 require ( - github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.2 github.com/google/wire v0.4.0 @@ -14,21 +13,16 @@ require ( github.com/pion/interceptor v0.0.5 github.com/pion/ion-log v1.0.0 github.com/pion/ion-sfu v1.6.3 - github.com/pion/randutil v0.1.0 github.com/pion/rtcp v1.2.6 github.com/pion/rtp v1.6.1 github.com/pion/sdp/v3 v3.0.3 github.com/pion/stun v0.3.5 github.com/pion/webrtc/v3 v3.0.0-beta.15.0.20201209023348-63401a8837fb github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.6.1 github.com/twitchtv/twirp v7.1.0+incompatible github.com/urfave/cli/v2 v2.2.0 github.com/urfave/negroni v1.0.0 go.uber.org/zap v1.16.0 - golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9 // indirect - golang.org/x/net v0.0.0-20201207224615-747e23833adb // indirect - golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect golang.org/x/tools v0.0.0-20200815165600-90abf76919f3 // indirect google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.3.0 diff --git a/pkg/rtc/errors.go b/pkg/rtc/errors.go index ca3f17380..e1bc58266 100644 --- a/pkg/rtc/errors.go +++ b/pkg/rtc/errors.go @@ -3,8 +3,5 @@ package rtc import "errors" var ( - ErrRoomIdMissing = errors.New("roomId is not set") - ErrPeerExists = errors.New("peer already exists") - ErrPeerNotConnected = errors.New("peer has not been connected") - ErrUnsupportedPayloadType = errors.New("peer does not support payload type") + ErrRoomIdMissing = errors.New("roomId is not set") ) diff --git a/pkg/rtc/forwarder.go b/pkg/rtc/forwarder.go index 89c3c9bc4..afe1f72af 100644 --- a/pkg/rtc/forwarder.go +++ b/pkg/rtc/forwarder.go @@ -26,6 +26,7 @@ type Forwarder interface { Start() Close() CreatedAt() time.Time + Track() *sfu.DownTrack OnClose(func(Forwarder)) } @@ -129,6 +130,10 @@ func (f *SimpleForwarder) CreatedAt() time.Time { return f.createdAt } +func (f *SimpleForwarder) Track() *sfu.DownTrack { + return f.track +} + // rtcpWorker receives RTCP packets from the destination peer // this include packet loss packets func (f *SimpleForwarder) rtcpWorker() { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 9635810c4..8fa51caa5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -73,6 +73,7 @@ func NewParticipant(conf *WebRTCConfig, sc SignalConnection, name string) (*Part ctx: ctx, cancel: cancel, bi: bi, + rtcpCh: make(chan []rtcp.Packet, 10), downTracks: make(map[string][]*sfu.DownTrack), state: livekit.ParticipantInfo_JOINING, lock: sync.RWMutex{}, @@ -190,8 +191,6 @@ func (p *Participant) Answer(sdp webrtc.SessionDescription) (answer webrtc.Sessi return } - logger.GetLogger().Debugw("created new offer", "offer", offer, "onOffer", p.OnOffer) - logger.GetLogger().Debugw("sending available offer to participant") err = p.sigConn.WriteResponse(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Negotiate{ @@ -230,7 +229,9 @@ func (p *Participant) addDownTrack(streamId string, dt *sfu.DownTrack) { p.lock.Lock() p.downTracks[streamId] = append(p.downTracks[streamId], dt) p.lock.Unlock() - p.scheduleDownTrackBindingReports(streamId) + dt.OnBind(func() { + go p.scheduleDownTrackBindingReports(streamId) + }) } func (p *Participant) removeDownTrack(streamId string, dt *sfu.DownTrack) { @@ -371,10 +372,10 @@ func (p *Participant) scheduleDownTrackBindingReports(streamId string) { } go func() { - pkts := pkts + batch := pkts i := 0 for { - if err := p.peerConn.WriteRTCP(pkts); err != nil { + if err := p.peerConn.WriteRTCP(batch); err != nil { logger.GetLogger().Debugw("Sending track binding reports", "participant", p.id, "err", err) diff --git a/pkg/rtc/track.go b/pkg/rtc/track.go index 0c9ab8a6d..234d4bac5 100644 --- a/pkg/rtc/track.go +++ b/pkg/rtc/track.go @@ -105,11 +105,14 @@ func (t *Track) AddSubscriber(participant *Participant) error { if participant.peerConn.ConnectionState() == webrtc.PeerConnectionStateClosed { return } - if err := participant.peerConn.RemoveTrack(transceiver.Sender()); err != nil { - if _, ok := err.(*rtcerr.InvalidStateError); !ok { - logger.GetLogger().Warnw("could not remove remoteTrack from forwarder", - "participant", participant.ID(), - "err", err) + sender := transceiver.Sender() + if sender != nil { + if err := participant.peerConn.RemoveTrack(sender); err != nil { + if _, ok := err.(*rtcerr.InvalidStateError); !ok { + logger.GetLogger().Warnw("could not remove remoteTrack from forwarder", + "participant", participant.ID(), + "err", err) + } } } @@ -195,7 +198,15 @@ func (t *Track) forwardRTPWorker() { continue } - if err != nil { + if err == sfu.ErrRequiresKeyFrame { + logger.GetLogger().Infow("keyframe required, sending PLI") + // queue up a PLI, but don't block channel + go func() { + t.rtcpCh <- []rtcp.Packet{ + &rtcp.PictureLossIndication{SenderSSRC: forwarder.Track().SSRC(), MediaSSRC: pkt.SSRC}, + } + }() + } else if err != nil { logger.GetLogger().Warnw("could not forward packet to participant", "src", t.participantId, "dest", dstId, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 30f79b95b..8ed86476d 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -230,10 +230,7 @@ func (d *DownTrack) writeSimpleRTP(pkt rtp.Packet) error { } if !relay { // TODO: how do we sent PLI to the source? - //d.receiver.SendRTCP([]rtcp.Packet{ - // &rtcp.PictureLossIndication{SenderSSRC: d.ssrc, MediaSSRC: pkt.SSRC}, - //}) - return ErrRequiresKeyFrame + //return ErrRequiresKeyFrame } } d.snOffset = pkt.SequenceNumber - d.lastSN - 1