mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-04-01 00:56:05 +00:00
ios, core: better notifications processing to avoid contention for database (#3485)
* core: forward notifications about message processing (for iOS notifications) * simplexmq * the option to keep database key, to allow re-opening the database * export new init with keepKey and reopen DB api * stop remote ctrl when suspending chat * ios: close/re-open db on suspend/activate * allow activating chat without restoring (for NSE) * update NSE to suspend/activate (does not work) * simplexmq * suspend chat and close database when last notification in the process is processed * stop reading notifications on message markers * replace async stream with cancellable concurrent queue * better synchronization of app and NSE * remove outside of task * remove unused var * whitespace * more debug logging, handle cancelled read after dequeue * comments * more comments
This commit is contained in:
committed by
GitHub
parent
2f7632a70f
commit
d3059afc99
@@ -9,7 +9,6 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
@@ -28,6 +27,8 @@ import qualified Data.Aeson as J
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (bimap, first)
|
||||
import Data.ByteArray (ScrubbedBytes)
|
||||
import qualified Data.ByteArray as BA
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
@@ -50,7 +51,7 @@ import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
|
||||
import Data.Time (NominalDiffTime, addUTCTime, defaultTimeLocale, formatTime)
|
||||
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDay, nominalDiffTimeToSeconds)
|
||||
import Data.Time.Clock.System (SystemTime, systemToUTCTime)
|
||||
import Data.Time.Clock.System (systemToUTCTime)
|
||||
import Data.Word (Word16, Word32)
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Simplex.Chat.Archive
|
||||
@@ -191,10 +192,10 @@ smallGroupsRcptsMemLimit = 20
|
||||
logCfg :: LogConfig
|
||||
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
createChatDatabase :: FilePath -> String -> MigrationConfirmation -> IO (Either MigrationError ChatDatabase)
|
||||
createChatDatabase filePrefix key confirmMigrations = runExceptT $ do
|
||||
chatStore <- ExceptT $ createChatStore (chatStoreFile filePrefix) key confirmMigrations
|
||||
agentStore <- ExceptT $ createAgentStore (agentStoreFile filePrefix) key confirmMigrations
|
||||
createChatDatabase :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError ChatDatabase)
|
||||
createChatDatabase filePrefix key keepKey confirmMigrations = runExceptT $ do
|
||||
chatStore <- ExceptT $ createChatStore (chatStoreFile filePrefix) key keepKey confirmMigrations
|
||||
agentStore <- ExceptT $ createAgentStore (agentStoreFile filePrefix) key keepKey confirmMigrations
|
||||
pure ChatDatabase {chatStore, agentStore}
|
||||
|
||||
newChatController :: ChatDatabase -> Maybe User -> ChatConfig -> ChatOpts -> IO ChatController
|
||||
@@ -538,16 +539,18 @@ processChatCommand = \case
|
||||
APIStopChat -> do
|
||||
ask >>= stopChatController
|
||||
pure CRChatStopped
|
||||
APIActivateChat -> withUser $ \_ -> do
|
||||
restoreCalls
|
||||
APIActivateChat restoreChat -> withUser $ \_ -> do
|
||||
when restoreChat restoreCalls
|
||||
withAgent foregroundAgent
|
||||
users <- withStoreCtx' (Just "APIActivateChat, getUsers") getUsers
|
||||
void . forkIO $ subscribeUsers True users
|
||||
void . forkIO $ startFilesToReceive users
|
||||
setAllExpireCIFlags True
|
||||
when restoreChat $ do
|
||||
users <- withStoreCtx' (Just "APIActivateChat, getUsers") getUsers
|
||||
void . forkIO $ subscribeUsers True users
|
||||
void . forkIO $ startFilesToReceive users
|
||||
setAllExpireCIFlags True
|
||||
ok_
|
||||
APISuspendChat t -> do
|
||||
setAllExpireCIFlags False
|
||||
stopRemoteCtrl
|
||||
withAgent (`suspendAgent` t)
|
||||
ok_
|
||||
ResubscribeAllConnections -> withStoreCtx' (Just "ResubscribeAllConnections, getUsers") getUsers >>= subscribeUsers False >> ok_
|
||||
@@ -1172,16 +1175,13 @@ processChatCommand = \case
|
||||
APIDeleteToken token -> withUser $ \_ -> withAgent (`deleteNtfToken` token) >> ok_
|
||||
APIGetNtfMessage nonce encNtfInfo -> withUser $ \_ -> do
|
||||
(NotificationInfo {ntfConnId, ntfMsgMeta}, msgs) <- withAgent $ \a -> getNotificationMessage a nonce encNtfInfo
|
||||
let ntfMessages = map (\SMP.SMPMsgMeta {msgTs, msgFlags} -> NtfMsgInfo {msgTs = systemToUTCTime msgTs, msgFlags}) msgs
|
||||
getMsgTs :: SMP.NMsgMeta -> SystemTime
|
||||
getMsgTs SMP.NMsgMeta {msgTs} = msgTs
|
||||
msgTs' = systemToUTCTime . getMsgTs <$> ntfMsgMeta
|
||||
let msgTs' = systemToUTCTime . (\SMP.NMsgMeta {msgTs} -> msgTs) <$> ntfMsgMeta
|
||||
agentConnId = AgentConnId ntfConnId
|
||||
user_ <- withStore' (`getUserByAConnId` agentConnId)
|
||||
connEntity <-
|
||||
connEntity_ <-
|
||||
pure user_ $>>= \user ->
|
||||
withStore (\db -> Just <$> getConnectionEntity db user agentConnId) `catchChatError` (\e -> toView (CRChatError (Just user) e) $> Nothing)
|
||||
pure CRNtfMessages {user_, connEntity, msgTs = msgTs', ntfMessages}
|
||||
pure CRNtfMessages {user_, connEntity_, msgTs = msgTs', ntfMessages = map ntfMsgInfo msgs}
|
||||
APIGetUserProtoServers userId (AProtocolType p) -> withUserId userId $ \user -> withServerProtocol p $ do
|
||||
ChatConfig {defaultServers} <- asks config
|
||||
servers <- withStore' (`getProtocolServers` user)
|
||||
@@ -3227,23 +3227,24 @@ processAgentMsgRcvFile _corrId aFileId msg =
|
||||
toView $ CRRcvFileError user ci e
|
||||
|
||||
processAgentMessageConn :: forall m. ChatMonad m => User -> ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m ()
|
||||
processAgentMessageConn user _ agentConnId END =
|
||||
withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= \case
|
||||
RcvDirectMsgConnection _ (Just ct) -> toView $ CRContactAnotherClient user ct
|
||||
entity -> toView $ CRSubscriptionEnd user entity
|
||||
processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
|
||||
entity <- withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= updateConnStatus
|
||||
case entity of
|
||||
RcvDirectMsgConnection conn contact_ ->
|
||||
processDirectMessage agentMessage entity conn contact_
|
||||
RcvGroupMsgConnection conn gInfo m ->
|
||||
processGroupMessage agentMessage entity conn gInfo m
|
||||
RcvFileConnection conn ft ->
|
||||
processRcvFileConn agentMessage entity conn ft
|
||||
SndFileConnection conn ft ->
|
||||
processSndFileConn agentMessage entity conn ft
|
||||
UserContactConnection conn uc ->
|
||||
processUserContactRequest agentMessage entity conn uc
|
||||
case agentMessage of
|
||||
END -> case entity of
|
||||
RcvDirectMsgConnection _ (Just ct) -> toView $ CRContactAnotherClient user ct
|
||||
_ -> toView $ CRSubscriptionEnd user entity
|
||||
MSGNTF smpMsgInfo -> toView $ CRNtfMessage user entity $ ntfMsgInfo smpMsgInfo
|
||||
_ -> case entity of
|
||||
RcvDirectMsgConnection conn contact_ ->
|
||||
processDirectMessage agentMessage entity conn contact_
|
||||
RcvGroupMsgConnection conn gInfo m ->
|
||||
processGroupMessage agentMessage entity conn gInfo m
|
||||
RcvFileConnection conn ft ->
|
||||
processRcvFileConn agentMessage entity conn ft
|
||||
SndFileConnection conn ft ->
|
||||
processSndFileConn agentMessage entity conn ft
|
||||
UserContactConnection conn uc ->
|
||||
processUserContactRequest agentMessage entity conn uc
|
||||
where
|
||||
updateConnStatus :: ConnectionEntity -> m ConnectionEntity
|
||||
updateConnStatus acEntity = case agentMsgConnStatus agentMessage of
|
||||
@@ -5959,7 +5960,8 @@ chatCommandP =
|
||||
"/_start subscribe=" *> (StartChat <$> onOffP <* " expire=" <*> onOffP <* " xftp=" <*> onOffP),
|
||||
"/_start" $> StartChat True True True,
|
||||
"/_stop" $> APIStopChat,
|
||||
"/_app activate" $> APIActivateChat,
|
||||
"/_app activate restore=" *> (APIActivateChat <$> onOffP),
|
||||
"/_app activate" $> APIActivateChat True,
|
||||
"/_app suspend " *> (APISuspendChat <$> A.decimal),
|
||||
"/_resubscribe all" $> ResubscribeAllConnections,
|
||||
"/_temp_folder " *> (SetTempFolder <$> filePath),
|
||||
@@ -5974,9 +5976,9 @@ chatCommandP =
|
||||
"/_db import " *> (APIImportArchive <$> jsonP),
|
||||
"/_db delete" $> APIDeleteStorage,
|
||||
"/_db encryption " *> (APIStorageEncryption <$> jsonP),
|
||||
"/db encrypt " *> (APIStorageEncryption . DBEncryptionConfig "" <$> dbKeyP),
|
||||
"/db key " *> (APIStorageEncryption <$> (DBEncryptionConfig <$> dbKeyP <* A.space <*> dbKeyP)),
|
||||
"/db decrypt " *> (APIStorageEncryption . (`DBEncryptionConfig` "") <$> dbKeyP),
|
||||
"/db encrypt " *> (APIStorageEncryption . dbEncryptionConfig "" <$> dbKeyP),
|
||||
"/db key " *> (APIStorageEncryption <$> (dbEncryptionConfig <$> dbKeyP <* A.space <*> dbKeyP)),
|
||||
"/db decrypt " *> (APIStorageEncryption . (`dbEncryptionConfig` "") <$> dbKeyP),
|
||||
"/sql chat " *> (ExecChatStoreSQL <$> textP),
|
||||
"/sql agent " *> (ExecAgentStoreSQL <$> textP),
|
||||
"/sql slow" $> SlowSQLQueries,
|
||||
@@ -6317,7 +6319,8 @@ chatCommandP =
|
||||
A.decimal
|
||||
]
|
||||
dbKeyP = nonEmptyKey <$?> strP
|
||||
nonEmptyKey k@(DBEncryptionKey s) = if null s then Left "empty key" else Right k
|
||||
nonEmptyKey k@(DBEncryptionKey s) = if BA.null s then Left "empty key" else Right k
|
||||
dbEncryptionConfig currentKey newKey = DBEncryptionConfig {currentKey, newKey, keepKey = Just False}
|
||||
autoAcceptP =
|
||||
ifM
|
||||
onOffP
|
||||
|
||||
Reference in New Issue
Block a user