diff --git a/CHANGELOG.md b/CHANGELOG.md index f0093ba92..e2a7bb5b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# 5.7.4 + +SMP agent: +- remove re-subscription timeouts (as they are tracked per operation, and could cause failed subscriptions). +- reconnect XFTP clients when network settings change. +- fix lock contention resulting in stuck subscriptions on network change. + # 5.7.3 SMP/NTF protocol: diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index addd889a8..7d07e7366 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -702,12 +702,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = atomically $ putTMVar (sessionVar v) a runSubWorker = do ri <- asks $ reconnectInterval . config - timeoutCounts <- newTVarIO 0 withRetryInterval ri $ \_ loop -> do pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do - lift $ waitForUserNetwork c - void . 
tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs + liftIO $ waitForUserNetwork c + reconnectSMPClient c tSess qs loop getPending = RQ.getSessQueues tSess $ pendingSubs c cleanup :: SessionVar (Async ()) -> STM () @@ -717,41 +716,26 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = whenM (isEmptyTMVar $ sessionVar v) retry removeSessVar v tSess smpSubWorkers -reconnectSMPClient :: TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM () -reconnectSMPClient tc c tSess@(_, srv, _) qs = do - NetworkConfig {tcpTimeout} <- atomically $ getNetworkConfig c - -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) - let t = (length qs `div` 90 + 1) * tcpTimeout * 3 - ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case - Just _ -> resetTimeouts - -- reset and do not report consecutive timeouts while offline - Nothing -> ifM (atomically $ isNetworkOnline c) notifyTimeout resetTimeouts +reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM' () +reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do + cs <- readTVarIO $ RQ.getConnections $ activeSubs c + (rs, sessId_) <- subscribeQueues c $ L.toList qs + let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs + conns = filter (`M.notMember` cs) okConns + unless (null conns) $ notifySub "" $ UP srv conns + let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs + mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + forM_ (listToMaybe tempErrs) $ \(connId, e) -> do + when (null okConns && M.null cs && null finalErrs) . liftIO $ + forM_ sessId_ $ \sessId -> do + -- We only close the client session that was used to subscribe. 
+ v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) + mapM_ (closeClient_ c) v_ + notifySub connId $ ERR e where - resetTimeouts = atomically $ writeTVar tc 0 - notifyTimeout = do - tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) - maxTC <- asks $ maxSubscriptionTimeouts . config - when (tc' >= maxTC) $ do - let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess - atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg) - resubscribe :: AM () - resubscribe = do - cs <- readTVarIO $ RQ.getConnections $ activeSubs c - (rs, sessId_) <- lift . subscribeQueues c $ L.toList qs - let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs - liftIO $ do - let conns = filter (`M.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns - let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - forM_ (listToMaybe tempErrs) $ \(_, err) -> do - when (null okConns && M.null cs && null finalErrs) . liftIO $ - forM_ sessId_ $ \sessId -> do - -- We only close the client session that was used to subscribe. - v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) - mapM_ (closeClient_ c) v_ - throwError err - notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () + handleNotify :: AM' () -> AM' () + handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show + notifySub :: forall e. 
AEntityI e => ConnId -> ACommand 'Agent e -> AM' () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 9613adf3c..b753a4226 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -102,7 +102,6 @@ data AgentConfig = AgentConfig cleanupInterval :: Int64, cleanupStepInterval :: Int, maxWorkerRestartsPerMin :: Int, - maxSubscriptionTimeouts :: Int, storedMsgDataTTL :: NominalDiffTime, rcvFilesTTL :: NominalDiffTime, sndFilesTTL :: NominalDiffTime, @@ -173,9 +172,6 @@ defaultAgentConfig = cleanupInterval = 30 * 60 * 1000000, -- 30 minutes cleanupStepInterval = 200000, -- 200ms maxWorkerRestartsPerMin = 5, - -- 5 consecutive subscription timeouts will result in alert to the user - -- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking. - maxSubscriptionTimeouts = 5, storedMsgDataTTL = 21 * nominalDay, rcvFilesTTL = 2 * nominalDay, sndFilesTTL = nominalDay, diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 65713aaa4..7a3077532 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -95,23 +95,24 @@ type AEntityTransmission p e = (ACorrId, ConnId, ACommand p e) type AEntityTransmissionOrError p e = (ACorrId, ConnId, Either AgentErrorType (ACommand p e)) tGetAgent :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn) -tGetAgent = tGetAgent' +tGetAgent = tGetAgent' True -tGetAgent' :: forall c e. (Transport c, AEntityI e) => c -> IO (AEntityTransmissionOrError 'Agent e) -tGetAgent' h = do - (corrId, connId, cmdOrErr) <- pGetAgent h +tGetAgent' :: forall c e. 
(Transport c, AEntityI e) => Bool -> c -> IO (AEntityTransmissionOrError 'Agent e) +tGetAgent' skipErr h = do + (corrId, connId, cmdOrErr) <- pGetAgent skipErr h case cmdOrErr of Right (APC e cmd) -> case testEquality e (sAEntity @e) of Just Refl -> pure (corrId, connId, Right cmd) _ -> error $ "unexpected command " <> show cmd Left err -> pure (corrId, connId, Left err) -pGetAgent :: forall c. Transport c => c -> IO (ATransmissionOrError 'Agent) -pGetAgent h = do +pGetAgent :: forall c. Transport c => Bool -> c -> IO (ATransmissionOrError 'Agent) +pGetAgent skipErr h = do (corrId, connId, cmdOrErr) <- tGet SAgent h case cmdOrErr of - Right (APC _ CONNECT {}) -> pGetAgent h - Right (APC _ DISCONNECT {}) -> pGetAgent h + Right (APC _ CONNECT {}) -> pGetAgent skipErr h + Right (APC _ DISCONNECT {}) -> pGetAgent skipErr h + Right (APC _ (ERR (BROKER _ NETWORK))) | skipErr -> pGetAgent skipErr h cmd -> pure (corrId, connId, cmd) -- | receive message to handle `h` @@ -119,15 +120,18 @@ pGetAgent h = do (<#:) = tGetAgent (<#:?) :: Transport c => c -> IO (ATransmissionOrError 'Agent) -(<#:?) = pGetAgent +(<#:?) = pGetAgent True (<#:.) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AENone) -(<#:.) = tGetAgent' +(<#:.) = tGetAgent' True -- | send transmission `t` to handle `h` and get response (#:) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn) h #: t = tPutRaw h t >> (<#:) h +(#:!) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn) +h #:! 
t = tPutRaw h t >> tGetAgent' False h + -- | action and expected response -- `h #:t #> r` is the test that sends `t` to `h` and validates that the response is `r` (#>) :: IO (AEntityTransmissionOrError 'Agent 'AEConn) -> AEntityTransmission 'Agent 'AEConn -> Expectation @@ -426,8 +430,8 @@ testServerConnectionAfterError t _ = do withAgent1 $ \bob -> do withAgent2 $ \alice -> do - bob #: ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT - alice #: ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT + bob #:! ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT + alice #:! ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT withServer $ do alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index d7bddd9ec..ef283a7d9 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -145,6 +145,7 @@ pGet c = do case cmd of CONNECT {} -> pGet c DISCONNECT {} -> pGet c + ERR (BROKER _ NETWORK) -> pGet c _ -> pure t pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand 'Agent e