This commit is contained in:
boks1971
2025-08-15 02:49:53 +05:30
parent 3842882a04
commit 0f746fe265
3 changed files with 37 additions and 10 deletions
+3 -1
View File
@@ -1897,6 +1897,8 @@ func (h PrimaryTransportHandler) OnFullyEstablished() {
h.p.onPrimaryTransportFullyEstablished()
}
// ----------------------------------------------------------
func (p *ParticipantImpl) setupSignalling() {
p.signalling = signalling.NewSignalling(signalling.SignallingParams{
Logger: p.params.Logger,
@@ -1920,7 +1922,7 @@ func (p *ParticipantImpl) setupTransportManager() error {
var pth transport.Handler = PublisherTransportHandler{ath}
var sth transport.Handler = SubscriberTransportHandler{ath}
subscriberAsPrimary := p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe() && !p.params.UseOneShotSignallingMode
subscriberAsPrimary := !p.params.UseOneShotSignallingMode && ((p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe()) || p.ProtocolVersion().SupportsSinglePeerConnection())
if subscriberAsPrimary {
sth = PrimaryTransportHandler{sth, p}
} else {
+12
View File
@@ -81,6 +81,7 @@ type RTCClient struct {
publisherFullyEstablished atomic.Bool
subscriberFullyEstablished atomic.Bool
pongReceivedAt atomic.Int64
lastOffer atomic.Pointer[webrtc.SessionDescription]
lastAnswer atomic.Pointer[webrtc.SessionDescription]
// tracks waiting to be acked, cid => trackInfo
@@ -387,6 +388,10 @@ func (c *RTCClient) ID() livekit.ParticipantID {
return c.id
}
func (c *RTCClient) ProtocolVersion() types.ProtocolVersion {
return c.protocolVersion
}
// create an offer for the server
func (c *RTCClient) Run() error {
c.conn.SetCloseHandler(func(code int, text string) error {
@@ -884,6 +889,11 @@ func (c *RTCClient) GetPublishedTrackIDs() []string {
return trackIDs
}
// LastOffer return SDP of the last offer for the subscriber connection
func (c *RTCClient) LastOffer() *webrtc.SessionDescription {
return c.lastOffer.Load()
}
// LastAnswer return SDP of the last answer for the publisher connection
func (c *RTCClient) LastAnswer() *webrtc.SessionDescription {
return c.lastAnswer.Load()
@@ -934,6 +944,8 @@ func (c *RTCClient) handleDataMessageUnlabeled(data []byte) {
// handles a server initiated offer, handle on subscriber PC
func (c *RTCClient) handleOffer(desc webrtc.SessionDescription, offerId uint32) {
logger.Infow("handling server offer", "participant", c.localParticipant.Identity)
c.lastOffer.Store(&desc)
c.subscriber.HandleRemoteDescription(desc, offerId)
}
+22 -9
View File
@@ -567,6 +567,7 @@ func TestSingleNodeAttributes(t *testing.T) {
},
UseJoinRequestQueryParam: true,
})
grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
grant.SetCanSubscribe(false)
at := auth.NewAccessToken(testApiKey, testApiSecret).
@@ -622,22 +623,34 @@ func TestDeviceCodecOverride(t *testing.T) {
require.NoError(t, err)
defer stopWriters(tw)
var sd *webrtc.SessionDescription
// wait for server to receive track
require.Eventually(t, func() bool {
return c1.LastAnswer() != nil
}, waitTimeout, waitTick, "did not receive answer")
if !c1.ProtocolVersion().SupportsSinglePeerConnection() {
require.Eventually(t, func() bool {
return c1.LastAnswer() != nil
}, waitTimeout, waitTick, "did not receive answer")
sd := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: c1.LastAnswer().SDP,
sd = &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: c1.LastAnswer().SDP,
}
} else {
require.Eventually(t, func() bool {
return c1.LastOffer() != nil
}, waitTimeout, waitTick, "did not receive offer")
sd = &webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: c1.LastOffer().SDP,
}
}
answer, err := sd.Unmarshal()
marshaled, err := sd.Unmarshal()
require.NoError(t, err)
// video and data channel
require.Len(t, answer.MediaDescriptions, 2)
require.Len(t, marshaled.MediaDescriptions, 2)
var desc *sdp.MediaDescription
for _, md := range answer.MediaDescriptions {
for _, md := range marshaled.MediaDescriptions {
if md.MediaName.Media == "video" {
desc = md
break