From d117cce37b8cf13b773dbdd8538b3e73d4b7aca1 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 2 Feb 2021 22:53:48 -0800 Subject: [PATCH] integration test around receive before publish --- cmd/cli/client/client.go | 38 +++++++++++++++++++++++++++++++++++ cmd/cli/client/trackwriter.go | 2 +- pkg/rtc/participant.go | 16 ++++++++++++--- test/integration_helpers.go | 1 + test/multinode_test.go | 27 +++++++++++++++++++------ test/scenarios.go | 29 +++++++++++++++++++++++++- 6 files changed, 102 insertions(+), 11 deletions(-) diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index a31ef2be6..72f083c24 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -12,6 +12,8 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/pion/rtcp" + "github.com/pion/rtp" "github.com/pion/webrtc/v3" "github.com/thoas/go-funk" "google.golang.org/protobuf/encoding/protojson" @@ -48,6 +50,10 @@ type RTCClient struct { pendingCandidates []*webrtc.ICECandidate pendingTrackWriters []*TrackWriter OnConnected func() + + // map of track Id and last packet + lastPackets map[string]*rtp.Packet + bytesReceived map[string]uint64 } var ( @@ -97,6 +103,8 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { remoteParticipants: make(map[string]*livekit.ParticipantInfo), PeerConn: peerConn, me: &webrtc.MediaEngine{}, + lastPackets: make(map[string]*rtp.Packet), + bytesReceived: make(map[string]uint64), } c.ctx, c.cancel = context.WithCancel(context.Background()) c.me.RegisterDefaultCodecs() @@ -543,6 +551,10 @@ func (c *RTCClient) processTrack(track *webrtc.TrackRemote) { logger.Debugw("error reading RTP", "err", err) continue } + c.lock.Lock() + c.lastPackets[pId] = pkt + c.bytesReceived[pId] += uint64(pkt.MarshalSize()) + c.lock.Unlock() numBytes += pkt.MarshalSize() if time.Now().Sub(lastUpdate) > 30*time.Second { logger.Debugw("consumed from participant", @@ -552,3 +564,29 @@ func (c *RTCClient) processTrack(track *webrtc.TrackRemote) { } } } + +func (c *RTCClient) BytesReceived() uint64 { + var total uint64 + c.lock.Lock() + for _, size := range c.bytesReceived { + total += size + } + c.lock.Unlock() + return total +} + +func (c *RTCClient) SendNacks(count int) { + var packets []rtcp.Packet + c.lock.Lock() + for _, pkt := range c.lastPackets { + seqs := make([]uint16, 0, count) + for i := 0; i < count; i++ { + seqs = append(seqs, pkt.SequenceNumber-uint16(i)) + } + packets = append(packets, &rtcp.TransportLayerNack{ + MediaSSRC: pkt.SSRC, + Nacks: rtcp.NackPairsFromSequenceNumbers(seqs), + }) + } + c.lock.Unlock() +} diff --git a/cmd/cli/client/trackwriter.go b/cmd/cli/client/trackwriter.go index 00e9d2ad5..4b187b024 100644 --- a/cmd/cli/client/trackwriter.go +++ b/cmd/cli/client/trackwriter.go @@ -86,7 +86,7 @@ func (w *TrackWriter) Stop() { func (w *TrackWriter) writeNull() { defer w.onWriteComplete() - sample := media.Sample{Data: []byte{0x0, 0xff, 0xff, 0xff, 0xff}, Duration: time.Second} + sample := media.Sample{Data: []byte{0x0, 0xff, 0xff, 0xff, 0xff}, Duration: 30 * time.Millisecond} for { select { case <-time.After(20 * time.Millisecond): diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index c20290161..fdb4cc5e2 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -219,7 +219,9 @@ func (p *ParticipantImpl) Answer(sdp webrtc.SessionDescription) (answer webrtc.S } logger.Debugw("answering client offer", "state", p.State().String(), - "participant", p.Identity()) + "participant", p.Identity(), + //"sdp", sdp.SDP, + ) if err = p.peerConn.SetRemoteDescription(sdp); err != nil { return @@ -244,6 +246,10 @@ func (p *ParticipantImpl) Answer(sdp webrtc.SessionDescription) (answer webrtc.S } p.negotiationCond.L.Unlock() + logger.Debugw("sending to client answer", + "participant", p.Identity(), + //"sdp", sdp.SDP, + ) err = p.responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ Answer: ToProtoSessionDescription(answer), @@ -288,7 +294,9 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { return ErrUnexpectedOffer } logger.Debugw("setting participant answer", - "participant", p.Identity()) + "participant", p.Identity(), + //"sdp", sdp.SDP, + ) if err := p.peerConn.SetRemoteDescription(sdp); err != nil { return errors.Wrap(err, "could not set remote description") } @@ -491,7 +499,9 @@ func (p *ParticipantImpl) negotiate() { } logger.Debugw("sending offer to participant", - "participant", p.Identity()) + "participant", p.Identity(), + //"sdp", offer.SDP, + ) err = p.responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ Offer: ToProtoSessionDescription(offer), diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 88abfbb16..1ad5e9f8a 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -104,6 +104,7 @@ func waitForServerToStart(s *service.LivekitServer) { } func withTimeout(t *testing.T, description string, f func() bool) bool { + logger.Infow(description) ctx, _ := context.WithTimeout(context.Background(), connectTimeout) for { select { diff --git a/test/multinode_test.go b/test/multinode_test.go index 7640b1560..dddc8a22b 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -94,19 +94,34 @@ func TestConnectWithoutCreation(t *testing.T) { } // testing multiple scenarios rooms -func TestMultinodeScenarios(t *testing.T) { +func TestMultinodePublishingUponJoining(t *testing.T) { if testing.Short() { t.SkipNow() return } - logger.Infow("---Starting TestScenarios---") - defer logger.Infow("---Finishing TestScenarios---") + logger.Infow("---Starting TestMultinodePublishingUponJoining---") + defer logger.Infow("---Finishing TestMultinodePublishingUponJoining---") s1, s2 := setupMultiNodeTest() - - scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort) - defer s1.Stop() defer s2.Stop() + + scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort) +} + +func TestMultinodeReceiveBeforePublish(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + logger.Infow("---Starting TestMultinodeReceiveBeforePublish---") + defer logger.Infow("---Finishing TestMultinodeReceiveBeforePublish---") + + s1, s2 := setupMultiNodeTest() + defer s1.Stop() + defer s2.Stop() + + scenarioReceiveBeforePublish(t) } diff --git a/test/scenarios.go b/test/scenarios.go index 7d8668b01..e821081dc 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -69,7 +69,6 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) { writers = publishTracksForClients(t, c2) defer stopWriters(writers...) - logger.Infow("waiting for reconnected c2 tracks to publish") success = withTimeout(t, "new c2 tracks should be published again", func() bool { tracks := c3.SubscribedTracks() if len(tracks[c2.ID()]) != 2 { @@ -85,6 +84,34 @@ func scenarioPublishingUponJoining(t *testing.T, ports ...int) { } } +func scenarioReceiveBeforePublish(t *testing.T) { + c1 := createRTCClient("rbp_1", defaultServerPort) + c2 := createRTCClient("rbp_2", defaultServerPort) + + waitUntilConnected(t, c1, c2) + defer stopClients(c1, c2) + + // c1 publishes + writers := publishTracksForClients(t, c1) + defer stopWriters(writers...) + + // c2 should see some bytes flowing through + success := withTimeout(t, "waiting to receive bytes on c2", func() bool { + return c2.BytesReceived() > 20 + }) + if !success { + t.FailNow() + } + + // now publish on C2 + writers = publishTracksForClients(t, c2) + defer stopWriters(writers...) + + success = withTimeout(t, "waiting to receive c2 tracks on c1", func() bool { + return len(c1.SubscribedTracks()[c2.ID()]) == 2 + }) +} + // websocket reconnects func scenarioWSReconnect(t *testing.T) { c1 := createRTCClient("wsr_1", defaultServerPort)