From 0f746fe265544b5ea6bdd7b64e6221d1f772189d Mon Sep 17 00:00:00 2001 From: boks1971 Date: Fri, 15 Aug 2025 02:49:53 +0530 Subject: [PATCH] test --- pkg/rtc/participant.go | 4 +++- test/client/client.go | 12 ++++++++++++ test/singlenode_test.go | 31 ++++++++++++++++++++++--------- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 18322e4d7..2cf75ce61 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1897,6 +1897,8 @@ func (h PrimaryTransportHandler) OnFullyEstablished() { h.p.onPrimaryTransportFullyEstablished() } +// ---------------------------------------------------------- + func (p *ParticipantImpl) setupSignalling() { p.signalling = signalling.NewSignalling(signalling.SignallingParams{ Logger: p.params.Logger, @@ -1920,7 +1922,7 @@ func (p *ParticipantImpl) setupTransportManager() error { var pth transport.Handler = PublisherTransportHandler{ath} var sth transport.Handler = SubscriberTransportHandler{ath} - subscriberAsPrimary := p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe() && !p.params.UseOneShotSignallingMode + subscriberAsPrimary := !p.params.UseOneShotSignallingMode && ((p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe()) || p.ProtocolVersion().SupportsSinglePeerConnection()) if subscriberAsPrimary { sth = PrimaryTransportHandler{sth, p} } else { diff --git a/test/client/client.go b/test/client/client.go index 13cd6e84b..f0ccf9e65 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -81,6 +81,7 @@ type RTCClient struct { publisherFullyEstablished atomic.Bool subscriberFullyEstablished atomic.Bool pongReceivedAt atomic.Int64 + lastOffer atomic.Pointer[webrtc.SessionDescription] lastAnswer atomic.Pointer[webrtc.SessionDescription] // tracks waiting to be acked, cid => trackInfo @@ -387,6 +388,10 @@ func (c *RTCClient) ID() livekit.ParticipantID { return c.id } +func (c *RTCClient) ProtocolVersion() types.ProtocolVersion { + return c.protocolVersion +} + // create an offer for the server func (c *RTCClient) Run() error { c.conn.SetCloseHandler(func(code int, text string) error { @@ -884,6 +889,11 @@ func (c *RTCClient) GetPublishedTrackIDs() []string { return trackIDs } +// LastOffer return SDP of the last offer for the subscriber connection +func (c *RTCClient) LastOffer() *webrtc.SessionDescription { + return c.lastOffer.Load() +} + // LastAnswer return SDP of the last answer for the publisher connection func (c *RTCClient) LastAnswer() *webrtc.SessionDescription { return c.lastAnswer.Load() @@ -934,6 +944,8 @@ func (c *RTCClient) handleDataMessageUnlabeled(data []byte) { // handles a server initiated offer, handle on subscriber PC func (c *RTCClient) handleOffer(desc webrtc.SessionDescription, offerId uint32) { + logger.Infow("handling server offer", "participant", c.localParticipant.Identity) + c.lastOffer.Store(&desc) c.subscriber.HandleRemoteDescription(desc, offerId) } diff --git a/test/singlenode_test.go b/test/singlenode_test.go index 6c9350d3c..e2ebe6532 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -567,6 +567,7 @@ func TestSingleNodeAttributes(t *testing.T) { }, UseJoinRequestQueryParam: true, }) + grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom} grant.SetCanSubscribe(false) at := auth.NewAccessToken(testApiKey, testApiSecret). @@ -622,22 +623,34 @@ func TestDeviceCodecOverride(t *testing.T) { require.NoError(t, err) defer stopWriters(tw) + var sd *webrtc.SessionDescription // wait for server to receive track - require.Eventually(t, func() bool { - return c1.LastAnswer() != nil - }, waitTimeout, waitTick, "did not receive answer") + if !c1.ProtocolVersion().SupportsSinglePeerConnection() { + require.Eventually(t, func() bool { + return c1.LastAnswer() != nil + }, waitTimeout, waitTick, "did not receive answer") - sd := webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: c1.LastAnswer().SDP, + sd = &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: c1.LastAnswer().SDP, + } + } else { + require.Eventually(t, func() bool { + return c1.LastOffer() != nil + }, waitTimeout, waitTick, "did not receive offer") + + sd = &webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: c1.LastOffer().SDP, + } } - answer, err := sd.Unmarshal() + marshaled, err := sd.Unmarshal() require.NoError(t, err) // video and data channel - require.Len(t, answer.MediaDescriptions, 2) + require.Len(t, marshaled.MediaDescriptions, 2) var desc *sdp.MediaDescription - for _, md := range answer.MediaDescriptions { + for _, md := range marshaled.MediaDescriptions { if md.MediaName.Media == "video" { desc = md break