diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index 21a87f07d..9c6608bcf 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -414,6 +414,7 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string) if _, err = c.publisher.PeerConnection().AddTrack(track); err != nil { return } + c.publisher.Negotiate() writer = NewTrackWriter(c.ctx, track, path) // write tracks only after ICE connectivity diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index ec1f2cef5..4ed044cfd 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -24,10 +24,12 @@ type MessageSource interface { } type ParticipantInit struct { - Identity string - Metadata string - Reconnect bool - Permission *livekit.ParticipantPermission + Identity string + Metadata string + Reconnect bool + Permission *livekit.ParticipantPermission + ProtocolVersion int32 + UsePlanB bool } type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) @@ -45,16 +47,18 @@ type Router interface { GetNode(nodeId string) (*livekit.Node, error) ListNodes() ([]*livekit.Node, error) - // participant signal connection is ready to start + // StartParticipantSignal participant signal connection is ready to start StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) - // sends a message to RTC node + // CreateRTCSink sends a message to RTC node CreateRTCSink(roomName, identity string) (MessageSink, error) - // when a new participant's RTC connection is ready to start + // OnNewParticipantRTC is called to start a new participant's RTC connection OnNewParticipantRTC(callback NewParticipantCallback) - // messages to be delivered to RTC node + + // OnRTCMessage is called to execute actions on the RTC node OnRTCMessage(callback RTCMessageCallback) + Start() error Stop() } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 18d7a551c..4b6b78fd8 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -153,9 +153,11 @@ func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit Identity: pi.Identity, Metadata: pi.Metadata, // connection id is to allow the RTC node to identify where to route the message back to - ConnectionId: connectionId, - Reconnect: pi.Reconnect, - Permission: pi.Permission, + ConnectionId: connectionId, + Reconnect: pi.Reconnect, + Permission: pi.Permission, + ProtocolVersion: pi.ProtocolVersion, + UsePlanB: pi.UsePlanB, }) if err != nil { return @@ -217,10 +219,12 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK } pi := ParticipantInit{ - Identity: ss.Identity, - Metadata: ss.Metadata, - Reconnect: ss.Reconnect, - Permission: ss.Permission, + Identity: ss.Identity, + Metadata: ss.Metadata, + Reconnect: ss.Reconnect, + Permission: ss.Permission, + ProtocolVersion: ss.ProtocolVersion, + UsePlanB: ss.UsePlanB, } reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index d7bb42a84..5031c3889 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -131,13 +131,20 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } // using DownTrack from ion-sfu + streamId := t.participantId + if sub.ProtocolVersion().SupportsPackedStreamId() { + // when possible, pack both IDs in streamID to allow new streams to be generated + // react-native-webrtc still uses stream based APIs and require this + streamId = PackStreamID(t.participantId, t.ID()) + } + receiver := NewWrappedReceiver(t.receiver, t.ID(), streamId) downTrack, err := sfu.NewDownTrack(webrtc.RTPCodecCapability{ MimeType: codec.MimeType, ClockRate: codec.ClockRate, Channels: codec.Channels, SDPFmtpLine: codec.SDPFmtpLine, RTCPFeedback: feedbackTypes, - }, t.receiver, t.bufferFactory, sub.ID(), t.receiverConf.packetBufferSize) + }, receiver, t.bufferFactory, sub.ID(), t.receiverConf.packetBufferSize) if err != nil { return err } @@ -189,12 +196,14 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } sub.RemoveSubscribedTrack(t.participantId, subTrack) + sub.Negotiate() }) t.subscribedTracks[sub.ID()] = subTrack - sub.AddSubscribedTrack(t.participantId, subTrack) t.receiver.AddDownTrack(downTrack, true) + sub.AddSubscribedTrack(t.participantId, subTrack) + sub.Negotiate() return nil } @@ -242,7 +251,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra }) if t.receiver == nil { - t.receiver = NewWrappedReceiver(sfu.NewWebRTCReceiver(receiver, track, t.participantId), t.ID(), t.participantId) + t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.participantId) t.receiver.SetRTCPCh(t.rtcpCh) t.receiver.OnCloseHandler(func() { t.lock.Lock() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index dcc59e8c0..0a2bab78f 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -27,17 +27,18 @@ const ( ) type ParticipantImpl struct { - id string - publisher *PCTransport - subscriber *PCTransport - responseSink routing.MessageSink - audioConfig config.AudioConfig - isClosed utils.AtomicFlag - conf *WebRTCConfig - identity string - permission *livekit.ParticipantPermission - state atomic.Value // livekit.ParticipantInfo_State - rtcpCh chan []rtcp.Packet + id string + publisher *PCTransport + subscriber *PCTransport + responseSink routing.MessageSink + audioConfig config.AudioConfig + isClosed utils.AtomicFlag + conf *WebRTCConfig + identity string + permission *livekit.ParticipantPermission + state atomic.Value // livekit.ParticipantInfo_State + rtcpCh chan []rtcp.Packet + protocolVersion types.ProtocolVersion // when first connected connectedAt atomic.Value // time.Time @@ -66,7 +67,7 @@ type ParticipantImpl struct { onClose func(types.Participant) } -func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, ac config.AudioConfig) (*ParticipantImpl, error) { +func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, ac config.AudioConfig, pv types.ProtocolVersion) (*ParticipantImpl, error) { // TODO: check to ensure params are valid, id and identity can't be empty p := &ParticipantImpl{ @@ -75,6 +76,7 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, responseSink: rs, audioConfig: ac, conf: conf, + protocolVersion: pv, rtcpCh: make(chan []rtcp.Packet, 50), subscribedTracks: make(map[string][]types.SubscribedTrack), lock: sync.RWMutex{}, @@ -129,6 +131,10 @@ func (p *ParticipantImpl) State() livekit.ParticipantInfo_State { return p.state.Load().(livekit.ParticipantInfo_State) } +func (p *ParticipantImpl) ProtocolVersion() types.ProtocolVersion { + return p.protocolVersion +} + func (p *ParticipantImpl) IsReady() bool { state := p.State() return state == livekit.ParticipantInfo_JOINED || state == livekit.ParticipantInfo_ACTIVE @@ -290,6 +296,7 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { "participant", p.Identity(), //"sdp", sdp.SDP, ) + if err := p.subscriber.SetRemoteDescription(sdp); err != nil { return errors.Wrap(err, "could not set remote description") } @@ -349,7 +356,11 @@ func (p *ParticipantImpl) Close() error { return nil } -// Subscribes op to all publishedTracks +func (p *ParticipantImpl) Negotiate() { + p.subscriber.Negotiate() +} + +// AddSubscriber subscribes op to all publishedTracks func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) { p.lock.RLock() tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks)) @@ -566,12 +577,13 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { } // when the server has an offer for participant -func (p *ParticipantImpl) onOffer(sd webrtc.SessionDescription) { +func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) { if p.State() == livekit.ParticipantInfo_DISCONNECTED { logger.Debugw("skipping server offer", "participant", p.Identity()) // skip when disconnected return } + logger.Debugw("sending server offer to participant", "participant", p.Identity(), //"sdp", offer.SDP, @@ -579,7 +591,7 @@ func (p *ParticipantImpl) onOffer(sd webrtc.SessionDescription) { _ = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ - Offer: ToProtoSessionDescription(sd), + Offer: ToProtoSessionDescription(offer), }, }) } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 249535b49..592fcd17e 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -152,6 +152,7 @@ func newParticipantForTest(identity string) *ParticipantImpl { identity, rtcConf, &routingfakes.FakeMessageSink{}, - config.AudioConfig{}) + config.AudioConfig{}, + 0) return p } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index e953356fa..9f125b23a 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -68,7 +68,7 @@ func NewPCTransport(target livekit.SignalTarget, conf *WebRTCConfig) (*PCTranspo debouncedNegotiate: debounce.New(negotiationFrequency), } t.negotiationState.Store(negotiationStateNone) - t.pc.OnNegotiationNeeded(t.negotiate) + //t.pc.OnNegotiationNeeded(t.Negotiate) return t, nil } @@ -111,7 +111,7 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error { t.negotiationState.Store(negotiationStateNone) if state == negotiationRetry { // need to Negotiate again - t.negotiate() + t.Negotiate() } return nil @@ -122,7 +122,7 @@ func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) { t.onOffer = f } -func (t *PCTransport) negotiate() { +func (t *PCTransport) Negotiate() { t.debouncedNegotiate(t.handleNegotiate) } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index ab901bb5f..32ec31763 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -25,6 +25,7 @@ type Participant interface { ID() string Identity() string State() livekit.ParticipantInfo_State + ProtocolVersion() ProtocolVersion IsReady() bool ConnectedAt() time.Time ToProto() *livekit.ParticipantInfo @@ -34,6 +35,7 @@ type Participant interface { GetResponseSink() routing.MessageSink SetResponseSink(sink routing.MessageSink) SubscriberMediaEngine() *webrtc.MediaEngine + Negotiate() AddTrack(clientId, name string, trackType livekit.TrackType) GetPublishedTracks() []PublishedTrack diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go new file mode 100644 index 000000000..5798c480a --- /dev/null +++ b/pkg/rtc/types/protocol_version.go @@ -0,0 +1,7 @@ +package types + +type ProtocolVersion int + +func (v ProtocolVersion) SupportsPackedStreamId() bool { + return v > 0 +} diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 6f59fee46..80b245571 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -187,6 +187,10 @@ type FakeParticipant struct { isReadyReturnsOnCall map[int]struct { result1 bool } + NegotiateStub func() + negotiateMutex sync.RWMutex + negotiateArgsForCall []struct { + } OnCloseStub func(func(types.Participant)) onCloseMutex sync.RWMutex onCloseArgsForCall []struct { @@ -212,6 +216,16 @@ type FakeParticipant struct { onTrackUpdatedArgsForCall []struct { arg1 func(types.Participant, types.PublishedTrack) } + ProtocolVersionStub func() types.ProtocolVersion + protocolVersionMutex sync.RWMutex + protocolVersionArgsForCall []struct { + } + protocolVersionReturns struct { + result1 types.ProtocolVersion + } + protocolVersionReturnsOnCall map[int]struct { + result1 types.ProtocolVersion + } RTCPChanStub func() chan []rtcp.Packet rTCPChanMutex sync.RWMutex rTCPChanArgsForCall []struct { @@ -1241,6 +1255,30 @@ func (fake *FakeParticipant) IsReadyReturnsOnCall(i int, result1 bool) { }{result1} } +func (fake *FakeParticipant) Negotiate() { + fake.negotiateMutex.Lock() + fake.negotiateArgsForCall = append(fake.negotiateArgsForCall, struct { + }{}) + stub := fake.NegotiateStub + fake.recordInvocation("Negotiate", []interface{}{}) + fake.negotiateMutex.Unlock() + if stub != nil { + fake.NegotiateStub() + } +} + +func (fake *FakeParticipant) NegotiateCallCount() int { + fake.negotiateMutex.RLock() + defer fake.negotiateMutex.RUnlock() + return len(fake.negotiateArgsForCall) +} + +func (fake *FakeParticipant) NegotiateCalls(stub func()) { + fake.negotiateMutex.Lock() + defer fake.negotiateMutex.Unlock() + fake.NegotiateStub = stub +} + func (fake *FakeParticipant) OnClose(arg1 func(types.Participant)) { fake.onCloseMutex.Lock() fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { @@ -1401,6 +1439,59 @@ func (fake *FakeParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Partici return argsForCall.arg1 } +func (fake *FakeParticipant) ProtocolVersion() types.ProtocolVersion { + fake.protocolVersionMutex.Lock() + ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] + fake.protocolVersionArgsForCall = append(fake.protocolVersionArgsForCall, struct { + }{}) + stub := fake.ProtocolVersionStub + fakeReturns := fake.protocolVersionReturns + fake.recordInvocation("ProtocolVersion", []interface{}{}) + fake.protocolVersionMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) ProtocolVersionCallCount() int { + fake.protocolVersionMutex.RLock() + defer fake.protocolVersionMutex.RUnlock() + return len(fake.protocolVersionArgsForCall) +} + +func (fake *FakeParticipant) ProtocolVersionCalls(stub func() types.ProtocolVersion) { + fake.protocolVersionMutex.Lock() + defer fake.protocolVersionMutex.Unlock() + fake.ProtocolVersionStub = stub +} + +func (fake *FakeParticipant) ProtocolVersionReturns(result1 types.ProtocolVersion) { + fake.protocolVersionMutex.Lock() + defer fake.protocolVersionMutex.Unlock() + fake.ProtocolVersionStub = nil + fake.protocolVersionReturns = struct { + result1 types.ProtocolVersion + }{result1} +} + +func (fake *FakeParticipant) ProtocolVersionReturnsOnCall(i int, result1 types.ProtocolVersion) { + fake.protocolVersionMutex.Lock() + defer fake.protocolVersionMutex.Unlock() + fake.ProtocolVersionStub = nil + if fake.protocolVersionReturnsOnCall == nil { + fake.protocolVersionReturnsOnCall = make(map[int]struct { + result1 types.ProtocolVersion + }) + } + fake.protocolVersionReturnsOnCall[i] = struct { + result1 types.ProtocolVersion + }{result1} +} + func (fake *FakeParticipant) RTCPChan() chan []rtcp.Packet { fake.rTCPChanMutex.Lock() ret, specificReturn := fake.rTCPChanReturnsOnCall[len(fake.rTCPChanArgsForCall)] @@ -2126,6 +2217,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.identityMutex.RUnlock() fake.isReadyMutex.RLock() defer fake.isReadyMutex.RUnlock() + fake.negotiateMutex.RLock() + defer fake.negotiateMutex.RUnlock() fake.onCloseMutex.RLock() defer fake.onCloseMutex.RUnlock() fake.onMetadataUpdateMutex.RLock() @@ -2136,6 +2229,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.onTrackPublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock() defer fake.onTrackUpdatedMutex.RUnlock() + fake.protocolVersionMutex.RLock() + defer fake.protocolVersionMutex.RUnlock() fake.rTCPChanMutex.RLock() defer fake.rTCPChanMutex.RUnlock() fake.removeSubscribedTrackMutex.RLock() diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index 4ac83b31c..798ba9ce8 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -17,15 +17,15 @@ const ( trackIdSeparator = "|" ) -func UnpackTrackId(packed string) (peerId string, trackId string) { +func UnpackStreamID(packed string) (peerId string, trackId string) { parts := strings.Split(packed, trackIdSeparator) if len(parts) > 1 { return parts[0], packed[len(parts[0])+1:] } - return "", packed + return packed, "" } -func PackTrackId(participantId, trackId string) string { +func PackStreamID(participantId, trackId string) string { return participantId + trackIdSeparator + trackId } diff --git a/pkg/rtc/utils_test.go b/pkg/rtc/utils_test.go index 6b122bd54..bbe8ade3b 100644 --- a/pkg/rtc/utils_test.go +++ b/pkg/rtc/utils_test.go @@ -6,13 +6,13 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPackTrackId(t *testing.T) { +func TestPackStreamId(t *testing.T) { packed := "PA_123abc|uuid-id" - pId, trackId := UnpackTrackId(packed) + pId, trackId := UnpackStreamID(packed) assert.Equal(t, "PA_123abc", pId) assert.Equal(t, "uuid-id", trackId) - assert.Equal(t, packed, PackTrackId(pId, trackId)) + assert.Equal(t, packed, PackStreamID(pId, trackId)) } func TestPackDataTrackLabel(t *testing.T) { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index fb327efc5..d2f18a3bc 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" livekit "github.com/livekit/livekit-server/proto" "github.com/livekit/protocol/utils" + "github.com/pion/webrtc/v3" ) const ( @@ -206,10 +207,16 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, "room", roomName, "node", r.currentNode.Id, "participant", pi.Identity, - "num_participants", len(room.GetParticipants()), + "plan_b", pi.UsePlanB, + "protocol", pi.ProtocolVersion, ) - participant, err = rtc.NewParticipant(pi.Identity, r.rtcConfig, responseSink, r.config.Audio) + pv := types.ProtocolVersion(pi.ProtocolVersion) + rtcConf := *r.rtcConfig + if pi.UsePlanB { + rtcConf.Configuration.SDPSemantics = webrtc.SDPSemanticsPlanB + } + participant, err = rtc.NewParticipant(pi.Identity, &rtcConf, responseSink, r.config.Audio, pv) if err != nil { logger.Errorw("could not create participant", "error", err) return @@ -279,6 +286,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici "participant", participant.Identity(), "room", room.Name, ) + participant.Close() }() defer rtc.Recover() @@ -316,18 +324,12 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici sd := rtc.FromProtoSessionDescription(msg.Answer) if err := participant.HandleAnswer(sd); err != nil { logger.Errorw("could not handle answer", "participant", participant.Identity(), "err", err) - //conn.WriteJSON( - // jsonError(http.StatusInternalServerError, "could not handle negotiate", err.Error())) - return } case *livekit.SignalRequest_Trickle: candidateInit := rtc.FromProtoTrickle(msg.Trickle) //logger.Debugw("adding peer candidate", "participant", participant.ID()) if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil { logger.Errorw("could not handle trickle", "participant", participant.Identity(), "err", err) - //conn.WriteJSON( - // jsonError(http.StatusInternalServerError, "could not handle trickle", err.Error())) - return } case *livekit.SignalRequest_Mute: participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index d3c4cbb2e..a69e152ed 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "github.com/gorilla/websocket" @@ -55,6 +56,8 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { roomName := r.FormValue("room") reconnectParam := r.FormValue("reconnect") + protocolParam := r.FormValue("protocol") + planBParam := r.FormValue("planb") claims := GetGrants(r.Context()) // require a claim @@ -65,6 +68,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { pi := routing.ParticipantInit{ Reconnect: boolValue(reconnectParam), Identity: claims.Identity, + UsePlanB: boolValue(planBParam), } // only use permissions if any of them are set, default permissive if claims.Video.CanPublish || claims.Video.CanSubscribe { @@ -73,6 +77,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { CanPublish: claims.Video.CanPublish, } } + if pv, err := strconv.Atoi(protocolParam); err == nil { + pi.ProtocolVersion = int32(pv) + } onlyName, err := EnsureJoinPermission(r.Context()) if err != nil { diff --git a/proto/livekit_internal.pb.go b/proto/livekit_internal.pb.go index 5c820191b..ff946aadc 100644 --- a/proto/livekit_internal.pb.go +++ b/proto/livekit_internal.pb.go @@ -412,8 +412,10 @@ type StartSession struct { // if a client is reconnecting (i.e. resume instead of restart) Reconnect bool `protobuf:"varint,4,opt,name=reconnect,proto3" json:"reconnect,omitempty"` // metadata to pass to participant - Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` - Permission *ParticipantPermission `protobuf:"bytes,6,opt,name=permission,proto3" json:"permission,omitempty"` + Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` + Permission *ParticipantPermission `protobuf:"bytes,6,opt,name=permission,proto3" json:"permission,omitempty"` + ProtocolVersion int32 `protobuf:"varint,7,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"` + UsePlanB bool `protobuf:"varint,8,opt,name=use_plan_b,json=usePlanB,proto3" json:"use_plan_b,omitempty"` } func (x *StartSession) Reset() { @@ -490,6 +492,20 @@ func (x *StartSession) GetPermission() *ParticipantPermission { return nil } +func (x *StartSession) GetProtocolVersion() int32 { + if x != nil { + return x.ProtocolVersion + } + return 0 +} + +func (x *StartSession) GetUsePlanB() bool { + if x != nil { + return x.UsePlanB + } + return false +} + type EndSession struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -639,8 +655,8 @@ var file_livekit_internal_proto_rawDesc = []byte{ 0x6e, 0x64, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0a, 0x65, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xe6, - 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x69, 0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xaf, + 0x02, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x6d, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x6d, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, @@ -654,15 +670,20 @@ var file_livekit_internal_proto_rawDesc = []byte{ 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x50, 0x65, 0x72, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x65, 0x72, - 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x0c, 0x0a, 0x0a, 0x45, 0x6e, 0x64, 0x53, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, - 0x64, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, - 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, - 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x29, 0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x63, 0x6f, 0x6c, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x0a, 0x75, 0x73, 0x65, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x5f, 0x62, + 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x75, 0x73, 0x65, 0x50, 0x6c, 0x61, 0x6e, 0x42, + 0x22, 0x0c, 0x0a, 0x0a, 0x45, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, + 0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, + 0x61, 0x6e, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, + 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, + 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/test/multinode_test.go b/test/multinode_test.go index b88b30b71..285350cb7 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -144,3 +144,5 @@ func TestMultinodeReconnectAfterNodeShutdown(t *testing.T) { c3 := createRTCClient("c3", defaultServerPort) waitUntilConnected(t, c3) } + +// TODO: test room with protocol version 1 and 0 participants