mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 13:25:42 +00:00
Manual negotiation and plan-b support
This commit is contained in:
@@ -414,6 +414,7 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string)
|
||||
if _, err = c.publisher.PeerConnection().AddTrack(track); err != nil {
|
||||
return
|
||||
}
|
||||
c.publisher.Negotiate()
|
||||
writer = NewTrackWriter(c.ctx, track, path)
|
||||
|
||||
// write tracks only after ICE connectivity
|
||||
|
||||
@@ -24,10 +24,12 @@ type MessageSource interface {
|
||||
}
|
||||
|
||||
type ParticipantInit struct {
|
||||
Identity string
|
||||
Metadata string
|
||||
Reconnect bool
|
||||
Permission *livekit.ParticipantPermission
|
||||
Identity string
|
||||
Metadata string
|
||||
Reconnect bool
|
||||
Permission *livekit.ParticipantPermission
|
||||
ProtocolVersion int32
|
||||
UsePlanB bool
|
||||
}
|
||||
|
||||
type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)
|
||||
@@ -45,16 +47,18 @@ type Router interface {
|
||||
GetNode(nodeId string) (*livekit.Node, error)
|
||||
ListNodes() ([]*livekit.Node, error)
|
||||
|
||||
// participant signal connection is ready to start
|
||||
// StartParticipantSignal participant signal connection is ready to start
|
||||
StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
|
||||
|
||||
// sends a message to RTC node
|
||||
// CreateRTCSink sends a message to RTC node
|
||||
CreateRTCSink(roomName, identity string) (MessageSink, error)
|
||||
|
||||
// when a new participant's RTC connection is ready to start
|
||||
// OnNewParticipantRTC is called to start a new participant's RTC connection
|
||||
OnNewParticipantRTC(callback NewParticipantCallback)
|
||||
// messages to be delivered to RTC node
|
||||
|
||||
// OnRTCMessage is called to execute actions on the RTC node
|
||||
OnRTCMessage(callback RTCMessageCallback)
|
||||
|
||||
Start() error
|
||||
Stop()
|
||||
}
|
||||
|
||||
@@ -153,9 +153,11 @@ func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit
|
||||
Identity: pi.Identity,
|
||||
Metadata: pi.Metadata,
|
||||
// connection id is to allow the RTC node to identify where to route the message back to
|
||||
ConnectionId: connectionId,
|
||||
Reconnect: pi.Reconnect,
|
||||
Permission: pi.Permission,
|
||||
ConnectionId: connectionId,
|
||||
Reconnect: pi.Reconnect,
|
||||
Permission: pi.Permission,
|
||||
ProtocolVersion: pi.ProtocolVersion,
|
||||
UsePlanB: pi.UsePlanB,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
@@ -217,10 +219,12 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
|
||||
}
|
||||
|
||||
pi := ParticipantInit{
|
||||
Identity: ss.Identity,
|
||||
Metadata: ss.Metadata,
|
||||
Reconnect: ss.Reconnect,
|
||||
Permission: ss.Permission,
|
||||
Identity: ss.Identity,
|
||||
Metadata: ss.Metadata,
|
||||
Reconnect: ss.Reconnect,
|
||||
Permission: ss.Permission,
|
||||
ProtocolVersion: ss.ProtocolVersion,
|
||||
UsePlanB: ss.UsePlanB,
|
||||
}
|
||||
|
||||
reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey)
|
||||
|
||||
@@ -131,13 +131,20 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
}
|
||||
|
||||
// using DownTrack from ion-sfu
|
||||
streamId := t.participantId
|
||||
if sub.ProtocolVersion().SupportsPackedStreamId() {
|
||||
// when possible, pack both IDs in streamID to allow new streams to be generated
|
||||
// react-native-webrtc still uses stream based APIs and require this
|
||||
streamId = PackStreamID(t.participantId, t.ID())
|
||||
}
|
||||
receiver := NewWrappedReceiver(t.receiver, t.ID(), streamId)
|
||||
downTrack, err := sfu.NewDownTrack(webrtc.RTPCodecCapability{
|
||||
MimeType: codec.MimeType,
|
||||
ClockRate: codec.ClockRate,
|
||||
Channels: codec.Channels,
|
||||
SDPFmtpLine: codec.SDPFmtpLine,
|
||||
RTCPFeedback: feedbackTypes,
|
||||
}, t.receiver, t.bufferFactory, sub.ID(), t.receiverConf.packetBufferSize)
|
||||
}, receiver, t.bufferFactory, sub.ID(), t.receiverConf.packetBufferSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -189,12 +196,14 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
}
|
||||
|
||||
sub.RemoveSubscribedTrack(t.participantId, subTrack)
|
||||
sub.Negotiate()
|
||||
})
|
||||
|
||||
t.subscribedTracks[sub.ID()] = subTrack
|
||||
|
||||
sub.AddSubscribedTrack(t.participantId, subTrack)
|
||||
t.receiver.AddDownTrack(downTrack, true)
|
||||
sub.AddSubscribedTrack(t.participantId, subTrack)
|
||||
sub.Negotiate()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -242,7 +251,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
})
|
||||
|
||||
if t.receiver == nil {
|
||||
t.receiver = NewWrappedReceiver(sfu.NewWebRTCReceiver(receiver, track, t.participantId), t.ID(), t.participantId)
|
||||
t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.participantId)
|
||||
t.receiver.SetRTCPCh(t.rtcpCh)
|
||||
t.receiver.OnCloseHandler(func() {
|
||||
t.lock.Lock()
|
||||
|
||||
@@ -27,17 +27,18 @@ const (
|
||||
)
|
||||
|
||||
type ParticipantImpl struct {
|
||||
id string
|
||||
publisher *PCTransport
|
||||
subscriber *PCTransport
|
||||
responseSink routing.MessageSink
|
||||
audioConfig config.AudioConfig
|
||||
isClosed utils.AtomicFlag
|
||||
conf *WebRTCConfig
|
||||
identity string
|
||||
permission *livekit.ParticipantPermission
|
||||
state atomic.Value // livekit.ParticipantInfo_State
|
||||
rtcpCh chan []rtcp.Packet
|
||||
id string
|
||||
publisher *PCTransport
|
||||
subscriber *PCTransport
|
||||
responseSink routing.MessageSink
|
||||
audioConfig config.AudioConfig
|
||||
isClosed utils.AtomicFlag
|
||||
conf *WebRTCConfig
|
||||
identity string
|
||||
permission *livekit.ParticipantPermission
|
||||
state atomic.Value // livekit.ParticipantInfo_State
|
||||
rtcpCh chan []rtcp.Packet
|
||||
protocolVersion types.ProtocolVersion
|
||||
|
||||
// when first connected
|
||||
connectedAt atomic.Value // time.Time
|
||||
@@ -66,7 +67,7 @@ type ParticipantImpl struct {
|
||||
onClose func(types.Participant)
|
||||
}
|
||||
|
||||
func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, ac config.AudioConfig) (*ParticipantImpl, error) {
|
||||
func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, ac config.AudioConfig, pv types.ProtocolVersion) (*ParticipantImpl, error) {
|
||||
// TODO: check to ensure params are valid, id and identity can't be empty
|
||||
|
||||
p := &ParticipantImpl{
|
||||
@@ -75,6 +76,7 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink,
|
||||
responseSink: rs,
|
||||
audioConfig: ac,
|
||||
conf: conf,
|
||||
protocolVersion: pv,
|
||||
rtcpCh: make(chan []rtcp.Packet, 50),
|
||||
subscribedTracks: make(map[string][]types.SubscribedTrack),
|
||||
lock: sync.RWMutex{},
|
||||
@@ -129,6 +131,10 @@ func (p *ParticipantImpl) State() livekit.ParticipantInfo_State {
|
||||
return p.state.Load().(livekit.ParticipantInfo_State)
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) ProtocolVersion() types.ProtocolVersion {
|
||||
return p.protocolVersion
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) IsReady() bool {
|
||||
state := p.State()
|
||||
return state == livekit.ParticipantInfo_JOINED || state == livekit.ParticipantInfo_ACTIVE
|
||||
@@ -290,6 +296,7 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
|
||||
"participant", p.Identity(),
|
||||
//"sdp", sdp.SDP,
|
||||
)
|
||||
|
||||
if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
|
||||
return errors.Wrap(err, "could not set remote description")
|
||||
}
|
||||
@@ -349,7 +356,11 @@ func (p *ParticipantImpl) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribes op to all publishedTracks
|
||||
func (p *ParticipantImpl) Negotiate() {
|
||||
p.subscriber.Negotiate()
|
||||
}
|
||||
|
||||
// AddSubscriber subscribes op to all publishedTracks
|
||||
func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) {
|
||||
p.lock.RLock()
|
||||
tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks))
|
||||
@@ -566,12 +577,13 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
|
||||
}
|
||||
|
||||
// when the server has an offer for participant
|
||||
func (p *ParticipantImpl) onOffer(sd webrtc.SessionDescription) {
|
||||
func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
|
||||
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
|
||||
logger.Debugw("skipping server offer", "participant", p.Identity())
|
||||
// skip when disconnected
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugw("sending server offer to participant",
|
||||
"participant", p.Identity(),
|
||||
//"sdp", offer.SDP,
|
||||
@@ -579,7 +591,7 @@ func (p *ParticipantImpl) onOffer(sd webrtc.SessionDescription) {
|
||||
|
||||
_ = p.writeMessage(&livekit.SignalResponse{
|
||||
Message: &livekit.SignalResponse_Offer{
|
||||
Offer: ToProtoSessionDescription(sd),
|
||||
Offer: ToProtoSessionDescription(offer),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -152,6 +152,7 @@ func newParticipantForTest(identity string) *ParticipantImpl {
|
||||
identity,
|
||||
rtcConf,
|
||||
&routingfakes.FakeMessageSink{},
|
||||
config.AudioConfig{})
|
||||
config.AudioConfig{},
|
||||
0)
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ func NewPCTransport(target livekit.SignalTarget, conf *WebRTCConfig) (*PCTranspo
|
||||
debouncedNegotiate: debounce.New(negotiationFrequency),
|
||||
}
|
||||
t.negotiationState.Store(negotiationStateNone)
|
||||
t.pc.OnNegotiationNeeded(t.negotiate)
|
||||
//t.pc.OnNegotiationNeeded(t.Negotiate)
|
||||
|
||||
return t, nil
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {
|
||||
t.negotiationState.Store(negotiationStateNone)
|
||||
if state == negotiationRetry {
|
||||
// need to Negotiate again
|
||||
t.negotiate()
|
||||
t.Negotiate()
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -122,7 +122,7 @@ func (t *PCTransport) OnOffer(f func(sd webrtc.SessionDescription)) {
|
||||
t.onOffer = f
|
||||
}
|
||||
|
||||
func (t *PCTransport) negotiate() {
|
||||
func (t *PCTransport) Negotiate() {
|
||||
t.debouncedNegotiate(t.handleNegotiate)
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ type Participant interface {
|
||||
ID() string
|
||||
Identity() string
|
||||
State() livekit.ParticipantInfo_State
|
||||
ProtocolVersion() ProtocolVersion
|
||||
IsReady() bool
|
||||
ConnectedAt() time.Time
|
||||
ToProto() *livekit.ParticipantInfo
|
||||
@@ -34,6 +35,7 @@ type Participant interface {
|
||||
GetResponseSink() routing.MessageSink
|
||||
SetResponseSink(sink routing.MessageSink)
|
||||
SubscriberMediaEngine() *webrtc.MediaEngine
|
||||
Negotiate()
|
||||
|
||||
AddTrack(clientId, name string, trackType livekit.TrackType)
|
||||
GetPublishedTracks() []PublishedTrack
|
||||
|
||||
7
pkg/rtc/types/protocol_version.go
Normal file
7
pkg/rtc/types/protocol_version.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package types
|
||||
|
||||
type ProtocolVersion int
|
||||
|
||||
func (v ProtocolVersion) SupportsPackedStreamId() bool {
|
||||
return v > 0
|
||||
}
|
||||
@@ -187,6 +187,10 @@ type FakeParticipant struct {
|
||||
isReadyReturnsOnCall map[int]struct {
|
||||
result1 bool
|
||||
}
|
||||
NegotiateStub func()
|
||||
negotiateMutex sync.RWMutex
|
||||
negotiateArgsForCall []struct {
|
||||
}
|
||||
OnCloseStub func(func(types.Participant))
|
||||
onCloseMutex sync.RWMutex
|
||||
onCloseArgsForCall []struct {
|
||||
@@ -212,6 +216,16 @@ type FakeParticipant struct {
|
||||
onTrackUpdatedArgsForCall []struct {
|
||||
arg1 func(types.Participant, types.PublishedTrack)
|
||||
}
|
||||
ProtocolVersionStub func() types.ProtocolVersion
|
||||
protocolVersionMutex sync.RWMutex
|
||||
protocolVersionArgsForCall []struct {
|
||||
}
|
||||
protocolVersionReturns struct {
|
||||
result1 types.ProtocolVersion
|
||||
}
|
||||
protocolVersionReturnsOnCall map[int]struct {
|
||||
result1 types.ProtocolVersion
|
||||
}
|
||||
RTCPChanStub func() chan []rtcp.Packet
|
||||
rTCPChanMutex sync.RWMutex
|
||||
rTCPChanArgsForCall []struct {
|
||||
@@ -1241,6 +1255,30 @@ func (fake *FakeParticipant) IsReadyReturnsOnCall(i int, result1 bool) {
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) Negotiate() {
|
||||
fake.negotiateMutex.Lock()
|
||||
fake.negotiateArgsForCall = append(fake.negotiateArgsForCall, struct {
|
||||
}{})
|
||||
stub := fake.NegotiateStub
|
||||
fake.recordInvocation("Negotiate", []interface{}{})
|
||||
fake.negotiateMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.NegotiateStub()
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) NegotiateCallCount() int {
|
||||
fake.negotiateMutex.RLock()
|
||||
defer fake.negotiateMutex.RUnlock()
|
||||
return len(fake.negotiateArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) NegotiateCalls(stub func()) {
|
||||
fake.negotiateMutex.Lock()
|
||||
defer fake.negotiateMutex.Unlock()
|
||||
fake.NegotiateStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) OnClose(arg1 func(types.Participant)) {
|
||||
fake.onCloseMutex.Lock()
|
||||
fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct {
|
||||
@@ -1401,6 +1439,59 @@ func (fake *FakeParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Partici
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) ProtocolVersion() types.ProtocolVersion {
|
||||
fake.protocolVersionMutex.Lock()
|
||||
ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)]
|
||||
fake.protocolVersionArgsForCall = append(fake.protocolVersionArgsForCall, struct {
|
||||
}{})
|
||||
stub := fake.ProtocolVersionStub
|
||||
fakeReturns := fake.protocolVersionReturns
|
||||
fake.recordInvocation("ProtocolVersion", []interface{}{})
|
||||
fake.protocolVersionMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub()
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) ProtocolVersionCallCount() int {
|
||||
fake.protocolVersionMutex.RLock()
|
||||
defer fake.protocolVersionMutex.RUnlock()
|
||||
return len(fake.protocolVersionArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) ProtocolVersionCalls(stub func() types.ProtocolVersion) {
|
||||
fake.protocolVersionMutex.Lock()
|
||||
defer fake.protocolVersionMutex.Unlock()
|
||||
fake.ProtocolVersionStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) ProtocolVersionReturns(result1 types.ProtocolVersion) {
|
||||
fake.protocolVersionMutex.Lock()
|
||||
defer fake.protocolVersionMutex.Unlock()
|
||||
fake.ProtocolVersionStub = nil
|
||||
fake.protocolVersionReturns = struct {
|
||||
result1 types.ProtocolVersion
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) ProtocolVersionReturnsOnCall(i int, result1 types.ProtocolVersion) {
|
||||
fake.protocolVersionMutex.Lock()
|
||||
defer fake.protocolVersionMutex.Unlock()
|
||||
fake.ProtocolVersionStub = nil
|
||||
if fake.protocolVersionReturnsOnCall == nil {
|
||||
fake.protocolVersionReturnsOnCall = make(map[int]struct {
|
||||
result1 types.ProtocolVersion
|
||||
})
|
||||
}
|
||||
fake.protocolVersionReturnsOnCall[i] = struct {
|
||||
result1 types.ProtocolVersion
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) RTCPChan() chan []rtcp.Packet {
|
||||
fake.rTCPChanMutex.Lock()
|
||||
ret, specificReturn := fake.rTCPChanReturnsOnCall[len(fake.rTCPChanArgsForCall)]
|
||||
@@ -2126,6 +2217,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
|
||||
defer fake.identityMutex.RUnlock()
|
||||
fake.isReadyMutex.RLock()
|
||||
defer fake.isReadyMutex.RUnlock()
|
||||
fake.negotiateMutex.RLock()
|
||||
defer fake.negotiateMutex.RUnlock()
|
||||
fake.onCloseMutex.RLock()
|
||||
defer fake.onCloseMutex.RUnlock()
|
||||
fake.onMetadataUpdateMutex.RLock()
|
||||
@@ -2136,6 +2229,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
|
||||
defer fake.onTrackPublishedMutex.RUnlock()
|
||||
fake.onTrackUpdatedMutex.RLock()
|
||||
defer fake.onTrackUpdatedMutex.RUnlock()
|
||||
fake.protocolVersionMutex.RLock()
|
||||
defer fake.protocolVersionMutex.RUnlock()
|
||||
fake.rTCPChanMutex.RLock()
|
||||
defer fake.rTCPChanMutex.RUnlock()
|
||||
fake.removeSubscribedTrackMutex.RLock()
|
||||
|
||||
@@ -17,15 +17,15 @@ const (
|
||||
trackIdSeparator = "|"
|
||||
)
|
||||
|
||||
func UnpackTrackId(packed string) (peerId string, trackId string) {
|
||||
func UnpackStreamID(packed string) (peerId string, trackId string) {
|
||||
parts := strings.Split(packed, trackIdSeparator)
|
||||
if len(parts) > 1 {
|
||||
return parts[0], packed[len(parts[0])+1:]
|
||||
}
|
||||
return "", packed
|
||||
return packed, ""
|
||||
}
|
||||
|
||||
func PackTrackId(participantId, trackId string) string {
|
||||
func PackStreamID(participantId, trackId string) string {
|
||||
return participantId + trackIdSeparator + trackId
|
||||
}
|
||||
|
||||
|
||||
@@ -6,13 +6,13 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestPackTrackId(t *testing.T) {
|
||||
func TestPackStreamId(t *testing.T) {
|
||||
packed := "PA_123abc|uuid-id"
|
||||
pId, trackId := UnpackTrackId(packed)
|
||||
pId, trackId := UnpackStreamID(packed)
|
||||
assert.Equal(t, "PA_123abc", pId)
|
||||
assert.Equal(t, "uuid-id", trackId)
|
||||
|
||||
assert.Equal(t, packed, PackTrackId(pId, trackId))
|
||||
assert.Equal(t, packed, PackStreamID(pId, trackId))
|
||||
}
|
||||
|
||||
func TestPackDataTrackLabel(t *testing.T) {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
livekit "github.com/livekit/livekit-server/proto"
|
||||
"github.com/livekit/protocol/utils"
|
||||
"github.com/pion/webrtc/v3"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -206,10 +207,16 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
|
||||
"room", roomName,
|
||||
"node", r.currentNode.Id,
|
||||
"participant", pi.Identity,
|
||||
"num_participants", len(room.GetParticipants()),
|
||||
"plan_b", pi.UsePlanB,
|
||||
"protocol", pi.ProtocolVersion,
|
||||
)
|
||||
|
||||
participant, err = rtc.NewParticipant(pi.Identity, r.rtcConfig, responseSink, r.config.Audio)
|
||||
pv := types.ProtocolVersion(pi.ProtocolVersion)
|
||||
rtcConf := *r.rtcConfig
|
||||
if pi.UsePlanB {
|
||||
rtcConf.Configuration.SDPSemantics = webrtc.SDPSemanticsPlanB
|
||||
}
|
||||
participant, err = rtc.NewParticipant(pi.Identity, &rtcConf, responseSink, r.config.Audio, pv)
|
||||
if err != nil {
|
||||
logger.Errorw("could not create participant", "error", err)
|
||||
return
|
||||
@@ -279,6 +286,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
|
||||
"participant", participant.Identity(),
|
||||
"room", room.Name,
|
||||
)
|
||||
participant.Close()
|
||||
}()
|
||||
defer rtc.Recover()
|
||||
|
||||
@@ -316,18 +324,12 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
|
||||
sd := rtc.FromProtoSessionDescription(msg.Answer)
|
||||
if err := participant.HandleAnswer(sd); err != nil {
|
||||
logger.Errorw("could not handle answer", "participant", participant.Identity(), "err", err)
|
||||
//conn.WriteJSON(
|
||||
// jsonError(http.StatusInternalServerError, "could not handle negotiate", err.Error()))
|
||||
return
|
||||
}
|
||||
case *livekit.SignalRequest_Trickle:
|
||||
candidateInit := rtc.FromProtoTrickle(msg.Trickle)
|
||||
//logger.Debugw("adding peer candidate", "participant", participant.ID())
|
||||
if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil {
|
||||
logger.Errorw("could not handle trickle", "participant", participant.Identity(), "err", err)
|
||||
//conn.WriteJSON(
|
||||
// jsonError(http.StatusInternalServerError, "could not handle trickle", err.Error()))
|
||||
return
|
||||
}
|
||||
case *livekit.SignalRequest_Mute:
|
||||
participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted)
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -55,6 +56,8 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
roomName := r.FormValue("room")
|
||||
reconnectParam := r.FormValue("reconnect")
|
||||
protocolParam := r.FormValue("protocol")
|
||||
planBParam := r.FormValue("planb")
|
||||
|
||||
claims := GetGrants(r.Context())
|
||||
// require a claim
|
||||
@@ -65,6 +68,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
pi := routing.ParticipantInit{
|
||||
Reconnect: boolValue(reconnectParam),
|
||||
Identity: claims.Identity,
|
||||
UsePlanB: boolValue(planBParam),
|
||||
}
|
||||
// only use permissions if any of them are set, default permissive
|
||||
if claims.Video.CanPublish || claims.Video.CanSubscribe {
|
||||
@@ -73,6 +77,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
CanPublish: claims.Video.CanPublish,
|
||||
}
|
||||
}
|
||||
if pv, err := strconv.Atoi(protocolParam); err == nil {
|
||||
pi.ProtocolVersion = int32(pv)
|
||||
}
|
||||
|
||||
onlyName, err := EnsureJoinPermission(r.Context())
|
||||
if err != nil {
|
||||
|
||||
@@ -412,8 +412,10 @@ type StartSession struct {
|
||||
// if a client is reconnecting (i.e. resume instead of restart)
|
||||
Reconnect bool `protobuf:"varint,4,opt,name=reconnect,proto3" json:"reconnect,omitempty"`
|
||||
// metadata to pass to participant
|
||||
Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"`
|
||||
Permission *ParticipantPermission `protobuf:"bytes,6,opt,name=permission,proto3" json:"permission,omitempty"`
|
||||
Metadata string `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"`
|
||||
Permission *ParticipantPermission `protobuf:"bytes,6,opt,name=permission,proto3" json:"permission,omitempty"`
|
||||
ProtocolVersion int32 `protobuf:"varint,7,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"`
|
||||
UsePlanB bool `protobuf:"varint,8,opt,name=use_plan_b,json=usePlanB,proto3" json:"use_plan_b,omitempty"`
|
||||
}
|
||||
|
||||
func (x *StartSession) Reset() {
|
||||
@@ -490,6 +492,20 @@ func (x *StartSession) GetPermission() *ParticipantPermission {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *StartSession) GetProtocolVersion() int32 {
|
||||
if x != nil {
|
||||
return x.ProtocolVersion
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *StartSession) GetUsePlanB() bool {
|
||||
if x != nil {
|
||||
return x.UsePlanB
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type EndSession struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@@ -639,8 +655,8 @@ var file_livekit_internal_proto_rawDesc = []byte{
|
||||
0x6e, 0x64, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
|
||||
0x32, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x6e, 0x64, 0x53, 0x65,
|
||||
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0a, 0x65, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73,
|
||||
0x69, 0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xe6,
|
||||
0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12,
|
||||
0x69, 0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xaf,
|
||||
0x02, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12,
|
||||
0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x6d, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x6d, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08,
|
||||
0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
|
||||
@@ -654,15 +670,20 @@ var file_livekit_internal_proto_rawDesc = []byte{
|
||||
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x6c, 0x69,
|
||||
0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e,
|
||||
0x74, 0x50, 0x65, 0x72, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x65, 0x72,
|
||||
0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x0c, 0x0a, 0x0a, 0x45, 0x6e, 0x64, 0x53, 0x65,
|
||||
0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x50,
|
||||
0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61,
|
||||
0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49,
|
||||
0x64, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
|
||||
0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d,
|
||||
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76,
|
||||
0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x29, 0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28,
|
||||
0x05, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69,
|
||||
0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x0a, 0x75, 0x73, 0x65, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x5f, 0x62,
|
||||
0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x75, 0x73, 0x65, 0x50, 0x6c, 0x61, 0x6e, 0x42,
|
||||
0x22, 0x0c, 0x0a, 0x0a, 0x45, 0x6e, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a,
|
||||
0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70,
|
||||
0x61, 0x6e, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61,
|
||||
0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72,
|
||||
0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69,
|
||||
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74,
|
||||
0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x62, 0x06, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -144,3 +144,5 @@ func TestMultinodeReconnectAfterNodeShutdown(t *testing.T) {
|
||||
c3 := createRTCClient("c3", defaultServerPort)
|
||||
waitUntilConnected(t, c3)
|
||||
}
|
||||
|
||||
// TODO: test room with protocol version 1 and 0 participants
|
||||
|
||||
Reference in New Issue
Block a user