core: support closing/re-opening store on chat stop/start (#3132)

* core: support closing/re-opening store on chat stop/start

* update .nix refs

* kotlin: types

* add test

* update simplexmq
This commit is contained in:
Evgeny Poberezkin
2023-09-27 15:26:03 +01:00
committed by GitHub
parent 8709ad6ff3
commit 3c7fc6b0ee
11 changed files with 69 additions and 31 deletions
+26 -13
View File
@@ -83,7 +83,7 @@ import Simplex.Messaging.Agent.Client (AgentStatsKey (..), SubInfo (..), agentCl
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore, defaultAgentConfig)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError, SQLiteStore (dbNew), execSQL, upMigration, withConnection)
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError, SQLiteStore (dbNew), execSQL, upMigration, withConnection, closeSQLiteStore, openSQLiteStore)
import Simplex.Messaging.Agent.Store.SQLite.DB (SlowQueryStats (..))
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
@@ -249,9 +249,13 @@ cfgServers p s = case p of
SPSMP -> s.smp
SPXFTP -> s.xftp
startChatController :: forall m. ChatMonad' m => Bool -> Bool -> Bool -> m (Async ())
startChatController subConns enableExpireCIs startXFTPWorkers = do
asks smpAgent >>= resumeAgentClient
startChatController :: forall m. ChatMonad' m => ChatCtrlCfg -> m (Async ())
startChatController ChatCtrlCfg {subConns, enableExpireCIs, startXFTPWorkers, openDBWithKey} = do
ChatController {chatStore, smpAgent} <- ask
forM_ openDBWithKey $ \(DBEncryptionKey dbKey) -> liftIO $ do
openSQLiteStore chatStore dbKey
openSQLiteStore (agentClientStore smpAgent) dbKey
resumeAgentClient smpAgent
unless subConns $
chatWriteVar subscriptionMode SMOnlyCreate
users <- fromRight [] <$> runExceptT (withStoreCtx' (Just "startChatController, getUsers") getUsers)
@@ -323,8 +327,8 @@ restoreCalls = do
calls <- asks currentCalls
atomically $ writeTVar calls callsMap
stopChatController :: forall m. MonadUnliftIO m => ChatController -> m ()
stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles, expireCIFlags} = do
stopChatController :: forall m. MonadUnliftIO m => ChatController -> Bool -> m ()
stopChatController ChatController {chatStore, smpAgent, agentAsync = s, sndFiles, rcvFiles, expireCIFlags} closeStore = do
disconnectAgentClient smpAgent
readTVarIO s >>= mapM_ (\(a1, a2) -> uninterruptibleCancel a1 >> mapM_ uninterruptibleCancel a2)
closeFiles sndFiles
@@ -333,6 +337,9 @@ stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles,
keys <- M.keys <$> readTVar expireCIFlags
forM_ keys $ \k -> TM.insert k False expireCIFlags
writeTVar s Nothing
when closeStore $ liftIO $ do
closeSQLiteStore chatStore
closeSQLiteStore $ agentClientStore smpAgent
where
closeFiles :: TVar (Map Int64 Handle) -> m ()
closeFiles files = do
@@ -462,12 +469,12 @@ processChatCommand = \case
checkDeleteChatUser user'
withChatLock "deleteUser" . procCmd $ deleteChatUser user' delSMPQueues
DeleteUser uName delSMPQueues viewPwd_ -> withUserName uName $ \userId -> APIDeleteUser userId delSMPQueues viewPwd_
StartChat subConns enableExpireCIs startXFTPWorkers -> withUser' $ \_ ->
APIStartChat cfg -> withUser' $ \_ ->
asks agentAsync >>= readTVarIO >>= \case
Just _ -> pure CRChatRunning
_ -> checkStoreNotChanged $ startChatController subConns enableExpireCIs startXFTPWorkers $> CRChatStarted
APIStopChat -> do
ask >>= stopChatController
_ -> checkStoreNotChanged $ startChatController cfg $> CRChatStarted
APIStopChat closeStore -> do
ask >>= (`stopChatController` closeStore)
pure CRChatStopped
APIActivateChat -> withUser $ \_ -> do
restoreCalls
@@ -5379,9 +5386,9 @@ chatCommandP =
"/_delete user " *> (APIDeleteUser <$> A.decimal <* " del_smp=" <*> onOffP <*> optional (A.space *> jsonP)),
"/delete user " *> (DeleteUser <$> displayName <*> pure True <*> optional (A.space *> pwdP)),
("/user" <|> "/u") $> ShowActiveUser,
"/_start subscribe=" *> (StartChat <$> onOffP <* " expire=" <*> onOffP <* " xftp=" <*> onOffP),
"/_start" $> StartChat True True True,
"/_stop" $> APIStopChat,
"/_start" *> (APIStartChat <$> ((A.space *> jsonP) <|> chatCtrlCfgP)),
"/_stop close" $> APIStopChat {closeStore = True},
"/_stop" $> APIStopChat False,
"/_app activate" $> APIActivateChat,
"/_app suspend " *> (APISuspendChat <$> A.decimal),
"/_resubscribe all" $> ResubscribeAllConnections,
@@ -5609,6 +5616,12 @@ chatCommandP =
]
where
choice = A.choice . map (\p -> p <* A.takeWhile (== ' ') <* A.endOfInput)
chatCtrlCfgP = do
subConns <- (" subscribe=" *> onOffP) <|> pure True
enableExpireCIs <- (" expire=" *> onOffP) <|> pure True
startXFTPWorkers <- (" xftp=" *> onOffP) <|> pure True
openDBWithKey <- optional $ " key=" *> dbKeyP
pure ChatCtrlCfg {subConns, enableExpireCIs, startXFTPWorkers, openDBWithKey}
incognitoP = (A.space *> ("incognito" <|> "i")) $> True <|> pure False
incognitoOnOffP = (A.space *> "incognito=" *> onOffP) <|> pure False
imagePrefix = (<>) <$> "data:" <*> ("image/png;base64," <|> "image/jpg;base64,")