diff --git a/simplexmq.cabal b/simplexmq.cabal index f9998b7ac..8fc52c556 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -57,6 +57,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index d62426e7f..4f52bc3a4 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -47,6 +47,7 @@ module Simplex.Messaging.Agent ackMessageAsync, switchConnectionAsync, deleteConnectionAsync, + deleteConnectionsAsync, createConnection, joinConnection, allowConnection, @@ -113,6 +114,7 @@ import Data.Time.Clock.System (systemToUTCTime) import qualified Database.SQLite.Simple as DB import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite +import Simplex.Messaging.Agent.Lock (withLock) import Simplex.Messaging.Agent.NtfSubSupervisor import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval @@ -146,7 +148,7 @@ getSMPAgentClient cfg initServers = newSMPAgentEnv cfg >>= runReaderT runAgent where runAgent = do c <- getAgentClient initServers - void $ race_ (subscriber c) (runNtfSupervisor c) `forkFinally` const (disconnectAgentClient c) + void $ raceAny_ [subscriber c, runNtfSupervisor c, cleanupManager c] `forkFinally` const (disconnectAgentClient c) pure c disconnectAgentClient :: MonadUnliftIO m => AgentClient -> m () @@ -158,14 +160,14 @@ disconnectAgentClient c@AgentClient {agentEnv = Env {ntfSupervisor = ns}} = do resumeAgentClient :: MonadIO m => AgentClient -> m () resumeAgentClient c = atomically $ writeTVar (active c) True --- | type AgentErrorMonad m = (MonadUnliftIO m, MonadError AgentErrorType m) createUser :: AgentErrorMonad m => AgentClient -> NonEmpty SMPServerWithAuth -> m UserId createUser c = withAgentEnv c . createUser' c -deleteUser :: AgentErrorMonad m => AgentClient -> UserId -> m () -deleteUser c = withAgentEnv c . deleteUser' c +-- | Delete user record optionally deleting all user's connections on SMP servers +deleteUser :: AgentErrorMonad m => AgentClient -> UserId -> Bool -> m () +deleteUser c = withAgentEnv c .: deleteUser' c -- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id createConnectionAsync :: forall m c. (AgentErrorMonad m, ConnectionModeI c) => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> m ConnId @@ -192,8 +194,12 @@ switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId - switchConnectionAsync c = withAgentEnv c .: switchConnectionAsync' c -- | Delete SMP agent connection (DEL command) asynchronously, no synchronous response -deleteConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m () -deleteConnectionAsync c = withAgentEnv c .: deleteConnectionAsync' c +deleteConnectionAsync :: AgentErrorMonad m => AgentClient -> ConnId -> m () +deleteConnectionAsync c = withAgentEnv c . deleteConnectionAsync' c + +-- -- | Delete SMP agent connections using batch commands asynchronously, no synchronous response +deleteConnectionsAsync :: AgentErrorMonad m => AgentClient -> [ConnId] -> m () +deleteConnectionsAsync c = withAgentEnv c . deleteConnectionsAsync' c -- | Create SMP agent connection (NEW command) createConnection :: AgentErrorMonad m => AgentClient -> UserId -> Bool -> SConnectionMode c -> Maybe CRClientData -> m (ConnId, ConnectionRequestUri c) @@ -384,9 +390,11 @@ createUser' c srvs = do atomically $ TM.insert userId srvs $ smpServers c pure userId -deleteUser' :: AgentMonad m => AgentClient -> UserId -> m () -deleteUser' c userId = do - withStore c (`deleteUserRecord` userId) +deleteUser' :: AgentMonad m => AgentClient -> UserId -> Bool -> m () +deleteUser' c userId delSMPQueues = do + if delSMPQueues + then withStore c (`setUserDeleted` userId) >>= deleteConnectionsAsync_ (Just userId) c + else withStore c (`deleteUserRecord` userId) atomically $ TM.delete userId $ smpServers c newConnAsync :: forall m c. (AgentMonad m, ConnectionModeI c) => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> m ConnId @@ -450,23 +458,36 @@ ackMessageAsync' c corrId connId msgId = do (RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId -deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> m () -deleteConnectionAsync' c@AgentClient {subQ} corrId connId = withConnLock c connId "deleteConnectionAsync" $ do +deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () +deleteConnectionAsync' c connId = withConnLock c connId "deleteConnectionAsync" $ do SomeConn _ conn <- withStore c (`getConn` connId) - case conn of - DuplexConnection _ (rq :| _) _ -> enqueueDelete rq - RcvConnection _ rq -> enqueueDelete rq - ContactConnection _ rq -> enqueueDelete rq - SndConnection _ _ -> delete - NewConnection _ -> delete - where - enqueueDelete :: RcvQueue -> m () - enqueueDelete RcvQueue {server} = do - withStore' c $ \db -> setConnDeleted db connId + case connRcvQueues conn of + [] -> delete + rqs -> do + withStore' c (`setConnDeleted` connId) disableConn c connId - enqueueCommand c corrId connId (Just server) $ AInternalCommand ICDeleteConn - delete :: m () - delete = withStore' c (`deleteConn` connId) >> atomically (writeTBQueue subQ (corrId, connId, OK)) + void . forkIO $ do + forM_ rqs $ deleteConnQueue c + delete + where + delete = withStore' c (`deleteConn` connId) + +deleteConnectionsAsync' :: AgentMonad m => AgentClient -> [ConnId] -> m () +deleteConnectionsAsync' = deleteConnectionsAsync_ Nothing + +deleteConnectionsAsync_ :: forall m. AgentMonad m => Maybe UserId -> AgentClient -> [ConnId] -> m () +deleteConnectionsAsync_ userId_ c connIds = case connIds of + [] -> delUser + _ -> do + (_, rqs, connIds') <- prepareDeleteConnections_ getConn c connIds + withStore' c $ forM_ connIds' . setConnDeleted + void . forkIO $ + withLock (deleteLock c) "deleteConnectionsAsync" $ do + void $ deleteConnQueues c rqs + delUser + where + delUser :: m () + delUser = forM_ userId_ $ \userId -> withStore' c (`deleteUserWithoutConns` userId) -- | Add connection to the new receive queue switchConnectionAsync' :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> m () @@ -851,7 +872,7 @@ runCommandProcessing c@AgentClient {subQ} server_ = do enqueueMessage c cData sq SMP.MsgFlags {notification = True} HELLO ICDeleteConn -> withServer $ \srv -> tryWithLock "ICDeleteConn" $ do - SomeConn _ conn <- withStore c $ \db -> getAnyConn db connId True + SomeConn _ conn <- withStore c $ \db -> getDeletedConn db connId case conn of DuplexConnection _ (rq :| rqs) _ -> delete srv rq $ case rqs of [] -> notify OK @@ -1196,25 +1217,36 @@ suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do NewConnection _ -> throwError $ CMD PROHIBITED -- | Delete SMP agent connection (DEL command) in Reader monad +-- unlike deleteConnectionAsync, this function does not mark connection as deleted in case of deletion failure +-- currently it is used only in tests deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do SomeConn _ conn <- withStore c (`getConn` connId) - case conn of - DuplexConnection _ rqs _ -> mapM_ (deleteConnQueue c) rqs >> disableConn c connId >> deleteConn' - RcvConnection _ rq -> delete rq - ContactConnection _ rq -> delete rq - SndConnection _ _ -> deleteConn' - NewConnection _ -> deleteConn' - where - delete :: RcvQueue -> m () - delete rq = deleteConnQueue c rq >> disableConn c connId >> deleteConn' - deleteConn' = withStore' c (`deleteConn` connId) + let rqs = connRcvQueues conn + unless (null rqs) $ do + disableConn c connId + forM_ rqs $ deleteConnQueue c + withStore' c (`deleteConn` connId) + +connRcvQueues :: Connection d -> [RcvQueue] +connRcvQueues = \case + DuplexConnection _ rqs _ -> L.toList rqs + RcvConnection _ rq -> [rq] + ContactConnection _ rq -> [rq] + SndConnection _ _ -> [] + NewConnection _ -> [] deleteConnQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () deleteConnQueue c rq = do - deleteQueue c rq - withStore' c $ \db -> deleteConnRcvQueue db rq - removeQueueSubscription c rq + maxErrs <- asks $ deleteErrorCount . config + deleteQueue c rq `catchError` handleError maxErrs + withStore' c (`deleteConnRcvQueue` rq) + where + handleError maxErrs e + | temporaryOrHostError e && deleteErrors rq + 1 < maxErrs = do + withStore' c (`incRcvDeleteErrors` rq) + throwError e + | otherwise = pure () disableConn :: AgentMonad m => AgentClient -> ConnId -> m () disableConn c connId = do @@ -1222,31 +1254,54 @@ disableConn c connId = do ns <- asks ntfSupervisor atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete) +-- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure. deleteConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ())) -deleteConnections' _ [] = pure M.empty -deleteConnections' c connIds = do - conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (forM connIds . getConn) +deleteConnections' = deleteConnections_ getConn + +deleteDeletedConns :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ())) +deleteDeletedConns = deleteConnections_ getDeletedConn + +prepareDeleteConnections_ :: + forall m. + AgentMonad m => + (DB.Connection -> ConnId -> IO (Either StoreError SomeConn)) -> + AgentClient -> + [ConnId] -> + m (Map ConnId (Either AgentErrorType ()), [RcvQueue], [ConnId]) +prepareDeleteConnections_ getConnection c connIds = do + conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (forM connIds . getConnection) let (errs, cs) = M.mapEither id conns errs' = M.map (Left . storeError) errs - (delRs, rcvQs) = M.mapEither rcvQueueOrResult cs - rcvRs <- deleteQueues c (concat $ M.elems rcvQs) - forM_ rcvRs $ \case - (rq, Right _) -> withStore' c (`deleteConnRcvQueue` rq) >> removeQueueSubscription c rq - _ -> pure () - let rs = M.unions ([errs', delRs, connResults rcvRs] :: [Map ConnId (Either AgentErrorType ())]) + (delRs, rcvQs) = M.mapEither rcvQueues cs + rqs = concat $ M.elems rcvQs + connIds' = M.keys rcvQs + forM_ connIds' $ disableConn c + withStore' c $ forM_ (M.keys delRs) . deleteConn + pure (errs' <> delRs, rqs, connIds') + where + rcvQueues :: SomeConn -> Either (Either AgentErrorType ()) [RcvQueue] + rcvQueues (SomeConn _ conn) = case connRcvQueues conn of + [] -> Left $ Right () + rqs -> Right rqs + +deleteConnQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m (Map ConnId (Either AgentErrorType ())) +deleteConnQueues c rqs = do + rs <- connResults <$> (deleteQueueRecs =<< deleteQueues c rqs) forM_ (M.assocs rs) $ \case - (connId, Right _) -> disableConn c connId >> withStore' c (`deleteConn` connId) + (connId, Right _) -> withStore' c (`deleteConn` connId) _ -> pure () - notifyResultError rs pure rs where - rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType ()) [RcvQueue] - rcvQueueOrResult (SomeConn _ conn) = case conn of - DuplexConnection _ rqs _ -> Right $ L.toList rqs - RcvConnection _ rq -> Right [rq] - ContactConnection _ rq -> Right [rq] - SndConnection _ _ -> Left $ Right () - NewConnection _ -> Left $ Right () + deleteQueueRecs :: [(RcvQueue, Either AgentErrorType ())] -> m [(RcvQueue, Either AgentErrorType ())] + deleteQueueRecs rs = do + maxErrs <- asks $ deleteErrorCount . config + forM rs $ \(rq, r) -> do + r' <- case r of + Right _ -> withStore' c (`deleteConnRcvQueue` rq) $> r + Left e + | temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> withStore' c (`incRcvDeleteErrors` rq) $> r + | otherwise -> withStore' c (`deleteConnRcvQueue` rq) $> Right () + pure (rq, r') connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ()) connResults = M.map snd . foldl' addResult M.empty where @@ -1261,6 +1316,22 @@ deleteConnections' c connIds = do order (Active, Left _) = 1 order (_, Left _) = 2 order _ = 3 + +deleteConnections_ :: + forall m. + AgentMonad m => + (DB.Connection -> ConnId -> IO (Either StoreError SomeConn)) -> + AgentClient -> + [ConnId] -> + m (Map ConnId (Either AgentErrorType ())) +deleteConnections_ _ _ [] = pure M.empty +deleteConnections_ getConnection c connIds = do + (rs, rqs, _) <- prepareDeleteConnections_ getConnection c connIds + rcvRs <- deleteConnQueues c rqs + let rs' = M.union rs rcvRs + notifyResultError rs' + pure rs' + where notifyResultError :: Map ConnId (Either AgentErrorType ()) -> m () notifyResultError rs = do let actual = M.size rs @@ -1520,10 +1591,11 @@ execAgentStoreSQL' :: AgentMonad m => AgentClient -> Text -> m [Text] execAgentStoreSQL' c sql = withStore' c (`execSQL` sql) debugAgentLocks' :: AgentMonad m => AgentClient -> m AgentLocks -debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs} = do +debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs, deleteLock = d} = do connLocks <- getLocks cs srvLocks <- getLocks rs - pure AgentLocks {connLocks, srvLocks} + delLock <- atomically $ tryReadTMVar d + pure AgentLocks {connLocks, srvLocks, delLock} where getLocks ls = atomically $ M.mapKeys (B.unpack . strEncode) . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls) @@ -1557,6 +1629,17 @@ subscriber c@AgentClient {msgQ} = forever $ do Left e -> liftIO $ print e Right _ -> return () +cleanupManager :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () +cleanupManager c = do + threadDelay =<< asks (initialCleanupDelay . config) + int <- asks (cleanupInterval . config) + forever $ do + void . runExceptT $ + withLock (deleteLock c) "cleanupManager" $ do + void $ withStore' c getDeletedConns >>= deleteDeletedConns c + withStore' c deleteUsersWithoutConns + threadDelay int + processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m () processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, sessId, rId, cmd) = do (rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 045ce62df..3975c8d12 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -57,7 +57,6 @@ module Simplex.Messaging.Agent.Client deleteQueues, logServer, logSecret, - removeQueueSubscription, removeSubscription, hasActiveSubscription, agentClientStore, @@ -202,6 +201,8 @@ data AgentClient = AgentClient getMsgLocks :: TMap (SMPServer, SMP.RecipientId) (TMVar ()), -- locks to prevent concurrent operations with connection connLocks :: TMap ConnId Lock, + -- lock to prevent concurrency between periodic and async connection deletions + deleteLock :: Lock, -- locks to prevent concurrent reconnections to SMP servers reconnectLocks :: TMap SMPTransportSession Lock, reconnections :: TAsyncs, @@ -230,7 +231,7 @@ data AgentOpState = AgentOpState {opSuspended :: Bool, opsInProgress :: Int} data AgentState = ASActive | ASSuspending | ASSuspended deriving (Eq, Show) -data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String} +data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String, delLock :: Maybe String} deriving (Show, Generic) instance ToJSON AgentLocks where toEncoding = J.genericToEncoding J.defaultOptions @@ -273,12 +274,48 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do agentState <- newTVar ASActive getMsgLocks <- TM.empty connLocks <- TM.empty + deleteLock <- createLock reconnectLocks <- TM.empty reconnections <- newTAsyncs asyncClients <- newTAsyncs agentStats <- TM.empty clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') - return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv} + return + AgentClient + { active, + rcvQ, + subQ, + msgQ, + smpServers, + smpClients, + ntfServers, + ntfClients, + useNetworkConfig, + subscrConns, + activeSubs, + pendingSubs, + pendingMsgsQueued, + smpQueueMsgQueues, + smpQueueMsgDeliveries, + connCmdsQueued, + asyncCmdQueues, + asyncCmdProcesses, + ntfNetworkOp, + rcvNetworkOp, + msgDeliveryOp, + sndNetworkOp, + databaseOp, + agentState, + getMsgLocks, + connLocks, + deleteLock, + reconnectLocks, + reconnections, + asyncClients, + agentStats, + clientId, + agentEnv + } agentClientStore :: AgentClient -> SQLiteStore agentClientStore AgentClient {agentEnv = Env {store}} = store @@ -503,7 +540,7 @@ withConnLock AgentClient {connLocks} connId name = withLockMap_ connLocks connId withLockMap_ :: (Ord k, MonadUnliftIO m) => TMap k Lock -> k -> String -> m a -> m a withLockMap_ locks key = withGetLock $ TM.lookup key locks >>= maybe newLock pure where - newLock = newEmptyTMVar >>= \l -> TM.insert key l locks $> l + newLock = createLock >>= \l -> TM.insert key l locks $> l withClient_ :: forall a m msg. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> TransportSession msg -> ByteString -> (ProtocolClient msg -> m a) -> m a withClient_ c tSess@(userId, srv, _) statCmd action = do @@ -641,7 +678,8 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange = do primary = True, dbReplaceQueueId = Nothing, smpClientVersion = maxVersion vRange, - clientNtfCreds = Nothing + clientNtfCreds = Nothing, + deleteErrors = 0 } pure (rq, SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey) @@ -686,6 +724,7 @@ temporaryOrHostError = \case BROKER _ HOST -> True e -> temporaryAgentError e +-- | Subscribe to queues. The list of results can have a different order. subscribeQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())] subscribeQueues c qs = do (errs, qs') <- partitionEithers <$> mapM checkQueue qs @@ -746,11 +785,6 @@ addSubscription c rq@RcvQueue {connId} = atomically $ do RQ.addQueue rq $ activeSubs c RQ.deleteQueue rq $ pendingSubs c -removeQueueSubscription :: MonadIO m => AgentClient -> RcvQueue -> m () -removeQueueSubscription c rq = atomically $ do - RQ.deleteQueue rq $ activeSubs c - RQ.deleteQueue rq $ pendingSubs c - hasActiveSubscription :: AgentClient -> ConnId -> STM Bool hasActiveSubscription c connId = RQ.hasConn connId $ activeSubs c diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index e4cc01678..b596eb374 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -88,6 +88,9 @@ data AgentConfig = AgentConfig messageRetryInterval :: RetryInterval2, messageTimeout :: NominalDiffTime, helloTimeout :: NominalDiffTime, + initialCleanupDelay :: Int, + cleanupInterval :: Int, + deleteErrorCount :: Int, ntfCron :: Word16, ntfWorkerDelay :: Int, ntfSMPWorkerDelay :: Int, @@ -145,6 +148,9 @@ defaultAgentConfig = messageRetryInterval = defaultMessageRetryInterval, messageTimeout = 2 * nominalDay, helloTimeout = 2 * nominalDay, + initialCleanupDelay = 30 * 1000000, -- 30 seconds + cleanupInterval = 30 * 60 * 1000000, -- 30 minutes + deleteErrorCount = 10, ntfCron = 20, -- minutes ntfWorkerDelay = 100000, -- microseconds ntfSMPWorkerDelay = 500000, -- microseconds diff --git a/src/Simplex/Messaging/Agent/Lock.hs b/src/Simplex/Messaging/Agent/Lock.hs index eca04d0aa..06b8a9efd 100644 --- a/src/Simplex/Messaging/Agent/Lock.hs +++ b/src/Simplex/Messaging/Agent/Lock.hs @@ -10,6 +10,10 @@ import UnliftIO.STM type Lock = TMVar String +createLock :: STM Lock +createLock = newEmptyTMVar +{-# INLINE createLock #-} + withLock :: MonadUnliftIO m => TMVar String -> String -> m a -> m a withLock lock name = E.bracket_ diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 7f9f8bb90..6b8f772e8 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -74,7 +74,8 @@ data RcvQueue = RcvQueue -- | SMP client version smpClientVersion :: Version, -- | credentials used in context of notifications - clientNtfCreds :: Maybe ClientNtfCreds + clientNtfCreds :: Maybe ClientNtfCreds, + deleteErrors :: Int } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index ff9eb6329..2196ef512 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -30,6 +30,10 @@ module Simplex.Messaging.Agent.Store.SQLite -- * Users createUserRecord, deleteUserRecord, + setUserDeleted, + deleteUserWithoutConns, + deleteUsersWithoutConns, + checkUser, -- * Queues and connections createNewConn, @@ -38,9 +42,10 @@ module Simplex.Messaging.Agent.Store.SQLite createRcvConn, createSndConn, getConn, - getAnyConn, + getDeletedConn, getConnData, setConnDeleted, + getDeletedConns, getRcvConn, deleteConn, upgradeRcvConnToDuplex, @@ -53,6 +58,7 @@ module Simplex.Messaging.Agent.Store.SQLite setRcvQueuePrimary, setSndQueuePrimary, deleteConnRcvQueue, + incRcvDeleteErrors, deleteConnSndQueue, getPrimaryRcvQueue, getRcvQueue, @@ -306,13 +312,46 @@ createUserRecord db = do DB.execute_ db "INSERT INTO users DEFAULT VALUES" insertedRowId db +checkUser :: DB.Connection -> UserId -> IO (Either StoreError ()) +checkUser db userId = + firstRow (\(_ :: Only Int64) -> ()) SEUserNotFound $ + DB.query db "SELECT user_id FROM users WHERE user_id = ? AND deleted = ?" (userId, False) + deleteUserRecord :: DB.Connection -> UserId -> IO (Either StoreError ()) deleteUserRecord db userId = runExceptT $ do - _ :: Only Int64 <- - ExceptT . firstRow id SEUserNotFound $ - DB.query db "SELECT user_id FROM users WHERE user_id = ?" (Only userId) + ExceptT $ checkUser db userId liftIO $ DB.execute db "DELETE FROM users WHERE user_id = ?" (Only userId) +setUserDeleted :: DB.Connection -> UserId -> IO (Either StoreError [ConnId]) +setUserDeleted db userId = runExceptT $ do + ExceptT $ checkUser db userId + liftIO $ do + DB.execute db "UPDATE users SET deleted = ? WHERE user_id = ?" (True, userId) + map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE user_id = ?" (Only userId) + +deleteUserWithoutConns :: DB.Connection -> UserId -> IO () +deleteUserWithoutConns db userId = + DB.execute + db + [sql| + DELETE FROM users u + WHERE user_id = ? + AND u.deleted = ? + AND NOT EXISTS (SELECT * FROM connections WHERE user_id = u.user_id) + |] + (userId, True) + +deleteUsersWithoutConns :: DB.Connection -> IO () +deleteUsersWithoutConns db = + DB.execute + db + [sql| + DELETE FROM users u + WHERE u.deleted = ? + AND NOT EXISTS (SELECT * FROM connections WHERE user_id = u.user_id) + |] + (Only True) + createConn_ :: TVar ChaChaDRG -> ConnData -> @@ -471,6 +510,10 @@ setSndQueuePrimary db connId SndQueue {dbQueueId} = do "UPDATE snd_queues SET snd_primary = ?, replace_snd_queue_id = ? WHERE conn_id = ? AND snd_queue_id = ?" (True, Nothing :: Maybe Int64, connId, dbQueueId) +incRcvDeleteErrors :: DB.Connection -> RcvQueue -> IO () +incRcvDeleteErrors db RcvQueue {connId, dbQueueId} = + DB.execute db "UPDATE rcv_queues SET delete_errors = delete_errors + 1 WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId) + deleteConnRcvQueue :: DB.Connection -> RcvQueue -> IO () deleteConnRcvQueue db RcvQueue {connId, dbQueueId} = DB.execute db "DELETE FROM rcv_queues WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId) @@ -1330,10 +1373,13 @@ newQueueId_ (Only maxId : _) = maxId + 1 -- * getConn helpers getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn) -getConn db connId = getAnyConn db connId False +getConn = getAnyConn False -getAnyConn :: DB.Connection -> ConnId -> Bool -> IO (Either StoreError SomeConn) -getAnyConn dbConn connId deleted' = +getDeletedConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn) +getDeletedConn = getAnyConn True + +getAnyConn :: Bool -> DB.Connection -> ConnId -> IO (Either StoreError SomeConn) +getAnyConn deleted' dbConn connId = getConnData dbConn connId >>= \case Nothing -> pure $ Left SEConnNotFound Just (cData@ConnData {deleted}, cMode) @@ -1358,6 +1404,9 @@ getConnData dbConn connId' = setConnDeleted :: DB.Connection -> ConnId -> IO () setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId) +getDeletedConns :: DB.Connection -> IO [ConnId] +getDeletedConns db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only True) + -- | returns all connection queues, the first queue is the primary one getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue)) getRcvQueuesByConnId_ db connId = @@ -1373,7 +1422,7 @@ rcvQueueQuery = [sql| SELECT c.user_id, s.key_hash, q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, - q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.smp_client_version, + q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.smp_client_version, q.delete_errors, q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret FROM rcv_queues q JOIN servers s ON q.host = s.host AND q.port = s.port @@ -1382,16 +1431,16 @@ rcvQueueQuery = toRcvQueue :: (UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateSignKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, QueueStatus) - :. (Int64, Bool, Maybe Int64, Maybe Version) + :. (Int64, Bool, Maybe Int64, Maybe Version, Int) :. (Maybe SMP.NtfPublicVerifyKey, Maybe SMP.NtfPrivateSignKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret) -> RcvQueue -toRcvQueue ((userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) = +toRcvQueue ((userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion_, deleteErrors) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) = let server = SMPServer host port keyHash smpClientVersion = fromMaybe 1 smpClientVersion_ clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} _ -> Nothing - in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion, clientNtfCreds} + in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion, clientNtfCreds, deleteErrors} getRcvQueueById_ :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue) getRcvQueueById_ db connId dbRcvId = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 994f16590..e7b2322a2 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -39,6 +39,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors import Simplex.Messaging.Encoding.String import Simplex.Messaging.Transport.Client (TransportHost) @@ -57,7 +58,8 @@ schemaMigrations = ("m20220905_commands", m20220905_commands), ("m20220915_connection_queues", m20220915_connection_queues), ("m20230110_users", m20230110_users), - ("m20230117_fkey_indexes", m20230117_fkey_indexes) + ("m20230117_fkey_indexes", m20230117_fkey_indexes), + ("m20230120_delete_errors", m20230120_delete_errors) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230120_delete_errors.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230120_delete_errors.hs new file mode 100644 index 000000000..918d2dc79 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230120_delete_errors.hs @@ -0,0 +1,20 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20230120_delete_errors :: Query +m20230120_delete_errors = + [sql| +PRAGMA ignore_check_constraints=ON; + +ALTER TABLE rcv_queues ADD COLUMN delete_errors INTEGER DEFAULT 0 CHECK (delete_errors NOT NULL); +UPDATE rcv_queues SET delete_errors = 0; + +ALTER TABLE users ADD COLUMN deleted INTEGER DEFAULT 0 CHECK (deleted NOT NULL); +UPDATE users SET deleted = 0; + +PRAGMA ignore_check_constraints=OFF; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 9f1c50e9f..215e410a8 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -47,6 +47,7 @@ CREATE TABLE rcv_queues( rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL), rcv_primary INTEGER CHECK(rcv_primary NOT NULL), replace_rcv_queue_id INTEGER NULL, + delete_errors INTEGER DEFAULT 0 CHECK(delete_errors NOT NULL), PRIMARY KEY(host, port, rcv_id), FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE, @@ -230,7 +231,11 @@ CREATE INDEX idx_snd_message_deliveries ON snd_message_deliveries( conn_id, snd_queue_id ); -CREATE TABLE users(user_id INTEGER PRIMARY KEY AUTOINCREMENT); +CREATE TABLE users( + user_id INTEGER PRIMARY KEY AUTOINCREMENT + , + deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL) +); CREATE INDEX idx_connections_user ON connections(user_id); CREATE INDEX idx_commands_conn_id ON commands(conn_id); CREATE INDEX idx_commands_host_port ON commands(host, port); diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 07285fd63..e52c151d0 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -670,8 +670,9 @@ testAsyncCommands = do get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False ackMessageAsync alice "7" bobId $ baseId + 4 ("7", _, OK) <- get alice - deleteConnectionAsync alice "8" bobId - ("8", _, OK) <- get alice + deleteConnectionAsync alice bobId + -- deleteConnectionAsync alice "8" bobId + -- ("8", _, OK) <- get alice liftIO $ noMessages alice "nothing else should be delivered to alice" where baseId = 3 @@ -810,9 +811,11 @@ testSwitchDelete servers = do disconnectAgentClient b switchConnectionAsync a "" bId phase a bId QDRcv SPStarted - deleteConnectionAsync a "1" bId - ("1", bId', OK) <- get a - liftIO $ bId `shouldBe` bId' + deleteConnectionAsync a bId + +-- deleteConnectionAsync a "1" bId +-- ("1", bId', OK) <- get a +-- liftIO $ bId `shouldBe` bId' testCreateQueueAuth :: (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int testCreateQueueAuth clnt1 clnt2 = do diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 1636571b6..a43196148 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -168,7 +168,8 @@ rcvQueue1 = primary = True, dbReplaceQueueId = Nothing, smpClientVersion = 1, - clientNtfCreds = Nothing + clientNtfCreds = Nothing, + deleteErrors = 0 } sndQueue1 :: SndQueue @@ -357,7 +358,8 @@ testUpgradeSndConnToDuplex = primary = True, dbReplaceQueueId = Nothing, smpClientVersion = 1, - clientNtfCreds = Nothing + clientNtfCreds = Nothing, + deleteErrors = 0 } upgradeSndConnToDuplex db "conn1" anotherRcvQueue `shouldReturn` Left (SEBadConnType CRcv)