Include mid -> trackID in both SDP offer and answer. (#4007)

This can be used by dual peer connection clients also.
This commit is contained in:
Raja Subramanian
2025-10-16 09:19:13 +05:30
committed by GitHub
parent 781dfede93
commit e63e8b6f2d
13 changed files with 72 additions and 91 deletions
+4 -6
View File
@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397
github.com/livekit/protocol v1.42.1-0.20251015124854-bfdbffc082ab
github.com/livekit/protocol v1.42.2-0.20251015173118-0236ffc17f5e
github.com/livekit/psrpc v0.7.0
github.com/mackerelio/go-osstat v0.2.6
github.com/magefile/mage v1.15.0
@@ -41,7 +41,7 @@ require (
github.com/pion/sdp/v3 v3.0.16
github.com/pion/transport/v3 v3.0.8
github.com/pion/turn/v4 v4.1.1
github.com/pion/webrtc/v4 v4.1.6-0.20251015073003-f35dc4efd0ac
github.com/pion/webrtc/v4 v4.1.6
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.0
github.com/redis/go-redis/v9 v9.14.0
@@ -55,7 +55,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b
golang.org/x/mod v0.29.0
golang.org/x/sync v0.17.0
google.golang.org/protobuf v1.36.10
@@ -144,11 +144,9 @@ require (
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/tools v0.37.0 // indirect
golang.org/x/tools v0.38.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251014184007-4626949a642f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251014184007-4626949a642f // indirect
google.golang.org/grpc v1.76.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
tool github.com/maxbrunsfeld/counterfeiter/v6
+8 -8
View File
@@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo=
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.42.1-0.20251015124854-bfdbffc082ab h1:kPHpoS+vyi+yOxrBe6h/MW0/BZk4YfF3JozfEOUkz5Y=
github.com/livekit/protocol v1.42.1-0.20251015124854-bfdbffc082ab/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
github.com/livekit/protocol v1.42.2-0.20251015173118-0236ffc17f5e h1:2pgUHq8atL01StoCTrK760eYTKDoDz1nC4a90knEw5g=
github.com/livekit/protocol v1.42.2-0.20251015173118-0236ffc17f5e/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU=
github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0=
@@ -266,8 +266,8 @@ github.com/pion/transport/v3 v3.0.8 h1:oI3myyYnTKUSTthu/NZZ8eu2I5sHbxbUNNFW62ola
github.com/pion/transport/v3 v3.0.8/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ=
github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc=
github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8=
github.com/pion/webrtc/v4 v4.1.6-0.20251015073003-f35dc4efd0ac h1:Xy4aPmPyPpSM7nPK+OUkKx4WQPaa8mkOboalo0J179Q=
github.com/pion/webrtc/v4 v4.1.6-0.20251015073003-f35dc4efd0ac/go.mod h1:wKecGRlkl3ox/As/MYghJL+b/cVXMEhoPMJWPuGQFhU=
github.com/pion/webrtc/v4 v4.1.6 h1:srHH2HwvCGwPba25EYJgUzgLqCQoXl1VCUnrGQMSzUw=
github.com/pion/webrtc/v4 v4.1.6/go.mod h1:wKecGRlkl3ox/As/MYghJL+b/cVXMEhoPMJWPuGQFhU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -369,8 +369,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b h1:18qgiDvlvH7kk8Ioa8Ov+K6xCi0GMvmGfGW0sgd/SYA=
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@@ -473,8 +473,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE=
golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+9 -11
View File
@@ -1253,7 +1253,7 @@ func (p *ParticipantImpl) HandleICETrickle(trickleRequest *livekit.TrickleReques
// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(sd *livekit.SessionDescription) error {
offer, offerId := protosignalling.FromProtoSessionDescription(sd)
offer, offerId, _ := protosignalling.FromProtoSessionDescription(sd)
lgr := p.pubLogger.WithUnlikelyValues(
"transport", livekit.SignalTarget_PUBLISHER,
"offer", offer,
@@ -1317,13 +1317,10 @@ func (p *ParticipantImpl) onPublisherAnswer(answer webrtc.SessionDescription, an
"transport", livekit.SignalTarget_PUBLISHER,
"answer", answer,
"answerId", answerId,
"midToTrackID", midToTrackID,
)
if p.params.UseSinglePeerConnection {
return p.sendMappedSdpAnswer(answer, answerId, midToTrackID)
}
return p.sendSdpAnswer(answer, answerId)
return p.sendSdpAnswer(answer, answerId, midToTrackID)
}
func (p *ParticipantImpl) GetAnswer() (webrtc.SessionDescription, uint32, error) {
@@ -1349,7 +1346,7 @@ func (p *ParticipantImpl) GetAnswer() (webrtc.SessionDescription, uint32, error)
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (p *ParticipantImpl) HandleAnswer(sd *livekit.SessionDescription) {
answer, answerId := protosignalling.FromProtoSessionDescription(sd)
answer, answerId, _ := protosignalling.FromProtoSessionDescription(sd)
p.subLogger.Debugw(
"received answer",
"transport", livekit.SignalTarget_SUBSCRIBER,
@@ -1987,8 +1984,8 @@ type SubscriberTransportHandler struct {
AnyTransportHandler
}
func (h SubscriberTransportHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32) error {
return h.p.onSubscriberOffer(sd, offerId)
func (h SubscriberTransportHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error {
return h.p.onSubscriberOffer(sd, offerId, midToTrackID)
}
func (h SubscriberTransportHandler) OnStreamStateChange(update *streamallocator.StreamStateUpdate) error {
@@ -2261,14 +2258,15 @@ func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
}
// when the server has an offer for participant
func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription, offerId uint32) error {
func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error {
p.subLogger.Debugw(
"sending offer",
"transport", livekit.SignalTarget_SUBSCRIBER,
"offer", offer,
"offerId", offerId,
"midToTrackID", midToTrackID,
)
return p.sendSdpOffer(offer, offerId)
return p.sendSdpOffer(offer, offerId, midToTrackID)
}
func (p *ParticipantImpl) removePublishedTrack(track types.MediaTrack) {
+3 -3
View File
@@ -386,7 +386,7 @@ func TestDisableCodecs(t *testing.T) {
sink.WriteMessageCalls(func(msg proto.Message) error {
if res, ok := msg.(*livekit.SignalResponse); ok {
if res.GetAnswer() != nil {
answer, answerId = signalling.FromProtoSessionDescription(res.GetAnswer())
answer, answerId, _ = signalling.FromProtoSessionDescription(res.GetAnswer())
answerReceived.Store(true)
answerIdReceived.Store(answerId)
}
@@ -578,7 +578,7 @@ func TestPreferMediaCodecForPublisher(t *testing.T) {
sink.WriteMessageCalls(func(msg proto.Message) error {
if res, ok := msg.(*livekit.SignalResponse); ok {
if res.GetAnswer() != nil {
answer, answerId = signalling.FromProtoSessionDescription(res.GetAnswer())
answer, answerId, _ = signalling.FromProtoSessionDescription(res.GetAnswer())
pc.SetRemoteDescription(answer)
answerReceived.Store(true)
answerIdReceived.Store(answerId)
@@ -698,7 +698,7 @@ func TestPreferAudioCodecForRed(t *testing.T) {
sink.WriteMessageCalls(func(msg proto.Message) error {
if res, ok := msg.(*livekit.SignalResponse); ok {
if res.GetAnswer() != nil {
answer, answerId = signalling.FromProtoSessionDescription(res.GetAnswer())
answer, answerId, _ = signalling.FromProtoSessionDescription(res.GetAnswer())
pc.SetRemoteDescription(answer)
answerReceived.Store(true)
answerIdReceived.Store(answerId)
+4 -8
View File
@@ -295,16 +295,12 @@ func (p *ParticipantImpl) sendLeaveRequest(
return nil
}
func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32) error {
return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId)))
func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error {
return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId, midToTrackID)))
}
func (p *ParticipantImpl) sendMappedSdpAnswer(answer webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error {
return p.signaller.WriteMessage(p.signalling.SignalMappedSdpAnswer(protosignalling.ToProtoMappedSessionDescription(answer, answerId, midToTrackID)))
}
func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32) error {
return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId)))
func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error {
return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId, midToTrackID)))
}
func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error {
-1
View File
@@ -52,7 +52,6 @@ type ParticipantSignalling interface {
SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message
SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message
SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message
SignalMappedSdpAnswer(mappedAnswer *livekit.MappedSessionDescription) proto.Message
SignalSdpOffer(offer *livekit.SessionDescription) proto.Message
SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message
SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message
-8
View File
@@ -173,14 +173,6 @@ func (s *signalling) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Messa
}
}
func (s *signalling) SignalMappedSdpAnswer(mappedAnswer *livekit.MappedSessionDescription) proto.Message {
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_MappedAnswer{
MappedAnswer: mappedAnswer,
},
}
}
func (s *signalling) SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message {
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
@@ -88,10 +88,6 @@ func (u *signallingUnimplemented) SignalSdpAnswer(answer *livekit.SessionDescrip
return nil
}
func (u *signallingUnimplemented) SignalMappedSdpAnswer(answer *livekit.MappedSessionDescription) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSdpOffer(offer *livekit.SessionDescription) proto.Message {
return nil
}
+15 -10
View File
@@ -2524,7 +2524,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
)
}
if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil {
if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping()); err != nil {
prometheus.RecordServiceOperationError("offer", "write_message")
return errors.Wrap(err, "could not send offer")
}
@@ -2640,13 +2640,6 @@ func (t *PCTransport) createAndSendAnswer() error {
return errors.Wrap(err, "setting local description failed")
}
midToTrackID := map[string]string{}
for _, tr := range t.pc.GetTransceivers() {
if tr.Sender() != nil && tr.Mid() != "" {
midToTrackID[tr.Mid()] = tr.Sender().Track().ID()
}
}
//
// Filter after setting local description as pion expects the answer
// to match between CreateAnswer and SetLocalDescription.
@@ -2668,7 +2661,8 @@ func (t *PCTransport) createAndSendAnswer() error {
}
answerId := t.remoteOfferId.Load()
if err := t.params.Handler.OnAnswer(answer, answerId, midToTrackID); err != nil {
if err := t.params.Handler.OnAnswer(answer, answerId, t.getMidToTrackIDMapping()); err != nil {
prometheus.RecordServiceOperationError("answer", "write_message")
return errors.Wrap(err, "could not send answer")
}
@@ -2850,7 +2844,7 @@ func (t *PCTransport) doICERestart() error {
)
}
err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc())
err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping())
if err != nil {
prometheus.RecordServiceOperationError("offer", "write_message")
} else {
@@ -2907,6 +2901,17 @@ func (t *PCTransport) outputAndClearICEStats() {
}
}
func (t *PCTransport) getMidToTrackIDMapping() map[string]string {
transceivers := t.pc.GetTransceivers()
midToTrackID := make(map[string]string, len(transceivers))
for _, tr := range transceivers {
if tr.Sender() != nil && tr.Sender().Track() != nil && tr.Mid() != "" {
midToTrackID[tr.Mid()] = tr.Sender().Track().ID()
}
}
return midToTrackID
}
// ----------------------
type configureSenderParams struct {
+2 -2
View File
@@ -42,7 +42,7 @@ type Handler interface {
OnDataMessage(kind livekit.DataPacket_Kind, data []byte)
OnDataMessageUnlabeled(data []byte)
OnDataSendError(err error)
OnOffer(sd webrtc.SessionDescription, offerId uint32) error
OnOffer(sd webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error
OnSetRemoteDescriptionOffer()
OnAnswer(sd webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error
OnNegotiationStateChanged(state NegotiationState)
@@ -63,7 +63,7 @@ func (h UnimplementedHandler) OnTrack(track *webrtc.TrackRemote, rtpReceiver *we
func (h UnimplementedHandler) OnDataMessage(kind livekit.DataPacket_Kind, data []byte) {}
func (h UnimplementedHandler) OnDataMessageUnlabeled(data []byte) {}
func (h UnimplementedHandler) OnDataSendError(err error) {}
func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32) error {
func (h UnimplementedHandler) OnOffer(sd webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error {
return ErrNoOfferHandler
}
func (h UnimplementedHandler) OnSetRemoteDescriptionOffer() {}
@@ -76,11 +76,12 @@ type FakeHandler struct {
onNegotiationStateChangedArgsForCall []struct {
arg1 transport.NegotiationState
}
OnOfferStub func(webrtc.SessionDescription, uint32) error
OnOfferStub func(webrtc.SessionDescription, uint32, map[string]string) error
onOfferMutex sync.RWMutex
onOfferArgsForCall []struct {
arg1 webrtc.SessionDescription
arg2 uint32
arg3 map[string]string
}
onOfferReturns struct {
result1 error
@@ -494,19 +495,20 @@ func (fake *FakeHandler) OnNegotiationStateChangedArgsForCall(i int) transport.N
return argsForCall.arg1
}
func (fake *FakeHandler) OnOffer(arg1 webrtc.SessionDescription, arg2 uint32) error {
func (fake *FakeHandler) OnOffer(arg1 webrtc.SessionDescription, arg2 uint32, arg3 map[string]string) error {
fake.onOfferMutex.Lock()
ret, specificReturn := fake.onOfferReturnsOnCall[len(fake.onOfferArgsForCall)]
fake.onOfferArgsForCall = append(fake.onOfferArgsForCall, struct {
arg1 webrtc.SessionDescription
arg2 uint32
}{arg1, arg2})
arg3 map[string]string
}{arg1, arg2, arg3})
stub := fake.OnOfferStub
fakeReturns := fake.onOfferReturns
fake.recordInvocation("OnOffer", []interface{}{arg1, arg2})
fake.recordInvocation("OnOffer", []interface{}{arg1, arg2, arg3})
fake.onOfferMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -520,17 +522,17 @@ func (fake *FakeHandler) OnOfferCallCount() int {
return len(fake.onOfferArgsForCall)
}
func (fake *FakeHandler) OnOfferCalls(stub func(webrtc.SessionDescription, uint32) error) {
func (fake *FakeHandler) OnOfferCalls(stub func(webrtc.SessionDescription, uint32, map[string]string) error) {
fake.onOfferMutex.Lock()
defer fake.onOfferMutex.Unlock()
fake.OnOfferStub = stub
}
func (fake *FakeHandler) OnOfferArgsForCall(i int) (webrtc.SessionDescription, uint32) {
func (fake *FakeHandler) OnOfferArgsForCall(i int) (webrtc.SessionDescription, uint32, map[string]string) {
fake.onOfferMutex.RLock()
defer fake.onOfferMutex.RUnlock()
argsForCall := fake.onOfferArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeHandler) OnOfferReturns(result1 error) {
+8 -8
View File
@@ -68,7 +68,7 @@ func TestMissingAnswerDuringICERestart(t *testing.T) {
// offer again, but missed
var offerReceived atomic.Bool
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32, _midToTrackID map[string]string) error {
require.Equal(t, webrtc.SignalingStateHaveLocalOffer, transportA.pc.SignalingState())
require.Equal(t, transport.NegotiationStateRemote, negotiationState.Load().(transport.NegotiationState))
offerReceived.Store(true)
@@ -115,7 +115,7 @@ func TestNegotiationTiming(t *testing.T) {
firstOffer := atomic.Value{}
firstOfferId := atomic.Uint32{}
secondOffer := atomic.Value{}
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error {
if _, ok := firstOffer.Load().(*webrtc.SessionDescription); !ok {
firstOffer.Store(&sd)
firstOfferId.Store(offerId)
@@ -231,7 +231,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) {
// first offer missed
var firstOfferReceived atomic.Bool
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, _offerId uint32, _midToTrackID map[string]string) error {
firstOfferReceived.Store(true)
return nil
})
@@ -249,7 +249,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) {
})
var offerCount atomic.Int32
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error {
offerCount.Inc()
// the second offer is a ice restart offer, so we wait transportB complete the ice gathering
@@ -312,7 +312,7 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) {
}
return nil
})
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error {
transportB.HandleRemoteDescription(sd, offerId)
return nil
})
@@ -326,7 +326,7 @@ func TestFirstAnswerMissedDuringICERestart(t *testing.T) {
// first one is recover from missed offer
// second one is restartICE
var offerCount atomic.Int32
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error {
offerCount.Inc()
// the second offer is a ice restart offer, so we wait for transportB to complete ICE gathering
@@ -382,7 +382,7 @@ func TestNegotiationFailed(t *testing.T) {
connectTransports(t, transportA, transportB, handlerA, handlerB, false, 1, 1)
// reset OnOffer to force a negotiation failure
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32) error {
handlerA.OnOfferCalls(func(sd webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error {
return nil
})
var failed atomic.Int32
@@ -544,7 +544,7 @@ func connectTransports(t *testing.T, offerer, answerer *PCTransport, offererHand
return nil
})
offererHandler.OnOfferCalls(func(offer webrtc.SessionDescription, offerId uint32) error {
offererHandler.OnOfferCalls(func(offer webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) error {
offerCount.Inc()
answerer.HandleRemoteDescription(offer, offerId)
return nil
+9 -14
View File
@@ -395,7 +395,7 @@ func NewRTCClient(conn *websocket.Conn, useSinglePeerConnection bool, opts *Opti
)
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Answer{
Answer: signalling.ToProtoSessionDescription(answer, answerId),
Answer: signalling.ToProtoSessionDescription(answer, answerId, nil),
},
})
})
@@ -468,22 +468,16 @@ func (c *RTCClient) handleSignalResponse(res *livekit.SignalResponse) error {
"answer", msg.Answer.Sdp,
)
c.handleAnswer(signalling.FromProtoSessionDescription(msg.Answer))
case *livekit.SignalResponse_MappedAnswer:
logger.Infow(
"received mapped server answer",
"participant", c.localParticipant.Identity,
"answer", msg.MappedAnswer.SessionDescription.Sdp,
)
c.handleAnswer(signalling.FromProtoSessionDescription(msg.MappedAnswer.SessionDescription))
case *livekit.SignalResponse_Offer:
desc, offerId := signalling.FromProtoSessionDescription(msg.Offer)
desc, offerId, midToTrackID := signalling.FromProtoSessionDescription(msg.Offer)
logger.Infow(
"received server offer",
"participant", c.localParticipant.Identity,
"sdp", desc,
"offerId", offerId,
"midToTrackID", midToTrackID,
)
c.handleOffer(desc, offerId)
c.handleOffer(desc, offerId, midToTrackID)
case *livekit.SignalResponse_Trickle:
candidateInit, err := signalling.FromProtoTrickle(msg.Trickle)
if err != nil {
@@ -932,14 +926,14 @@ func (c *RTCClient) handleDataMessageUnlabeled(data []byte) {
}
// handles a server initiated offer, handle on subscriber PC
func (c *RTCClient) handleOffer(desc webrtc.SessionDescription, offerId uint32) {
func (c *RTCClient) handleOffer(desc webrtc.SessionDescription, offerId uint32, _midToTrackID map[string]string) {
logger.Infow("handling server offer", "participant", c.localParticipant.Identity)
c.subscriber.HandleRemoteDescription(desc, offerId)
c.processPendingRemoteTracks()
}
// the client handles answer on the publisher PC
func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription, answerId uint32) {
func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription, answerId uint32, _midToTrackID map[string]string) {
logger.Infow("handling server answer", "participant", c.localParticipant.Identity)
// remote answered the offer, establish connection
@@ -977,18 +971,19 @@ func (c *RTCClient) handleMediaSectionsRequirement(mediaSectionsRequirement *liv
c.publisher.Negotiate(false)
}
func (c *RTCClient) onOffer(offer webrtc.SessionDescription, offerId uint32) error {
func (c *RTCClient) onOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error {
if c.localParticipant != nil {
logger.Infow("starting negotiation", "participant", c.localParticipant.Identity)
logger.Infow(
"sending publisher offer",
"participant", c.localParticipant.Identity,
"offer", offer,
"midToTrackID", midToTrackID,
)
}
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Offer{
Offer: signalling.ToProtoSessionDescription(offer, offerId),
Offer: signalling.ToProtoSessionDescription(offer, offerId, nil),
},
})
}