create new subscriber datatrack after migration (#600)

* use negotiated data channel in migration case

* create subscriber data track after migration

* fix participant state update

* add participant leave log

* update protocol
This commit is contained in:
cnderrauber
2022-04-07 15:43:24 +08:00
committed by GitHub
parent 93a2730128
commit f74144846b
8 changed files with 100 additions and 91 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.13.1-0.20220331092121-f5e8e205306a
github.com/livekit/protocol v0.13.1-0.20220407055643-3c712ad5c941
github.com/mackerelio/go-osstat v0.2.1
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0

2
go.sum
View File

@@ -134,6 +134,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.13.1-0.20220331092121-f5e8e205306a h1:634c+R4IECT+TmDp672o4DfWOSMOm93E9DcYulcxltQ=
github.com/livekit/protocol v0.13.1-0.20220331092121-f5e8e205306a/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/livekit/protocol v0.13.1-0.20220407055643-3c712ad5c941 h1:6bVOdzzd18Pg2Ud9KF5elew/BQk6PtyQK7iTj1VR3MY=
github.com/livekit/protocol v0.13.1-0.20220407055643-3c712ad5c941/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc=
github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=

View File

@@ -216,28 +216,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
if p.SubscriberAsPrimary() {
primaryPC = p.subscriber.pc
secondaryPC = p.publisher.pc
ordered := true
negotiated := p.params.Migration
// also create data channels for subs, this is for legacy clients that do not use subscriber
// as primary channel
p.reliableDCSub, err = primaryPC.CreateDataChannel(ReliableDataChannel, &webrtc.DataChannelInit{
Ordered: &ordered,
Negotiated: &negotiated,
})
if err != nil {
return nil, err
if !params.Migration {
if err := p.createDataChannelForSubscriberAsPrimary(); err != nil {
return nil, err
}
}
p.reliableDCSub.OnOpen(p.incActiveCounter)
retransmits := uint16(0)
p.lossyDCSub, err = primaryPC.CreateDataChannel(LossyDataChannel, &webrtc.DataChannelInit{
Ordered: &ordered,
MaxRetransmits: &retransmits,
Negotiated: &negotiated,
})
if err != nil {
return nil, err
}
p.lossyDCSub.OnOpen(p.incActiveCounter)
} else {
p.activeCounter.Add(2)
}
@@ -255,6 +238,59 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return p, nil
}
func (p *ParticipantImpl) createDataChannelForSubscriberAsPrimary() error {
primaryPC := p.subscriber.pc
ordered := true
// also create data channels for subs, this is for legacy clients that do not use subscriber
// as primary channel
var (
reliableID, lossyID uint16
reliableIDPtr, lossyIDPtr *uint16
)
// for old version migration clients, they don't send subscriber data channel info
// so we need to create data channels with default ID and don't negotiate as client already has
// data channels with default ID.
// for new version migration clients, we create data channels with new ID and negotiate with client
for _, dc := range p.pendingDataChannels {
if dc.Target == livekit.SignalTarget_SUBSCRIBER {
if dc.Label == ReliableDataChannel {
// pion use step 2 for auto generated ID, so we need to add 4 to avoid conflict
reliableID = uint16(dc.Id) + 4
reliableIDPtr = &reliableID
} else if dc.Label == LossyDataChannel {
lossyID = uint16(dc.Id) + 4
lossyIDPtr = &lossyID
}
}
}
var err error
negotiated := p.params.Migration && reliableIDPtr == nil
p.reliableDCSub, err = primaryPC.CreateDataChannel(ReliableDataChannel, &webrtc.DataChannelInit{
Ordered: &ordered,
ID: reliableIDPtr,
Negotiated: &negotiated,
})
if err != nil {
return err
}
p.reliableDCSub.OnOpen(p.incActiveCounter)
retransmits := uint16(0)
negotiated = p.params.Migration && lossyIDPtr == nil
p.lossyDCSub, err = primaryPC.CreateDataChannel(LossyDataChannel, &webrtc.DataChannelInit{
Ordered: &ordered,
MaxRetransmits: &retransmits,
ID: lossyIDPtr,
Negotiated: &negotiated,
})
if err != nil {
return err
}
p.lossyDCSub.OnOpen(p.incActiveCounter)
return nil
}
func (p *ParticipantImpl) GetLogger() logger.Logger {
return p.params.Logger
}
@@ -486,14 +522,20 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
})
}
func (p *ParticipantImpl) SetMigrateInfo(mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) {
func (p *ParticipantImpl) SetMigrateInfo(previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) {
p.pendingTracksLock.Lock()
defer p.pendingTracksLock.Unlock()
for _, t := range mediaTracks {
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{t.GetTrack(), true}
}
p.pendingDataChannels = dataChannels
if p.SubscriberAsPrimary() {
if err := p.createDataChannelForSubscriberAsPrimary(); err != nil {
p.params.Logger.Errorw("create data channel failed", err)
}
}
p.pendingTracksLock.Unlock()
p.subscriber.SetPreviousAnswer(previousAnswer)
}
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
@@ -536,6 +578,7 @@ func (p *ParticipantImpl) Close(sendLeave bool) error {
return nil
}
p.params.Logger.Infow("closing participant", "sendLeave", sendLeave)
// send leave message
if sendLeave {
_ = p.writeMessage(&livekit.SignalResponse{
@@ -589,10 +632,6 @@ func (p *ParticipantImpl) Negotiate() {
}
}
func (p *ParticipantImpl) SetPreviousAnswer(previous *webrtc.SessionDescription) {
p.subscriber.SetPreviousAnswer(previous)
}
func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {
p.lock.Lock()
preState := p.MigrateState()
@@ -609,7 +648,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {
}
p.lock.Unlock()
if s == types.MigrateStateComplete {
p.handlePendingDataChannels()
p.handlePendingPublisherDataChannels()
}
if pendingOffer != nil {
_, err := p.HandleOffer(*pendingOffer)
@@ -1555,7 +1594,7 @@ func (p *ParticipantImpl) DebugInfo() map[string]interface{} {
return info
}
func (p *ParticipantImpl) handlePendingDataChannels() {
func (p *ParticipantImpl) handlePendingPublisherDataChannels() {
ordered := true
negotiated := true
for _, ci := range p.pendingDataChannels {
@@ -1563,6 +1602,9 @@ func (p *ParticipantImpl) handlePendingDataChannels() {
dc *webrtc.DataChannel
err error
)
if ci.Target == livekit.SignalTarget_SUBSCRIBER {
continue
}
if ci.Label == LossyDataChannel && p.lossyDC == nil {
retransmits := uint16(0)
id := uint16(ci.GetId())

View File

@@ -334,6 +334,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) {
p.OnDataPacket(nil)
// close participant as well
r.Logger.Infow("closing participant for removal", "participantID", p.ID())
_ = p.Close(true)
r.lock.RLock()

View File

@@ -74,6 +74,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
return nil
}
case *livekit.SignalRequest_Leave:
pLogger.Infow("client leaving room")
_ = participant.Close(true)
case *livekit.SignalRequest_SubscriptionPermission:
err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission)

View File

@@ -154,8 +154,7 @@ type LocalParticipant interface {
// session migration
SetMigrateState(s MigrateState)
MigrateState() MigrateState
SetMigrateInfo(mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo)
SetPreviousAnswer(previous *webrtc.SessionDescription)
SetMigrateInfo(previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo)
UpdateRTT(rtt uint32)
}

View File

@@ -444,11 +444,12 @@ type FakeLocalParticipant struct {
setMetadataArgsForCall []struct {
arg1 string
}
SetMigrateInfoStub func([]*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)
SetMigrateInfoStub func(*webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)
setMigrateInfoMutex sync.RWMutex
setMigrateInfoArgsForCall []struct {
arg1 []*livekit.TrackPublishedResponse
arg2 []*livekit.DataChannelInfo
arg1 *webrtc.SessionDescription
arg2 []*livekit.TrackPublishedResponse
arg3 []*livekit.DataChannelInfo
}
SetMigrateStateStub func(types.MigrateState)
setMigrateStateMutex sync.RWMutex
@@ -466,11 +467,6 @@ type FakeLocalParticipant struct {
setPermissionReturnsOnCall map[int]struct {
result1 bool
}
SetPreviousAnswerStub func(*webrtc.SessionDescription)
setPreviousAnswerMutex sync.RWMutex
setPreviousAnswerArgsForCall []struct {
arg1 *webrtc.SessionDescription
}
SetResponseSinkStub func(routing.MessageSink)
setResponseSinkMutex sync.RWMutex
setResponseSinkArgsForCall []struct {
@@ -2974,27 +2970,28 @@ func (fake *FakeLocalParticipant) SetMetadataArgsForCall(i int) string {
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 []*livekit.TrackPublishedResponse, arg2 []*livekit.DataChannelInfo) {
var arg1Copy []*livekit.TrackPublishedResponse
if arg1 != nil {
arg1Copy = make([]*livekit.TrackPublishedResponse, len(arg1))
copy(arg1Copy, arg1)
}
var arg2Copy []*livekit.DataChannelInfo
func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription, arg2 []*livekit.TrackPublishedResponse, arg3 []*livekit.DataChannelInfo) {
var arg2Copy []*livekit.TrackPublishedResponse
if arg2 != nil {
arg2Copy = make([]*livekit.DataChannelInfo, len(arg2))
arg2Copy = make([]*livekit.TrackPublishedResponse, len(arg2))
copy(arg2Copy, arg2)
}
var arg3Copy []*livekit.DataChannelInfo
if arg3 != nil {
arg3Copy = make([]*livekit.DataChannelInfo, len(arg3))
copy(arg3Copy, arg3)
}
fake.setMigrateInfoMutex.Lock()
fake.setMigrateInfoArgsForCall = append(fake.setMigrateInfoArgsForCall, struct {
arg1 []*livekit.TrackPublishedResponse
arg2 []*livekit.DataChannelInfo
}{arg1Copy, arg2Copy})
arg1 *webrtc.SessionDescription
arg2 []*livekit.TrackPublishedResponse
arg3 []*livekit.DataChannelInfo
}{arg1, arg2Copy, arg3Copy})
stub := fake.SetMigrateInfoStub
fake.recordInvocation("SetMigrateInfo", []interface{}{arg1Copy, arg2Copy})
fake.recordInvocation("SetMigrateInfo", []interface{}{arg1, arg2Copy, arg3Copy})
fake.setMigrateInfoMutex.Unlock()
if stub != nil {
fake.SetMigrateInfoStub(arg1, arg2)
fake.SetMigrateInfoStub(arg1, arg2, arg3)
}
}
@@ -3004,17 +3001,17 @@ func (fake *FakeLocalParticipant) SetMigrateInfoCallCount() int {
return len(fake.setMigrateInfoArgsForCall)
}
func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func([]*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)) {
func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func(*webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)) {
fake.setMigrateInfoMutex.Lock()
defer fake.setMigrateInfoMutex.Unlock()
fake.SetMigrateInfoStub = stub
}
func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) ([]*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) {
func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) (*webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) {
fake.setMigrateInfoMutex.RLock()
defer fake.setMigrateInfoMutex.RUnlock()
argsForCall := fake.setMigrateInfoArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) SetMigrateState(arg1 types.MigrateState) {
@@ -3110,38 +3107,6 @@ func (fake *FakeLocalParticipant) SetPermissionReturnsOnCall(i int, result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) SetPreviousAnswer(arg1 *webrtc.SessionDescription) {
fake.setPreviousAnswerMutex.Lock()
fake.setPreviousAnswerArgsForCall = append(fake.setPreviousAnswerArgsForCall, struct {
arg1 *webrtc.SessionDescription
}{arg1})
stub := fake.SetPreviousAnswerStub
fake.recordInvocation("SetPreviousAnswer", []interface{}{arg1})
fake.setPreviousAnswerMutex.Unlock()
if stub != nil {
fake.SetPreviousAnswerStub(arg1)
}
}
func (fake *FakeLocalParticipant) SetPreviousAnswerCallCount() int {
fake.setPreviousAnswerMutex.RLock()
defer fake.setPreviousAnswerMutex.RUnlock()
return len(fake.setPreviousAnswerArgsForCall)
}
func (fake *FakeLocalParticipant) SetPreviousAnswerCalls(stub func(*webrtc.SessionDescription)) {
fake.setPreviousAnswerMutex.Lock()
defer fake.setPreviousAnswerMutex.Unlock()
fake.SetPreviousAnswerStub = stub
}
func (fake *FakeLocalParticipant) SetPreviousAnswerArgsForCall(i int) *webrtc.SessionDescription {
fake.setPreviousAnswerMutex.RLock()
defer fake.setPreviousAnswerMutex.RUnlock()
argsForCall := fake.setPreviousAnswerArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SetResponseSink(arg1 routing.MessageSink) {
fake.setResponseSinkMutex.Lock()
fake.setResponseSinkArgsForCall = append(fake.setResponseSinkArgsForCall, struct {
@@ -4030,8 +3995,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setMigrateStateMutex.RUnlock()
fake.setPermissionMutex.RLock()
defer fake.setPermissionMutex.RUnlock()
fake.setPreviousAnswerMutex.RLock()
defer fake.setPreviousAnswerMutex.RUnlock()
fake.setResponseSinkMutex.RLock()
defer fake.setResponseSinkMutex.RUnlock()
fake.setTrackMutedMutex.RLock()

View File

@@ -486,6 +486,7 @@ func (r *RoomManager) handleRTCMessage(_ context.Context, roomName livekit.RoomN
}
}
case *livekit.RTCNodeMessage_DeleteRoom:
room.Logger.Infow("deleting room")
for _, p := range room.GetParticipants() {
_ = p.Close(true)
}