diff --git a/go.mod b/go.mod index 28a7a4ec1..56292a256 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.13.1-0.20220331092121-f5e8e205306a + github.com/livekit/protocol v0.13.1-0.20220407055643-3c712ad5c941 github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 diff --git a/go.sum b/go.sum index 963fdb5b9..eaffc0ced 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/protocol v0.13.1-0.20220331092121-f5e8e205306a h1:634c+R4IECT+TmDp672o4DfWOSMOm93E9DcYulcxltQ= github.com/livekit/protocol v0.13.1-0.20220331092121-f5e8e205306a/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= +github.com/livekit/protocol v0.13.1-0.20220407055643-3c712ad5c941 h1:6bVOdzzd18Pg2Ud9KF5elew/BQk6PtyQK7iTj1VR3MY= +github.com/livekit/protocol v0.13.1-0.20220407055643-3c712ad5c941/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f4f32d147..26606f10e 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -216,28 +216,11 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { if p.SubscriberAsPrimary() { primaryPC = p.subscriber.pc secondaryPC = p.publisher.pc - ordered := true - negotiated := p.params.Migration - // also create data channels for subs, this is for legacy clients that do not use subscriber - // as primary channel - p.reliableDCSub, err = primaryPC.CreateDataChannel(ReliableDataChannel, &webrtc.DataChannelInit{ - Ordered: &ordered, - Negotiated: &negotiated, - }) - if err != nil { - return nil, err + if !params.Migration { + if err := p.createDataChannelForSubscriberAsPrimary(); err != nil { + return nil, err + } } - p.reliableDCSub.OnOpen(p.incActiveCounter) - retransmits := uint16(0) - p.lossyDCSub, err = primaryPC.CreateDataChannel(LossyDataChannel, &webrtc.DataChannelInit{ - Ordered: &ordered, - MaxRetransmits: &retransmits, - Negotiated: &negotiated, - }) - if err != nil { - return nil, err - } - p.lossyDCSub.OnOpen(p.incActiveCounter) } else { p.activeCounter.Add(2) } @@ -255,6 +238,59 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return p, nil } +func (p *ParticipantImpl) createDataChannelForSubscriberAsPrimary() error { + primaryPC := p.subscriber.pc + ordered := true + // also create data channels for subs, this is for legacy clients that do not use subscriber + // as primary channel + var ( + reliableID, lossyID uint16 + reliableIDPtr, lossyIDPtr *uint16 + ) + // for old version migration clients, they don't send subscriber data channel info + // so we need to create data channels with default ID and don't negotiate as client already has + // data channels with default ID. + // for new version migration clients, we create data channels with new ID and negotiate with client + + for _, dc := range p.pendingDataChannels { + if dc.Target == livekit.SignalTarget_SUBSCRIBER { + if dc.Label == ReliableDataChannel { + // pion use step 2 for auto generated ID, so we need to add 4 to avoid conflict + reliableID = uint16(dc.Id) + 4 + reliableIDPtr = &reliableID + } else if dc.Label == LossyDataChannel { + lossyID = uint16(dc.Id) + 4 + lossyIDPtr = &lossyID + } + } + } + + var err error + negotiated := p.params.Migration && reliableIDPtr == nil + p.reliableDCSub, err = primaryPC.CreateDataChannel(ReliableDataChannel, &webrtc.DataChannelInit{ + Ordered: &ordered, + ID: reliableIDPtr, + Negotiated: &negotiated, + }) + if err != nil { + return err + } + p.reliableDCSub.OnOpen(p.incActiveCounter) + retransmits := uint16(0) + negotiated = p.params.Migration && lossyIDPtr == nil + p.lossyDCSub, err = primaryPC.CreateDataChannel(LossyDataChannel, &webrtc.DataChannelInit{ + Ordered: &ordered, + MaxRetransmits: &retransmits, + ID: lossyIDPtr, + Negotiated: &negotiated, + }) + if err != nil { + return err + } + p.lossyDCSub.OnOpen(p.incActiveCounter) + return nil +} + func (p *ParticipantImpl) GetLogger() logger.Logger { return p.params.Logger } @@ -486,14 +522,20 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) { }) } -func (p *ParticipantImpl) SetMigrateInfo(mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) { +func (p *ParticipantImpl) SetMigrateInfo(previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) { p.pendingTracksLock.Lock() - defer p.pendingTracksLock.Unlock() - for _, t := range mediaTracks { p.pendingTracks[t.GetCid()] = &pendingTrackInfo{t.GetTrack(), true} } p.pendingDataChannels = dataChannels + + if p.SubscriberAsPrimary() { + if err := p.createDataChannelForSubscriberAsPrimary(); err != nil { + p.params.Logger.Errorw("create data channel failed", err) + } + } + p.pendingTracksLock.Unlock() + p.subscriber.SetPreviousAnswer(previousAnswer) } // HandleAnswer handles a client answer response, with subscriber PC, server initiates the @@ -536,6 +578,7 @@ func (p *ParticipantImpl) Close(sendLeave bool) error { return nil } + p.params.Logger.Infow("closing participant", "sendLeave", sendLeave) // send leave message if sendLeave { _ = p.writeMessage(&livekit.SignalResponse{ @@ -589,10 +632,6 @@ func (p *ParticipantImpl) Negotiate() { } } -func (p *ParticipantImpl) SetPreviousAnswer(previous *webrtc.SessionDescription) { - p.subscriber.SetPreviousAnswer(previous) -} - func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) { p.lock.Lock() preState := p.MigrateState() @@ -609,7 +648,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) { } p.lock.Unlock() if s == types.MigrateStateComplete { - p.handlePendingDataChannels() + p.handlePendingPublisherDataChannels() } if pendingOffer != nil { _, err := p.HandleOffer(*pendingOffer) @@ -1555,7 +1594,7 @@ func (p *ParticipantImpl) DebugInfo() map[string]interface{} { return info } -func (p *ParticipantImpl) handlePendingDataChannels() { +func (p *ParticipantImpl) handlePendingPublisherDataChannels() { ordered := true negotiated := true for _, ci := range p.pendingDataChannels { @@ -1563,6 +1602,9 @@ func (p *ParticipantImpl) handlePendingDataChannels() { dc *webrtc.DataChannel err error ) + if ci.Target == livekit.SignalTarget_SUBSCRIBER { + continue + } if ci.Label == LossyDataChannel && p.lossyDC == nil { retransmits := uint16(0) id := uint16(ci.GetId()) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 9118383a6..f7e293f34 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -334,6 +334,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) { p.OnDataPacket(nil) // close participant as well + r.Logger.Infow("closing participant for removal", "participantID", p.ID()) _ = p.Close(true) r.lock.RLock() diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 7b0cfa17d..25fb36598 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -74,6 +74,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant return nil } case *livekit.SignalRequest_Leave: + pLogger.Infow("client leaving room") _ = participant.Close(true) case *livekit.SignalRequest_SubscriptionPermission: err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index f8917316e..7cc17985d 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -154,8 +154,7 @@ type LocalParticipant interface { // session migration SetMigrateState(s MigrateState) MigrateState() MigrateState - SetMigrateInfo(mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) - SetPreviousAnswer(previous *webrtc.SessionDescription) + SetMigrateInfo(previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) UpdateRTT(rtt uint32) } diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 2e19ecd6a..0c566a43f 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -444,11 +444,12 @@ type FakeLocalParticipant struct { setMetadataArgsForCall []struct { arg1 string } - SetMigrateInfoStub func([]*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) + SetMigrateInfoStub func(*webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) setMigrateInfoMutex sync.RWMutex setMigrateInfoArgsForCall []struct { - arg1 []*livekit.TrackPublishedResponse - arg2 []*livekit.DataChannelInfo + arg1 *webrtc.SessionDescription + arg2 []*livekit.TrackPublishedResponse + arg3 []*livekit.DataChannelInfo } SetMigrateStateStub func(types.MigrateState) setMigrateStateMutex sync.RWMutex @@ -466,11 +467,6 @@ type FakeLocalParticipant struct { setPermissionReturnsOnCall map[int]struct { result1 bool } - SetPreviousAnswerStub func(*webrtc.SessionDescription) - setPreviousAnswerMutex sync.RWMutex - setPreviousAnswerArgsForCall []struct { - arg1 *webrtc.SessionDescription - } SetResponseSinkStub func(routing.MessageSink) setResponseSinkMutex sync.RWMutex setResponseSinkArgsForCall []struct { @@ -2974,27 +2970,28 @@ func (fake *FakeLocalParticipant) SetMetadataArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 []*livekit.TrackPublishedResponse, arg2 []*livekit.DataChannelInfo) { - var arg1Copy []*livekit.TrackPublishedResponse - if arg1 != nil { - arg1Copy = make([]*livekit.TrackPublishedResponse, len(arg1)) - copy(arg1Copy, arg1) - } - var arg2Copy []*livekit.DataChannelInfo +func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription, arg2 []*livekit.TrackPublishedResponse, arg3 []*livekit.DataChannelInfo) { + var arg2Copy []*livekit.TrackPublishedResponse if arg2 != nil { - arg2Copy = make([]*livekit.DataChannelInfo, len(arg2)) + arg2Copy = make([]*livekit.TrackPublishedResponse, len(arg2)) copy(arg2Copy, arg2) } + var arg3Copy []*livekit.DataChannelInfo + if arg3 != nil { + arg3Copy = make([]*livekit.DataChannelInfo, len(arg3)) + copy(arg3Copy, arg3) + } fake.setMigrateInfoMutex.Lock() fake.setMigrateInfoArgsForCall = append(fake.setMigrateInfoArgsForCall, struct { - arg1 []*livekit.TrackPublishedResponse - arg2 []*livekit.DataChannelInfo - }{arg1Copy, arg2Copy}) + arg1 *webrtc.SessionDescription + arg2 []*livekit.TrackPublishedResponse + arg3 []*livekit.DataChannelInfo + }{arg1, arg2Copy, arg3Copy}) stub := fake.SetMigrateInfoStub - fake.recordInvocation("SetMigrateInfo", []interface{}{arg1Copy, arg2Copy}) + fake.recordInvocation("SetMigrateInfo", []interface{}{arg1, arg2Copy, arg3Copy}) fake.setMigrateInfoMutex.Unlock() if stub != nil { - fake.SetMigrateInfoStub(arg1, arg2) + fake.SetMigrateInfoStub(arg1, arg2, arg3) } } @@ -3004,17 +3001,17 @@ func (fake *FakeLocalParticipant) SetMigrateInfoCallCount() int { return len(fake.setMigrateInfoArgsForCall) } -func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func([]*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)) { +func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func(*webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)) { fake.setMigrateInfoMutex.Lock() defer fake.setMigrateInfoMutex.Unlock() fake.SetMigrateInfoStub = stub } -func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) ([]*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) { +func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) (*webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) { fake.setMigrateInfoMutex.RLock() defer fake.setMigrateInfoMutex.RUnlock() argsForCall := fake.setMigrateInfoArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeLocalParticipant) SetMigrateState(arg1 types.MigrateState) { @@ -3110,38 +3107,6 @@ func (fake *FakeLocalParticipant) SetPermissionReturnsOnCall(i int, result1 bool }{result1} } -func (fake *FakeLocalParticipant) SetPreviousAnswer(arg1 *webrtc.SessionDescription) { - fake.setPreviousAnswerMutex.Lock() - fake.setPreviousAnswerArgsForCall = append(fake.setPreviousAnswerArgsForCall, struct { - arg1 *webrtc.SessionDescription - }{arg1}) - stub := fake.SetPreviousAnswerStub - fake.recordInvocation("SetPreviousAnswer", []interface{}{arg1}) - fake.setPreviousAnswerMutex.Unlock() - if stub != nil { - fake.SetPreviousAnswerStub(arg1) - } -} - -func (fake *FakeLocalParticipant) SetPreviousAnswerCallCount() int { - fake.setPreviousAnswerMutex.RLock() - defer fake.setPreviousAnswerMutex.RUnlock() - return len(fake.setPreviousAnswerArgsForCall) -} - -func (fake *FakeLocalParticipant) SetPreviousAnswerCalls(stub func(*webrtc.SessionDescription)) { - fake.setPreviousAnswerMutex.Lock() - defer fake.setPreviousAnswerMutex.Unlock() - fake.SetPreviousAnswerStub = stub -} - -func (fake *FakeLocalParticipant) SetPreviousAnswerArgsForCall(i int) *webrtc.SessionDescription { - fake.setPreviousAnswerMutex.RLock() - defer fake.setPreviousAnswerMutex.RUnlock() - argsForCall := fake.setPreviousAnswerArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeLocalParticipant) SetResponseSink(arg1 routing.MessageSink) { fake.setResponseSinkMutex.Lock() fake.setResponseSinkArgsForCall = append(fake.setResponseSinkArgsForCall, struct { @@ -4030,8 +3995,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.setMigrateStateMutex.RUnlock() fake.setPermissionMutex.RLock() defer fake.setPermissionMutex.RUnlock() - fake.setPreviousAnswerMutex.RLock() - defer fake.setPreviousAnswerMutex.RUnlock() fake.setResponseSinkMutex.RLock() defer fake.setResponseSinkMutex.RUnlock() fake.setTrackMutedMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 0b9774d04..1a685aa77 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -486,6 +486,7 @@ func (r *RoomManager) handleRTCMessage(_ context.Context, roomName livekit.RoomN } } case *livekit.RTCNodeMessage_DeleteRoom: + room.Logger.Infow("deleting room") for _, p := range room.GetParticipants() { _ = p.Close(true) }