Improve panic recovery to use participant logger. (#1375)

Also made IssueFullReconnect public
This commit is contained in:
David Zhao
2023-02-02 14:55:50 -08:00
committed by GitHub
parent 501fb0860e
commit be4764b93b
6 changed files with 58 additions and 21 deletions
+7 -7
View File
@@ -1051,7 +1051,7 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
p.lock.RUnlock()
if onStateChange != nil {
go func() {
defer Recover()
defer Recover(p.GetLogger())
onStateChange(p, oldState)
}()
}
@@ -1225,7 +1225,7 @@ func (p *ParticipantImpl) onAnyTransportFailed() {
// subscriberRTCPWorker sends SenderReports periodically when the participant is subscribed to
// other publishedTracks in the room.
func (p *ParticipantImpl) subscriberRTCPWorker() {
defer Recover()
defer Recover(p.GetLogger())
for {
if p.IsDisconnected() {
return
@@ -1805,7 +1805,7 @@ func (p *ParticipantImpl) getPublishedTrackBySdpCid(clientId string) types.Media
}
func (p *ParticipantImpl) publisherRTCPWorker() {
defer Recover()
defer Recover(p.GetLogger())
// read from rtcpChan
for pkts := range p.rtcpCh {
@@ -1895,7 +1895,7 @@ func (p *ParticipantImpl) GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.R
return nil, sfu.DownTrackState{}
}
func (p *ParticipantImpl) issueFullReconnect(reason types.ParticipantCloseReason) {
func (p *ParticipantImpl) IssueFullReconnect(reason types.ParticipantCloseReason) {
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{
@@ -1913,20 +1913,20 @@ func (p *ParticipantImpl) issueFullReconnect(reason types.ParticipantCloseReason
func (p *ParticipantImpl) onPublicationError(trackID livekit.TrackID) {
if p.params.ReconnectOnPublicationError {
p.params.Logger.Infow("issuing full reconnect on publication error", "trackID", trackID)
p.issueFullReconnect(types.ParticipantCloseReasonPublicationError)
p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError)
}
}
func (p *ParticipantImpl) onSubscriptionError(trackID livekit.TrackID) {
if p.params.ReconnectOnSubscriptionError {
p.params.Logger.Infow("issuing full reconnect on subscription error", "trackID", trackID)
p.issueFullReconnect(types.ParticipantCloseReasonPublicationError)
p.IssueFullReconnect(types.ParticipantCloseReasonPublicationError)
}
}
func (p *ParticipantImpl) onAnyTransportNegotiationFailed() {
p.params.Logger.Infow("negotiation failed, starting full reconnect")
p.issueFullReconnect(types.ParticipantCloseReasonNegotiateFailed)
p.IssueFullReconnect(types.ParticipantCloseReasonNegotiateFailed)
}
func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []types.SubscribedCodecQuality) error {
+1
View File
@@ -295,6 +295,7 @@ type LocalParticipant interface {
SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool)
SendRefreshToken(token string) error
SendReconnectResponse(reconnectResponse *livekit.ReconnectResponse) error
IssueFullReconnect(reason ParticipantCloseReason)
// callbacks
OnStateChange(func(p LocalParticipant, oldState livekit.ParticipantInfo_State))
@@ -397,6 +397,11 @@ type FakeLocalParticipant struct {
isSubscribedToReturnsOnCall map[int]struct {
result1 bool
}
IssueFullReconnectStub func(types.ParticipantCloseReason)
issueFullReconnectMutex sync.RWMutex
issueFullReconnectArgsForCall []struct {
arg1 types.ParticipantCloseReason
}
MaybeStartMigrationStub func(bool, func()) bool
maybeStartMigrationMutex sync.RWMutex
maybeStartMigrationArgsForCall []struct {
@@ -2793,6 +2798,38 @@ func (fake *FakeLocalParticipant) IsSubscribedToReturnsOnCall(i int, result1 boo
}{result1}
}
func (fake *FakeLocalParticipant) IssueFullReconnect(arg1 types.ParticipantCloseReason) {
fake.issueFullReconnectMutex.Lock()
fake.issueFullReconnectArgsForCall = append(fake.issueFullReconnectArgsForCall, struct {
arg1 types.ParticipantCloseReason
}{arg1})
stub := fake.IssueFullReconnectStub
fake.recordInvocation("IssueFullReconnect", []interface{}{arg1})
fake.issueFullReconnectMutex.Unlock()
if stub != nil {
fake.IssueFullReconnectStub(arg1)
}
}
func (fake *FakeLocalParticipant) IssueFullReconnectCallCount() int {
fake.issueFullReconnectMutex.RLock()
defer fake.issueFullReconnectMutex.RUnlock()
return len(fake.issueFullReconnectArgsForCall)
}
func (fake *FakeLocalParticipant) IssueFullReconnectCalls(stub func(types.ParticipantCloseReason)) {
fake.issueFullReconnectMutex.Lock()
defer fake.issueFullReconnectMutex.Unlock()
fake.IssueFullReconnectStub = stub
}
func (fake *FakeLocalParticipant) IssueFullReconnectArgsForCall(i int) types.ParticipantCloseReason {
fake.issueFullReconnectMutex.RLock()
defer fake.issueFullReconnectMutex.RUnlock()
argsForCall := fake.issueFullReconnectArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) MaybeStartMigration(arg1 bool, arg2 func()) bool {
fake.maybeStartMigrationMutex.Lock()
ret, specificReturn := fake.maybeStartMigrationReturnsOnCall[len(fake.maybeStartMigrationArgsForCall)]
@@ -5104,6 +5141,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.isRecorderMutex.RUnlock()
fake.isSubscribedToMutex.RLock()
defer fake.isSubscribedToMutex.RUnlock()
fake.issueFullReconnectMutex.RLock()
defer fake.issueFullReconnectMutex.RUnlock()
fake.maybeStartMigrationMutex.RLock()
defer fake.maybeStartMigrationMutex.RUnlock()
fake.migrateStateMutex.RLock()
+5 -2
View File
@@ -112,7 +112,10 @@ func RecoverSilent() {
recover()
}
func Recover() {
func Recover(l logger.Logger) {
if l == nil {
l = logger.GetLogger()
}
if r := recover(); r != nil {
var err error
switch e := r.(type) {
@@ -123,7 +126,7 @@ func Recover() {
default:
err = errors.New("unknown panic")
}
logger.Errorw("recovered panic", err, "panic", r)
l.Errorw("recovered panic", err, "panic", r)
}
}
+5 -11
View File
@@ -468,23 +468,17 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
// manages an RTC session for a participant, runs on the RTC node
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalParticipant, requestSource routing.MessageSource) {
defer func() {
logger.Debugw("RTC session finishing",
"participant", participant.Identity(),
"pID", participant.ID(),
"room", room.Name(),
"roomID", room.ID(),
)
requestSource.Close()
}()
defer rtc.Recover()
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()),
participant.Identity(),
participant.ID(),
false,
)
defer func() {
pLogger.Debugw("RTC session finishing")
requestSource.Close()
}()
defer rtc.Recover(pLogger)
// send first refresh for cases when client token is close to expiring
_ = r.refreshToken(participant)
+1 -1
View File
@@ -261,7 +261,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// we would terminate the signal connection as well
_ = conn.Close()
}()
defer rtc.Recover()
defer rtc.Recover(pLogger)
for {
select {
case <-done: