integration test around receive before publish

This commit is contained in:
David Zhao
2021-02-02 22:53:48 -08:00
parent 88e247d95a
commit d117cce37b
6 changed files with 102 additions and 11 deletions

View File

@@ -12,6 +12,8 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/thoas/go-funk"
"google.golang.org/protobuf/encoding/protojson"
@@ -48,6 +50,10 @@ type RTCClient struct {
pendingCandidates []*webrtc.ICECandidate
pendingTrackWriters []*TrackWriter
OnConnected func()
// map of track Id and last packet
lastPackets map[string]*rtp.Packet
bytesReceived map[string]uint64
}
var (
@@ -97,6 +103,8 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
remoteParticipants: make(map[string]*livekit.ParticipantInfo),
PeerConn: peerConn,
me: &webrtc.MediaEngine{},
lastPackets: make(map[string]*rtp.Packet),
bytesReceived: make(map[string]uint64),
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.me.RegisterDefaultCodecs()
@@ -543,6 +551,10 @@ func (c *RTCClient) processTrack(track *webrtc.TrackRemote) {
logger.Debugw("error reading RTP", "err", err)
continue
}
c.lock.Lock()
c.lastPackets[pId] = pkt
c.bytesReceived[pId] += uint64(pkt.MarshalSize())
c.lock.Unlock()
numBytes += pkt.MarshalSize()
if time.Now().Sub(lastUpdate) > 30*time.Second {
logger.Debugw("consumed from participant",
@@ -552,3 +564,29 @@ func (c *RTCClient) processTrack(track *webrtc.TrackRemote) {
}
}
}
func (c *RTCClient) BytesReceived() uint64 {
var total uint64
c.lock.Lock()
for _, size := range c.bytesReceived {
total += size
}
c.lock.Unlock()
return total
}
func (c *RTCClient) SendNacks(count int) {
var packets []rtcp.Packet
c.lock.Lock()
for _, pkt := range c.lastPackets {
seqs := make([]uint16, 0, count)
for i := 0; i < count; i++ {
seqs = append(seqs, pkt.SequenceNumber-uint16(i))
}
packets = append(packets, &rtcp.TransportLayerNack{
MediaSSRC: pkt.SSRC,
Nacks: rtcp.NackPairsFromSequenceNumbers(seqs),
})
}
c.lock.Unlock()
}

View File

@@ -86,7 +86,7 @@ func (w *TrackWriter) Stop() {
func (w *TrackWriter) writeNull() {
defer w.onWriteComplete()
sample := media.Sample{Data: []byte{0x0, 0xff, 0xff, 0xff, 0xff}, Duration: time.Second}
sample := media.Sample{Data: []byte{0x0, 0xff, 0xff, 0xff, 0xff}, Duration: 30 * time.Millisecond}
for {
select {
case <-time.After(20 * time.Millisecond):

View File

@@ -219,7 +219,9 @@ func (p *ParticipantImpl) Answer(sdp webrtc.SessionDescription) (answer webrtc.S
}
logger.Debugw("answering client offer", "state", p.State().String(),
"participant", p.Identity())
"participant", p.Identity(),
//"sdp", sdp.SDP,
)
if err = p.peerConn.SetRemoteDescription(sdp); err != nil {
return
@@ -244,6 +246,10 @@ func (p *ParticipantImpl) Answer(sdp webrtc.SessionDescription) (answer webrtc.S
}
p.negotiationCond.L.Unlock()
logger.Debugw("sending to client answer",
"participant", p.Identity(),
//"sdp", sdp.SDP,
)
err = p.responseSink.WriteMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
@@ -288,7 +294,9 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
return ErrUnexpectedOffer
}
logger.Debugw("setting participant answer",
"participant", p.Identity())
"participant", p.Identity(),
//"sdp", sdp.SDP,
)
if err := p.peerConn.SetRemoteDescription(sdp); err != nil {
return errors.Wrap(err, "could not set remote description")
}
@@ -491,7 +499,9 @@ func (p *ParticipantImpl) negotiate() {
}
logger.Debugw("sending offer to participant",
"participant", p.Identity())
"participant", p.Identity(),
//"sdp", offer.SDP,
)
err = p.responseSink.WriteMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer),

View File

@@ -104,6 +104,7 @@ func waitForServerToStart(s *service.LivekitServer) {
}
func withTimeout(t *testing.T, description string, f func() bool) bool {
logger.Infow(description)
ctx, _ := context.WithTimeout(context.Background(), connectTimeout)
for {
select {

View File

@@ -94,19 +94,34 @@ func TestConnectWithoutCreation(t *testing.T) {
}
// testing multiple scenarios rooms
func TestMultinodeScenarios(t *testing.T) {
func TestMultinodePublishingUponJoining(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
logger.Infow("---Starting TestScenarios---")
defer logger.Infow("---Finishing TestScenarios---")
logger.Infow("---Starting TestMultinodePublishingUponJoining---")
defer logger.Infow("---Finishing TestMultinodePublishingUponJoining---")
s1, s2 := setupMultiNodeTest()
scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort)
defer s1.Stop()
defer s2.Stop()
scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort)
}
func TestMultinodeReceiveBeforePublish(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
logger.Infow("---Starting TestMultinodeReceiveBeforePublish---")
defer logger.Infow("---Finishing TestMultinodeReceiveBeforePublish---")
s1, s2 := setupMultiNodeTest()
defer s1.Stop()
defer s2.Stop()
scenarioReceiveBeforePublish(t)
}

View File

@@ -69,7 +69,6 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) {
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
logger.Infow("waiting for reconnected c2 tracks to publish")
success = withTimeout(t, "new c2 tracks should be published again", func() bool {
tracks := c3.SubscribedTracks()
if len(tracks[c2.ID()]) != 2 {
@@ -85,6 +84,34 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) {
}
}
func scenarioReceiveBeforePublish(t *testing.T) {
c1 := createRTCClient("rbp_1", defaultServerPort)
c2 := createRTCClient("rbp_2", defaultServerPort)
waitUntilConnected(t, c1, c2)
defer stopClients(c1, c2)
// c1 publishes
writers := publishTracksForClients(t, c1)
defer stopWriters(writers...)
// c2 should see some bytes flowing through
success := withTimeout(t, "waiting to receive bytes on c2", func() bool {
return c2.BytesReceived() > 20
})
if !success {
t.FailNow()
}
// now publish on C2
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
success = withTimeout(t, "waiting to receive c2 tracks on c1", func() bool {
return len(c1.SubscribedTracks()[c2.ID()]) == 2
})
}
// websocket reconnects
func scenarioWSReconnect(t *testing.T) {
c1 := createRTCClient("wsr_1", defaultServerPort)