From ceea024fdbf8bf5a1caa04f1b9b878c35696f0bf Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 11 Sep 2021 22:50:00 -0700 Subject: [PATCH] fix tests --- test/client/client.go | 9 ++++++++- test/scenarios.go | 6 +++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/test/client/client.go b/test/client/client.go index 0cc49c4a4..104f6f3df 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -196,7 +196,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { c.publisher.OnOffer(c.onOffer) c.subscriber.PeerConnection().OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - logger.Infow("ICE state has changed", "state", connectionState.String(), + logger.Infow("subscriber ICE state has changed", "state", connectionState.String(), "participant", c.localParticipant.Identity) if connectionState == webrtc.ICEConnectionStateConnected { // flush peers @@ -219,6 +219,8 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { }) c.publisher.PeerConnection().OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + logger.Infow("publisher ICE state changed", "state", state.String(), + "participant", c.localParticipant.Identity) if state == webrtc.ICEConnectionStateConnected { c.publisherConnected.TrySet(true) } else { @@ -380,6 +382,7 @@ func (c *RTCClient) RemoteParticipants() []*livekit.ParticipantInfo { } func (c *RTCClient) Stop() { + logger.Infow("stopping client", "ID", c.ID()) _ = c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Leave{ Leave: &livekit.LeaveRequest{}, @@ -525,6 +528,8 @@ func (c *RTCClient) PublishData(data []byte, kind livekit.DataPacket_Kind) error if err != nil { return err } + + logger.Infow("trying to publish") if kind == livekit.DataPacket_RELIABLE { return c.reliableDC.Send(payload) } else { @@ -551,6 +556,8 @@ func (c *RTCClient) ensurePublisherConnected() error { return fmt.Errorf("could not connect publisher after timeout") case <-time.After(10 * time.Millisecond): if c.publisherConnected.Get() { + // TODO: pion seem to have a timing issue here. data channel is not actually open at this moment + time.Sleep(10 * time.Millisecond) return nil } } diff --git a/test/scenarios.go b/test/scenarios.go index 0ff28515d..aca4f7c3e 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -129,12 +129,16 @@ func scenarioDataPublish(t *testing.T) { received := utils.AtomicFlag{} c2.OnDataReceived = func(data []byte, sid string) { - if string(data) == payload && sid == c2.ID() { + if string(data) == payload && sid == c1.ID() { received.TrySet(true) } } require.NoError(t, c1.PublishData([]byte(payload), livekit.DataPacket_LOSSY)) + + testutils.WithTimeout(t, "waiting for c2 to receive data", func() bool { + return received.Get() + }) } // websocket reconnects