This commit is contained in:
David Zhao
2021-02-02 23:25:29 -08:00
parent 5578b74798
commit 5dec5b1ae2
7 changed files with 31 additions and 22 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ require (
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pion/ion-log v1.0.0
github.com/pion/ion-sfu v1.8.1
github.com/pion/ion-sfu v1.8.2
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.2
github.com/pion/stun v0.3.5
+7
View File
@@ -158,6 +158,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.5 h1:kxhtnfFVi+rYdOALN0B3k9UT86zVJKfBimRaciULW4I=
github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.4.0 h1:kXcsA/rIGzJImVqPdhfnr6q0xsS9gU0515q1EPpJ9fE=
github.com/google/wire v0.4.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@@ -316,12 +318,16 @@ github.com/pion/dtls/v2 v2.0.4 h1:WuUcqi6oYMu/noNTz92QrF1DaFj4eXbhQ6dzaaAwOiI=
github.com/pion/dtls/v2 v2.0.4/go.mod h1:qAkFscX0ZHoI1E07RfYPoRw3manThveu+mlTDdOxoGI=
github.com/pion/ice/v2 v2.0.14 h1:FxXxauyykf89SWAtkQCfnHkno6G8+bhRkNguSh9zU+4=
github.com/pion/ice/v2 v2.0.14/go.mod h1:wqaUbOq5ObDNU5ox1hRsEst0rWfsKuH1zXjQFEWiZwM=
github.com/pion/ice/v2 v2.0.15 h1:KZrwa2ciL9od8+TUVJiYTNsCW9J5lktBjGwW1MacEnQ=
github.com/pion/ice/v2 v2.0.15/go.mod h1:ZIiVGevpgAxF/cXiIVmuIUtCb3Xs4gCzCbXB6+nFkSI=
github.com/pion/interceptor v0.0.9 h1:fk5hTdyLO3KURQsf/+RjMpEm4NE3yeTY9Kh97b5BvwA=
github.com/pion/interceptor v0.0.9/go.mod h1:dHgEP5dtxOTf21MObuBAjJeAayPxLUAZjerGH8Xr07c=
github.com/pion/ion-log v1.0.0 h1:2lJLImCmfCWCR38hLWsjQfBWe6NFz/htbqiYHwvOP/Q=
github.com/pion/ion-log v1.0.0/go.mod h1:jwcla9KoB9bB/4FxYDSRJPcPYSLp5XiUUMnOLaqwl4E=
github.com/pion/ion-sfu v1.8.1 h1:N3xaDZ2Om2pCzPLlusPmV7BJYVedE5VEx8um6d0Crns=
github.com/pion/ion-sfu v1.8.1/go.mod h1:b6FI2WFTBsifw9EhxvfDks5/9SGYmVvm00IogZEgaik=
github.com/pion/ion-sfu v1.8.2 h1:YIkelxhX5EZgWkmj6XY+0AVKi/se8W1bkOobEe2qjYc=
github.com/pion/ion-sfu v1.8.2/go.mod h1:MKDh4JSLpBA/hFeYAYALb6nYpe6Ruknd3pmxPMbdpM8=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY=
@@ -347,6 +353,7 @@ github.com/pion/transport v0.10.1 h1:2W+yJT+0mOQ160ThZYUx5Zp2skzshiNgxrNE9GUfhJM
github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A=
github.com/pion/transport v0.12.0 h1:UFmOBBZkTZ3LgvLRf/NGrfWdZEubcU6zkLU3PsA9YvU=
github.com/pion/transport v0.12.0/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pion/transport v0.12.1/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pion/transport v0.12.2 h1:WYEjhloRHt1R86LhUKjC5y+P52Y11/QqEUalvtzVoys=
github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pion/turn/v2 v2.0.5 h1:iwMHqDfPEDEOFzwWKT56eFmh6DYC6o/+xnLAEzgISbA=
+2 -2
View File
@@ -273,7 +273,7 @@ func (t *MediaTrack) forwardRTPWorker() {
// "srcParticipant", t.participantId,
// "destParticipant", dstId,
// "track", t.ID())
err := dt.WriteRTP(pkt)
err := dt.WriteRTP(pkt.Packet)
if IsEOF(err) {
// this participant unsubscribed, remove it
t.RemoveSubscriber(dstId)
@@ -288,7 +288,7 @@ func (t *MediaTrack) forwardRTPWorker() {
logger.Infow("keyframe required, sending PLI",
"srcParticipant", t.participantId)
rtcpPkts := []rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC: uint32(t.ssrc), MediaSSRC: pkt.SSRC},
&rtcp.PictureLossIndication{SenderSSRC: uint32(t.ssrc), MediaSSRC: pkt.Packet.SSRC},
}
t.rtcpCh.Write(rtcpPkts)
t.lastPLI = time.Now()
+7 -7
View File
@@ -6,8 +6,8 @@ import (
"time"
"github.com/gammazero/workerpool"
"github.com/pion/ion-sfu/pkg/buffer"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/assert"
@@ -26,7 +26,7 @@ func TestForwardRTP(t *testing.T) {
t.Run("ensure that forwarders are getting packets", func(t *testing.T) {
mt := newMediaTrackWithReceiver()
receiver := mt.receiver.(*typesfakes.FakeReceiver)
packet := rtp.Packet{}
packet := buffer.ExtPacket{}
receiver.RTPChanReturns(packetGenerator(packet))
dt := &typesfakes.FakeDownTrack{}
@@ -36,7 +36,7 @@ func TestForwardRTP(t *testing.T) {
time.Sleep(testWaitDuration)
assert.Equal(t, 1, dt.WriteRTPCallCount(), "WriteRTP wasn't called on Forwarder")
assert.EqualValues(t, packet, dt.WriteRTPArgsForCall(0))
assert.EqualValues(t, packet.Packet, dt.WriteRTPArgsForCall(0))
})
t.Run("muted tracks do not forward data", func(t *testing.T) {
@@ -80,9 +80,9 @@ func TestMissingKeyFrames(t *testing.T) {
// returns a receiver that reads a packet then returns EOF
func newMediaTrackWithReceiver() *MediaTrack {
packet := rtp.Packet{}
packet := buffer.ExtPacket{}
receiver := &typesfakes.FakeReceiver{
RTPChanStub: func() <-chan rtp.Packet {
RTPChanStub: func() <-chan buffer.ExtPacket {
return packetGenerator(packet)
},
}
@@ -100,8 +100,8 @@ func newMediaTrackWithReceiver() *MediaTrack {
}
}
func packetGenerator(packets ...rtp.Packet) <-chan rtp.Packet {
pc := make(chan rtp.Packet)
func packetGenerator(packets ...buffer.ExtPacket) <-chan buffer.ExtPacket {
pc := make(chan buffer.ExtPacket)
go func() {
defer close(pc)
for _, p := range packets {
+1 -1
View File
@@ -94,6 +94,6 @@ func (r *ReceiverImpl) GetBufferedPacket(pktBuf []byte, sn uint16, snOffset uint
return
}
func (r *ReceiverImpl) RTPChan() <-chan rtp.Packet {
func (r *ReceiverImpl) RTPChan() <-chan buffer.ExtPacket {
return r.buffer.PacketChan()
}
+2 -1
View File
@@ -3,6 +3,7 @@ package types
import (
"time"
"github.com/pion/ion-sfu/pkg/buffer"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
@@ -106,7 +107,7 @@ type PublishedTrack interface {
//counterfeiter:generate . Receiver
type Receiver interface {
RTPChan() <-chan rtp.Packet
RTPChan() <-chan buffer.ExtPacket
GetBufferedPacket(pktBuf []byte, sn uint16, snOffset uint16) (rtp.Packet, error)
}
+11 -10
View File
@@ -5,6 +5,7 @@ import (
"sync"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/pion/ion-sfu/pkg/buffer"
"github.com/pion/rtp"
)
@@ -24,15 +25,15 @@ type FakeReceiver struct {
result1 rtp.Packet
result2 error
}
RTPChanStub func() <-chan rtp.Packet
RTPChanStub func() <-chan buffer.ExtPacket
rTPChanMutex sync.RWMutex
rTPChanArgsForCall []struct {
}
rTPChanReturns struct {
result1 <-chan rtp.Packet
result1 <-chan buffer.ExtPacket
}
rTPChanReturnsOnCall map[int]struct {
result1 <-chan rtp.Packet
result1 <-chan buffer.ExtPacket
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
@@ -109,7 +110,7 @@ func (fake *FakeReceiver) GetBufferedPacketReturnsOnCall(i int, result1 rtp.Pack
}{result1, result2}
}
func (fake *FakeReceiver) RTPChan() <-chan rtp.Packet {
func (fake *FakeReceiver) RTPChan() <-chan buffer.ExtPacket {
fake.rTPChanMutex.Lock()
ret, specificReturn := fake.rTPChanReturnsOnCall[len(fake.rTPChanArgsForCall)]
fake.rTPChanArgsForCall = append(fake.rTPChanArgsForCall, struct {
@@ -133,32 +134,32 @@ func (fake *FakeReceiver) RTPChanCallCount() int {
return len(fake.rTPChanArgsForCall)
}
func (fake *FakeReceiver) RTPChanCalls(stub func() <-chan rtp.Packet) {
func (fake *FakeReceiver) RTPChanCalls(stub func() <-chan buffer.ExtPacket) {
fake.rTPChanMutex.Lock()
defer fake.rTPChanMutex.Unlock()
fake.RTPChanStub = stub
}
func (fake *FakeReceiver) RTPChanReturns(result1 <-chan rtp.Packet) {
func (fake *FakeReceiver) RTPChanReturns(result1 <-chan buffer.ExtPacket) {
fake.rTPChanMutex.Lock()
defer fake.rTPChanMutex.Unlock()
fake.RTPChanStub = nil
fake.rTPChanReturns = struct {
result1 <-chan rtp.Packet
result1 <-chan buffer.ExtPacket
}{result1}
}
func (fake *FakeReceiver) RTPChanReturnsOnCall(i int, result1 <-chan rtp.Packet) {
func (fake *FakeReceiver) RTPChanReturnsOnCall(i int, result1 <-chan buffer.ExtPacket) {
fake.rTPChanMutex.Lock()
defer fake.rTPChanMutex.Unlock()
fake.RTPChanStub = nil
if fake.rTPChanReturnsOnCall == nil {
fake.rTPChanReturnsOnCall = make(map[int]struct {
result1 <-chan rtp.Packet
result1 <-chan buffer.ExtPacket
})
}
fake.rTPChanReturnsOnCall[i] = struct {
result1 <-chan rtp.Packet
result1 <-chan buffer.ExtPacket
}{result1}
}