diff --git a/go.mod b/go.mod index 3555fff74..9ae18f230 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 - github.com/livekit/protocol v1.42.1-0.20251015124854-bfdbffc082ab + github.com/livekit/protocol v1.42.2-0.20251015173118-0236ffc17f5e github.com/livekit/psrpc v0.7.0 github.com/mackerelio/go-osstat v0.2.6 github.com/magefile/mage v1.15.0 @@ -41,7 +41,7 @@ require ( github.com/pion/sdp/v3 v3.0.16 github.com/pion/transport/v3 v3.0.8 github.com/pion/turn/v4 v4.1.1 - github.com/pion/webrtc/v4 v4.1.6-0.20251015073003-f35dc4efd0ac + github.com/pion/webrtc/v4 v4.1.6 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.0 github.com/redis/go-redis/v9 v9.14.0 @@ -55,7 +55,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b + golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b golang.org/x/mod v0.29.0 golang.org/x/sync v0.17.0 google.golang.org/protobuf v1.36.10 @@ -144,11 +144,9 @@ require ( golang.org/x/net v0.46.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect - golang.org/x/tools v0.37.0 // indirect + golang.org/x/tools v0.38.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251014184007-4626949a642f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251014184007-4626949a642f // indirect google.golang.org/grpc v1.76.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) - -tool github.com/maxbrunsfeld/counterfeiter/v6 diff --git a/go.sum b/go.sum index f5eab17a2..c4695f67b 100644 --- a/go.sum +++ b/go.sum @@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo= github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.42.1-0.20251015124854-bfdbffc082ab h1:kPHpoS+vyi+yOxrBe6h/MW0/BZk4YfF3JozfEOUkz5Y= -github.com/livekit/protocol v1.42.1-0.20251015124854-bfdbffc082ab/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A= +github.com/livekit/protocol v1.42.2-0.20251015173118-0236ffc17f5e h1:2pgUHq8atL01StoCTrK760eYTKDoDz1nC4a90knEw5g= +github.com/livekit/protocol v1.42.2-0.20251015173118-0236ffc17f5e/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A= github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU= github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0= @@ -266,8 +266,8 @@ github.com/pion/transport/v3 v3.0.8 h1:oI3myyYnTKUSTthu/NZZ8eu2I5sHbxbUNNFW62ola github.com/pion/transport/v3 v3.0.8/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ= github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc= github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8= -github.com/pion/webrtc/v4 v4.1.6-0.20251015073003-f35dc4efd0ac h1:Xy4aPmPyPpSM7nPK+OUkKx4WQPaa8mkOboalo0J179Q= -github.com/pion/webrtc/v4 v4.1.6-0.20251015073003-f35dc4efd0ac/go.mod h1:wKecGRlkl3ox/As/MYghJL+b/cVXMEhoPMJWPuGQFhU= +github.com/pion/webrtc/v4 v4.1.6 h1:srHH2HwvCGwPba25EYJgUzgLqCQoXl1VCUnrGQMSzUw= +github.com/pion/webrtc/v4 v4.1.6/go.mod h1:wKecGRlkl3ox/As/MYghJL+b/cVXMEhoPMJWPuGQFhU= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -369,8 +369,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b h1:18qgiDvlvH7kk8Ioa8Ov+K6xCi0GMvmGfGW0sgd/SYA= +golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -473,8 +473,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= -golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE= -golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 8e8d4092c..0a42204b7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1253,7 +1253,7 @@ func (p *ParticipantImpl) HandleICETrickle(trickleRequest *livekit.TrickleReques // HandleOffer an offer from remote participant, used when clients make the initial connection func (p *ParticipantImpl) HandleOffer(sd *livekit.SessionDescription) error { - offer, offerId := protosignalling.FromProtoSessionDescription(sd) + offer, offerId, _ := protosignalling.FromProtoSessionDescription(sd) lgr := p.pubLogger.WithUnlikelyValues( "transport", livekit.SignalTarget_PUBLISHER, "offer", offer, @@ -1317,13 +1317,10 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription, an "transport", livekit.SignalTarget_PUBLISHER, "answer", answer, "answerId", answerId, + "midToTrackID", midToTrackID, ) - if p.params.UseSinglePeerConnection { - return p.sendMappedSdpAnswer(answer, answerId, midToTrackID) - } - - return p.sendSdpAnswer(answer, answerId) + return p.sendSdpAnswer(answer, answerId, midToTrackID) } func (p *ParticipantImpl) GetAnswer() (webrtc.SessionDescription, uint32, error) { @@ -1349,7 +1346,7 @@ func (p *ParticipantImpl) GetAnswer() (webrtc.SessionDescription, uint32, error) // HandleAnswer handles a client answer response, with subscriber PC, server initiates the // offer and client answers func (p *ParticipantImpl) HandleAnswer(sd *livekit.SessionDescription) { - answer, answerId := protosignalling.FromProtoSessionDescription(sd) + answer, answerId, _ := protosignalling.FromProtoSessionDescription(sd) p.subLogger.Debugw( "received answer", "transport", livekit.SignalTarget_SUBSCRIBER, @@ -1987,8 +1984,8 @@ type SubscriberTransportHandler struct { AnyTransportHandler } -func (h SubscriberTransportHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32) error { - return h.p.onSubscriberOffer(sd, offerId) +func (h SubscriberTransportHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error { + return h.p.onSubscriberOffer(sd, offerId, midToTrackID) } func (h SubscriberTransportHandler) OnStreamStateChange(update *streamallocator.StreamStateUpdate) error { @@ -2261,14 +2258,15 @@ func (p *ParticipantImpl) setIsPublisher(isPublisher bool) { } // when the server has an offer for participant -func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription, offerId uint32) error { +func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error { p.subLogger.Debugw( "sending offer", "transport", livekit.SignalTarget_SUBSCRIBER, "offer", offer, "offerId", offerId, + "midToTrackID", midToTrackID, ) - return p.sendSdpOffer(offer, offerId) + return p.sendSdpOffer(offer, offerId, midToTrackID) } func (p *ParticipantImpl) removePublishedTrack(track types.MediaTrack) { diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 5cd20565d..0e7d50946 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -386,7 +386,7 @@ func TestDisableCodecs(t *testing.T) { sink.WriteMessageCalls(func(msg proto.Message) error { if res, ok := msg.(*livekit.SignalResponse); ok { if res.GetAnswer() != nil { - answer, answerId = signalling.FromProtoSessionDescription(res.GetAnswer()) + answer, answerId, _ = signalling.FromProtoSessionDescription(res.GetAnswer()) answerReceived.Store(true) answerIdReceived.Store(answerId) } @@ -578,7 +578,7 @@ func TestPreferMediaCodecForPublisher(t *testing.T) { sink.WriteMessageCalls(func(msg proto.Message) error { if res, ok := msg.(*livekit.SignalResponse); ok { if res.GetAnswer() != nil { - answer, answerId = signalling.FromProtoSessionDescription(res.GetAnswer()) + answer, answerId, _ = signalling.FromProtoSessionDescription(res.GetAnswer()) pc.SetRemoteDescription(answer) answerReceived.Store(true) answerIdReceived.Store(answerId) @@ -698,7 +698,7 @@ func TestPreferAudioCodecForRed(t *testing.T) { sink.WriteMessageCalls(func(msg proto.Message) error { if res, ok := msg.(*livekit.SignalResponse); ok { if res.GetAnswer() != nil { - answer, answerId = signalling.FromProtoSessionDescription(res.GetAnswer()) + answer, answerId, _ = signalling.FromProtoSessionDescription(res.GetAnswer()) pc.SetRemoteDescription(answer) answerReceived.Store(true) answerIdReceived.Store(answerId) diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index b10ff7d57..ae7d8bbf8 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -295,16 +295,12 @@ func (p *ParticipantImpl) sendLeaveRequest( return nil } -func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32) error { - return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId))) +func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error { + return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId, midToTrackID))) } -func (p *ParticipantImpl) sendMappedSdpAnswer(answer webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error { - return p.signaller.WriteMessage(p.signalling.SignalMappedSdpAnswer(protosignalling.ToProtoMappedSessionDescription(answer, answerId, midToTrackID))) -} - -func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32) error { - return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId))) +func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error { + return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId, midToTrackID))) } func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error { diff --git a/pkg/rtc/signalling/interfaces.go b/pkg/rtc/signalling/interfaces.go index 1ec960981..e49832452 100644 --- a/pkg/rtc/signalling/interfaces.go +++ b/pkg/rtc/signalling/interfaces.go @@ -52,7 +52,6 @@ type ParticipantSignalling interface { SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message - SignalMappedSdpAnswer(mappedAnswer *livekit.MappedSessionDescription) proto.Message SignalSdpOffer(offer *livekit.SessionDescription) proto.Message SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message diff --git a/pkg/rtc/signalling/signalling.go b/pkg/rtc/signalling/signalling.go index c8ff2d887..8b08a6d02 100644 --- a/pkg/rtc/signalling/signalling.go +++ b/pkg/rtc/signalling/signalling.go @@ -173,14 +173,6 @@ func (s *signalling) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Messa } } -func (s *signalling) SignalMappedSdpAnswer(mappedAnswer *livekit.MappedSessionDescription) proto.Message { - return &livekit.SignalResponse{ - Message: &livekit.SignalResponse_MappedAnswer{ - MappedAnswer: mappedAnswer, - }, - } -} - func (s *signalling) SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message { return &livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ diff --git a/pkg/rtc/signalling/signallingunimplemented.go b/pkg/rtc/signalling/signallingunimplemented.go index a1e5f507e..595877b02 100644 --- a/pkg/rtc/signalling/signallingunimplemented.go +++ b/pkg/rtc/signalling/signallingunimplemented.go @@ -88,10 +88,6 @@ func (u *signallingUnimplemented) SignalSdpAnswer(answer *livekit.SessionDescrip return nil } -func (u *signallingUnimplemented) SignalMappedSdpAnswer(answer *livekit.MappedSessionDescription) proto.Message { - return nil -} - func (u *signallingUnimplemented) SignalSdpOffer(offer *livekit.SessionDescription) proto.Message { return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index ecff64b27..f8e5b5a0d 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -2524,7 +2524,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { ) } - if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil { + if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping()); err != nil { prometheus.RecordServiceOperationError("offer", "write_message") return errors.Wrap(err, "could not send offer") } @@ -2640,13 +2640,6 @@ func (t *PCTransport) createAndSendAnswer() error { return errors.Wrap(err, "setting local description failed") } - midToTrackID := map[string]string{} - for _, tr := range t.pc.GetTransceivers() { - if tr.Sender() != nil && tr.Mid() != "" { - midToTrackID[tr.Mid()] = tr.Sender().Track().ID() - } - } - // // Filter after setting local description as pion expects the answer // to match between CreateAnswer and SetLocalDescription. @@ -2668,7 +2661,8 @@ func (t *PCTransport) createAndSendAnswer() error { } answerId := t.remoteOfferId.Load() - if err := t.params.Handler.OnAnswer(answer, answerId, midToTrackID); err != nil { + + if err := t.params.Handler.OnAnswer(answer, answerId, t.getMidToTrackIDMapping()); err != nil { prometheus.RecordServiceOperationError("answer", "write_message") return errors.Wrap(err, "could not send answer") } @@ -2850,7 +2844,7 @@ func (t *PCTransport) doICERestart() error { ) } - err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc()) + err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping()) if err != nil { prometheus.RecordServiceOperationError("offer", "write_message") } else { @@ -2907,6 +2901,17 @@ func (t *PCTransport) outputAndClearICEStats() { } } +func (t *PCTransport) getMidToTrackIDMapping() map[string]string { + transceivers := t.pc.GetTransceivers() + midToTrackID := make(map[string]string, len(transceivers)) + for _, tr := range transceivers { + if tr.Sender() != nil && tr.Sender().Track() != nil && tr.Mid() != "" { + midToTrackID[tr.Mid()] = tr.Sender().Track().ID() + } + } + return midToTrackID +} + // ---------------------- type configureSenderParams struct { diff --git a/pkg/rtc/transport/handler.go b/pkg/rtc/transport/handler.go index f3bf2c518..980f28a77 100644 --- a/pkg/rtc/transport/handler.go +++ b/pkg/rtc/transport/handler.go @@ -42,7 +42,7 @@ type Handler interface { OnDataMessage(kind livekit.DataPacket_Kind, data []byte) OnDataMessageUnlabeled(data []byte) OnDataSendError(err error) - OnOffer(sd webrtc.SessionDescription, offerId uint32) error + OnOffer(sd webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error OnSetRemoteDescriptionOffer() OnAnswer(sd webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error OnNegotiationStateChanged(state NegotiationState) @@ -63,7 +63,7 @@ func (h UnimplementedHandler) OnTrack(track *webrtc.TrackRemote, rtpReceiver *we func (h UnimplementedHandler) OnDataMessage(kind livekit.DataPacket_Kind, data []byte) {} func (h UnimplementedHandler) OnDataMessageUnlabeled(data []byte) {} func (h UnimplementedHandler) OnDataSendError(err error) {} -func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32) error { +func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error { return ErrNoOfferHandler } func (h UnimplementedHandler) OnSetRemoteDescriptionOffer() {} diff --git a/pkg/rtc/transport/transportfakes/fake_handler.go b/pkg/rtc/transport/transportfakes/fake_handler.go index 23c916042..ce332e145 100644 --- a/pkg/rtc/transport/transportfakes/fake_handler.go +++ b/pkg/rtc/transport/transportfakes/fake_handler.go @@ -76,11 +76,12 @@ type FakeHandler struct { onNegotiationStateChangedArgsForCall []struct { arg1 transport.NegotiationState } - OnOfferStub func(webrtc.SessionDescription, uint32) error + OnOfferStub func(webrtc.SessionDescription, uint32, map[string]string) error onOfferMutex sync.RWMutex onOfferArgsForCall []struct { arg1 webrtc.SessionDescription arg2 uint32 + arg3 map[string]string } onOfferReturns struct { result1 error @@ -494,19 +495,20 @@ func (fake *FakeHandler) OnNegotiationStateChangedArgsForCall(i int) transport.N return argsForCall.arg1 } -func (fake *FakeHandler) OnOffer(arg1 webrtc.SessionDescription, arg2 uint32) error { +func (fake *FakeHandler) OnOffer(arg1 webrtc.SessionDescription, arg2 uint32, arg3 map[string]string) error { fake.onOfferMutex.Lock() ret, specificReturn := fake.onOfferReturnsOnCall[len(fake.onOfferArgsForCall)] fake.onOfferArgsForCall = append(fake.onOfferArgsForCall, struct { arg1 webrtc.SessionDescription arg2 uint32 - }{arg1, arg2}) + arg3 map[string]string + }{arg1, arg2, arg3}) stub := fake.OnOfferStub fakeReturns := fake.onOfferReturns - fake.recordInvocation("OnOffer", []interface{}{arg1, arg2}) + fake.recordInvocation("OnOffer", []interface{}{arg1, arg2, arg3}) fake.onOfferMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -520,17 +522,17 @@ func (fake *FakeHandler) OnOfferCallCount() int { return len(fake.onOfferArgsForCall) } -func (fake *FakeHandler) OnOfferCalls(stub func(webrtc.SessionDescription, uint32) error) { +func (fake *FakeHandler) OnOfferCalls(stub func(webrtc.SessionDescription, uint32, map[string]string) error) { fake.onOfferMutex.Lock() defer fake.onOfferMutex.Unlock() fake.OnOfferStub = stub } -func (fake *FakeHandler) OnOfferArgsForCall(i int) (webrtc.SessionDescription, uint32) { +func (fake *FakeHandler) OnOfferArgsForCall(i int) (webrtc.SessionDescription, uint32, map[string]string) { fake.onOfferMutex.RLock() defer fake.onOfferMutex.RUnlock() argsForCall := fake.onOfferArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeHandler) OnOfferReturns(result1 error) { diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index e4a873f2b..d27ec0aff 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -68,7 +68,7 @@ func TestMissingAnswerDuringICERestart(t *testing.T) { // offer again, but missed var offerReceived atomic.Bool - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32, _midToTrackID map[string]string) error { require.Equal(t, webrtc.SignalingStateHaveLocalOffer, transportA.pc.SignalingState()) require.Equal(t, transport.NegotiationStateRemote, negotiationState.Load().(transport.NegotiationState)) offerReceived.Store(true) @@ -115,7 +115,7 @@ func TestNegotiationTiming(t *testing.T) { firstOffer := atomic.Value{} firstOfferId := atomic.Uint32{} secondOffer := atomic.Value{} - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error { if _, ok := firstOffer.Load().(*webrtc.SessionDescription); !ok { firstOffer.Store(&sd) firstOfferId.Store(offerId) @@ -231,7 +231,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) { // first offer missed var firstOfferReceived atomic.Bool - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32, _midToTrackID map[string]string) error { firstOfferReceived.Store(true) return nil }) @@ -249,7 +249,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) { }) var offerCount atomic.Int32 - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error { offerCount.Inc() // the second offer is a ice restart offer, so we wait transportB complete the ice gathering @@ -312,7 +312,7 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) { } return nil }) - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error { transportB.HandleRemoteDescription(sd, offerId) return nil }) @@ -326,7 +326,7 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) { // first one is recover from missed offer // second one is restartICE var offerCount atomic.Int32 - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error { offerCount.Inc() // the second offer is a ice restart offer, so we wait for transportB to complete ICE gathering @@ -382,7 +382,7 @@ func TestNegotiationFailed(t *testing.T) { connectTransports(t, transportA, transportB, handlerA, handlerB, false, 1, 1) // reset OnOffer to force a negotiation failure - handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error { + handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error { return nil }) var failed atomic.Int32 @@ -544,7 +544,7 @@ func connectTransports(t *testing.T, offerer, answerer *PCTransport, offererHand return nil }) - offererHandler.OnOfferCalls(func(offer webrtc.SessionDescription, offerId uint32) error { + offererHandler.OnOfferCalls(func(offer webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error { offerCount.Inc() answerer.HandleRemoteDescription(offer, offerId) return nil diff --git a/test/client/client.go b/test/client/client.go index 6d856264d..55b01d43f 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -395,7 +395,7 @@ func NewRTCClient(conn *websocket.Conn, useSinglePeerConnection bool, opts *Opti ) return c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Answer{ - Answer: signalling.ToProtoSessionDescription(answer, answerId), + Answer: signalling.ToProtoSessionDescription(answer, answerId, nil), }, }) }) @@ -468,22 +468,16 @@ func (c *RTCClient) handleSignalResponse(res *livekit.SignalResponse) error { "answer", msg.Answer.Sdp, ) c.handleAnswer(signalling.FromProtoSessionDescription(msg.Answer)) - case *livekit.SignalResponse_MappedAnswer: - logger.Infow( - "received mapped server answer", - "participant", c.localParticipant.Identity, - "answer", msg.MappedAnswer.SessionDescription.Sdp, - ) - c.handleAnswer(signalling.FromProtoSessionDescription(msg.MappedAnswer.SessionDescription)) case *livekit.SignalResponse_Offer: - desc, offerId := signalling.FromProtoSessionDescription(msg.Offer) + desc, offerId, midToTrackID := signalling.FromProtoSessionDescription(msg.Offer) logger.Infow( "received server offer", "participant", c.localParticipant.Identity, "sdp", desc, "offerId", offerId, + "midToTrackID", midToTrackID, ) - c.handleOffer(desc, offerId) + c.handleOffer(desc, offerId, midToTrackID) case *livekit.SignalResponse_Trickle: candidateInit, err := signalling.FromProtoTrickle(msg.Trickle) if err != nil { @@ -932,14 +926,14 @@ func (c *RTCClient) handleDataMessageUnlabeled(data []byte) { } // handles a server initiated offer, handle on subscriber PC -func (c *RTCClient) handleOffer(desc webrtc.SessionDescription, offerId uint32) { +func (c *RTCClient) handleOffer(desc webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) { logger.Infow("handling server offer", "participant", c.localParticipant.Identity) c.subscriber.HandleRemoteDescription(desc, offerId) c.processPendingRemoteTracks() } // the client handles answer on the publisher PC -func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription, answerId uint32) { +func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription, answerId uint32, _midToTrackID map[string]string) { logger.Infow("handling server answer", "participant", c.localParticipant.Identity) // remote answered the offer, establish connection @@ -977,18 +971,19 @@ func (c *RTCClient) handleMediaSectionsRequirement(mediaSectionsRequirement *liv c.publisher.Negotiate(false) } -func (c *RTCClient) onOffer(offer webrtc.SessionDescription, offerId uint32) error { +func (c *RTCClient) onOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error { if c.localParticipant != nil { logger.Infow("starting negotiation", "participant", c.localParticipant.Identity) logger.Infow( "sending publisher offer", "participant", c.localParticipant.Identity, "offer", offer, + "midToTrackID", midToTrackID, ) } return c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Offer{ - Offer: signalling.ToProtoSessionDescription(offer, offerId), + Offer: signalling.ToProtoSessionDescription(offer, offerId, nil), }, }) }