agent: notify about polled message processing (for iOS notifications) (#908)

* agent: notify about polled message processing (for iOS notifications)

* optionally keep key and support re-opening database

* exports

* test that cannot reopen when created with keepKey: false

* set max number of messages to receive for a notification to 3
This commit is contained in:
Evgeny Poberezkin
2023-12-11 13:03:53 +00:00
committed by GitHub
parent a860936072
commit 560dc55312
13 changed files with 145 additions and 83 deletions
+18 -17
View File
@@ -849,23 +849,21 @@ getNotificationMessage' c nonce encNtfInfo = do
(ntfConnId, rcvNtfDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue)
ntfMsgMeta <- (eitherToMaybe . smpDecode <$> agentCbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta) `catchAgentError` \_ -> pure Nothing
maxMsgs <- asks $ ntfMaxMessages . config
(NotificationInfo {ntfConnId, ntfTs, ntfMsgMeta},) <$> getNtfMessages ntfConnId maxMsgs ntfMsgMeta []
(NotificationInfo {ntfConnId, ntfTs, ntfMsgMeta},) <$> getNtfMessages ntfConnId ntfMsgMeta maxMsgs
_ -> throwError $ CMD PROHIBITED
where
getNtfMessages ntfConnId maxMs nMeta ms
| length ms < maxMs =
getConnectionMessage' c ntfConnId >>= \case
Just m@SMP.SMPMsgMeta {msgId, msgTs, msgFlags} -> case nMeta of
Just SMP.NMsgMeta {msgId = msgId', msgTs = msgTs'}
| msgId == msgId' || msgTs > msgTs' -> pure $ reverse (m : ms)
| otherwise -> getMsg (m : ms)
_
| SMP.notification msgFlags -> pure $ reverse (m : ms)
| otherwise -> getMsg (m : ms)
_ -> pure $ reverse ms
| otherwise = pure $ reverse ms
getNtfMessages ntfConnId nMeta = getMsg
where
getMsg = getNtfMessages ntfConnId maxMs nMeta
getMsg 0 = pure []
getMsg n =
getConnectionMessage' c ntfConnId >>= \case
Just m
| lastMsg m -> pure [m]
| otherwise -> (m :) <$> getMsg (n - 1)
Nothing -> pure []
lastMsg SMP.SMPMsgMeta {msgId, msgTs, msgFlags} = case nMeta of
Just SMP.NMsgMeta {msgId = msgId', msgTs = msgTs'} -> msgId == msgId' || msgTs > msgTs'
Nothing -> SMP.notification msgFlags
-- | Send message to the connection (SEND command) in Reader monad
sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId
@@ -1887,11 +1885,14 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
conn
cData@ConnData {userId, connId, duplexHandshake, connAgentVersion, ratchetSyncState = rss} =
withConnLock c connId "processSMP" $ case cmd of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} ->
handleNotifyAck $
decryptSMPMessage v rq msg >>= \case
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} ->
handleNotifyAck $ do
msg' <- decryptSMPMessage v rq msg
handleNotifyAck $ case msg' of
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody
SMP.ClientRcvMsgQuota {} -> queueDrained >> ack
whenM (atomically $ hasGetLock c rq) $
notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg')
where
queueDrained = case conn of
DuplexConnection _ _ sqs -> void $ enqueueMessages c cData sqs SMP.noMsgFlags $ QCONT (sndAddress rq)
+7 -2
View File
@@ -72,6 +72,7 @@ module Simplex.Messaging.Agent.Client
logSecret,
removeSubscription,
hasActiveSubscription,
hasGetLock,
agentClientStore,
agentDRG,
getAgentSubscriptions,
@@ -897,8 +898,8 @@ subscribeQueues c qs = do
-- only "checked" queues are subscribed
(errs <>) <$> sendTSessionBatches "SUB" 90 id (subscribeQueues_ u) c qs'
where
checkQueue rq@RcvQueue {rcvId, server} = do
prohibited <- atomically . TM.member (server, rcvId) $ getMsgLocks c
checkQueue rq = do
prohibited <- atomically $ hasGetLock c rq
pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED) else Right rq
subscribeQueues_ :: UnliftIO m -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ())
subscribeQueues_ u smp qs' = do
@@ -1049,6 +1050,10 @@ sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do
ackSMPMessage smp rcvPrivateKey rcvId msgId
atomically $ releaseGetLock c rq
hasGetLock :: AgentClient -> RcvQueue -> STM Bool
hasGetLock c RcvQueue {server, rcvId} =
TM.member (server, rcvId) $ getMsgLocks c
releaseGetLock :: AgentClient -> RcvQueue -> STM ()
releaseGetLock c RcvQueue {server, rcvId} =
TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
+4 -3
View File
@@ -34,6 +34,7 @@ import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.ByteArray (ScrubbedBytes)
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import Data.Map (Map)
@@ -163,7 +164,7 @@ defaultAgentConfig =
ntfWorkerDelay = 100000, -- microseconds
ntfSMPWorkerDelay = 500000, -- microseconds
ntfSubCheckInterval = nominalDay,
ntfMaxMessages = 4,
ntfMaxMessages = 3,
-- CA certificate private key is not needed for initialization
-- ! we do not generate these
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
@@ -196,8 +197,8 @@ newSMPAgentEnv config@AgentConfig {initialClientId} store = do
multicastSubscribers <- newTMVarIO 0
pure Env {config, store, random, clientCounter, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers}
createAgentStore :: FilePath -> String -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createAgentStore dbFilePath dbKey = createSQLiteStore dbFilePath dbKey Migrations.app
createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey keepKey Migrations.app
data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
+8
View File
@@ -193,6 +193,7 @@ import Simplex.Messaging.Protocol
MsgId,
NMsgMeta,
ProtocolServer (..),
SMPMsgMeta,
SMPServer,
SMPServerWithAuth,
SndPublicVerifyKey,
@@ -337,6 +338,7 @@ data ACommand (p :: AParty) (e :: AEntity) where
SENT :: AgentMsgId -> ACommand Agent AEConn
MERR :: AgentMsgId -> AgentErrorType -> ACommand Agent AEConn
MSG :: MsgMeta -> MsgFlags -> MsgBody -> ACommand Agent AEConn
MSGNTF :: SMPMsgMeta -> ACommand Agent AEConn
ACK :: AgentMsgId -> Maybe MsgReceiptInfo -> ACommand Client AEConn
RCVD :: MsgMeta -> NonEmpty MsgReceipt -> ACommand Agent AEConn
SWCH :: ACommand Client AEConn
@@ -397,6 +399,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where
SENT_ :: ACommandTag Agent AEConn
MERR_ :: ACommandTag Agent AEConn
MSG_ :: ACommandTag Agent AEConn
MSGNTF_ :: ACommandTag Agent AEConn
ACK_ :: ACommandTag Client AEConn
RCVD_ :: ACommandTag Agent AEConn
SWCH_ :: ACommandTag Client AEConn
@@ -450,6 +453,7 @@ aCommandTag = \case
SENT _ -> SENT_
MERR {} -> MERR_
MSG {} -> MSG_
MSGNTF {} -> MSGNTF_
ACK {} -> ACK_
RCVD {} -> RCVD_
SWCH -> SWCH_
@@ -1604,6 +1608,7 @@ instance StrEncoding ACmdTag where
"SENT" -> ct SENT_
"MERR" -> ct MERR_
"MSG" -> ct MSG_
"MSGNTF" -> ct MSGNTF_
"ACK" -> t ACK_
"RCVD" -> ct RCVD_
"SWCH" -> t SWCH_
@@ -1659,6 +1664,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where
SENT_ -> "SENT"
MERR_ -> "MERR"
MSG_ -> "MSG"
MSGNTF_ -> "MSGNTF"
ACK_ -> "ACK"
RCVD_ -> "RCVD"
SWCH_ -> "SWCH"
@@ -1727,6 +1733,7 @@ commandP binaryP =
SENT_ -> s (SENT <$> A.decimal)
MERR_ -> s (MERR <$> A.decimal <* A.space <*> strP)
MSG_ -> s (MSG <$> strP <* A.space <*> smpP <* A.space <*> binaryP)
MSGNTF_ -> s (MSGNTF <$> strP)
RCVD_ -> s (RCVD <$> strP <* A.space <*> strP)
DEL_RCVQ_ -> s (DEL_RCVQ <$> strP_ <*> strP_ <*> strP)
DEL_CONN_ -> pure DEL_CONN
@@ -1781,6 +1788,7 @@ serializeCommand = \case
SENT mId -> s (SENT_, Str $ bshow mId)
MERR mId e -> s (MERR_, Str $ bshow mId, e)
MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, s msgMeta, smpEncode msgFlags, serializeBinary msgBody]
MSGNTF smpMsgMeta -> s (MSGNTF_, smpMsgMeta)
ACK mId rcptInfo_ -> s (ACK_, Str $ bshow mId) <> maybe "" (B.cons ' ' . serializeBinary) rcptInfo_
RCVD msgMeta rcpts -> s (RCVD_, msgMeta, rcpts)
SWCH -> s SWCH_
+43 -24
View File
@@ -31,7 +31,10 @@ module Simplex.Messaging.Agent.Store.SQLite
connectSQLiteStore,
closeSQLiteStore,
openSQLiteStore,
reopenSQLiteStore,
sqlString,
keyString,
storeKey,
execSQL,
upMigration, -- used in tests
@@ -221,6 +224,8 @@ import Crypto.Random (ChaChaDRG)
import qualified Data.Aeson.TH as J
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (second)
import Data.ByteArray (ScrubbedBytes)
import qualified Data.ByteArray as BA
import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as U
import Data.Char (toLower)
@@ -267,7 +272,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from
import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, eitherToMaybe, groupOn, ifM, ($>>=), (<$$>))
import Simplex.Messaging.Util (bshow, eitherToMaybe, groupOn, ifM, safeDecodeUtf8, ($>>=), (<$$>))
import Simplex.Messaging.Version
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
import System.Exit (exitFailure)
@@ -316,11 +321,11 @@ instance StrEncoding MigrationConfirmation where
"error" -> pure MCError
_ -> fail "invalid MigrationConfirmation"
createSQLiteStore :: FilePath -> String -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createSQLiteStore dbFilePath dbKey migrations confirmMigrations = do
createSQLiteStore :: FilePath -> ScrubbedBytes -> Bool -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createSQLiteStore dbFilePath dbKey keepKey migrations confirmMigrations = do
let dbDir = takeDirectory dbFilePath
createDirectoryIfMissing True dbDir
st <- connectSQLiteStore dbFilePath dbKey
st <- connectSQLiteStore dbFilePath dbKey keepKey
r <- migrateSchema st migrations confirmMigrations `onException` closeSQLiteStore st
case r of
Right () -> pure $ Right st
@@ -366,17 +371,17 @@ confirmOrExit s = do
ok <- getLine
when (map toLower ok /= "y") exitFailure
connectSQLiteStore :: FilePath -> String -> IO SQLiteStore
connectSQLiteStore dbFilePath dbKey = do
connectSQLiteStore :: FilePath -> ScrubbedBytes -> Bool -> IO SQLiteStore
connectSQLiteStore dbFilePath key keepKey = do
dbNew <- not <$> doesFileExist dbFilePath
dbConn <- dbBusyLoop (connectDB dbFilePath dbKey)
dbConn <- dbBusyLoop (connectDB dbFilePath key)
atomically $ do
dbConnection <- newTMVar dbConn
dbEncrypted <- newTVar . not $ null dbKey
dbKey <- newTVar $! storeKey key keepKey
dbClosed <- newTVar False
pure SQLiteStore {dbFilePath, dbEncrypted, dbConnection, dbNew, dbClosed}
pure SQLiteStore {dbFilePath, dbKey, dbConnection, dbNew, dbClosed}
connectDB :: FilePath -> String -> IO DB.Connection
connectDB :: FilePath -> ScrubbedBytes -> IO DB.Connection
connectDB path key = do
db <- DB.open path
prepare db `onException` DB.close db
@@ -385,7 +390,7 @@ connectDB path key = do
where
prepare db = do
let exec = SQLite3.exec $ SQL.connectionHandle $ DB.conn db
unless (null key) . exec $ "PRAGMA key = " <> sqlString key <> ";"
unless (BA.null key) . exec $ "PRAGMA key = " <> keyString key <> ";"
exec . fromQuery $
[sql|
PRAGMA busy_timeout = 100;
@@ -402,22 +407,36 @@ closeSQLiteStore st@SQLiteStore {dbClosed} =
DB.close conn
atomically $ writeTVar dbClosed True
openSQLiteStore :: SQLiteStore -> String -> IO ()
openSQLiteStore SQLiteStore {dbConnection, dbFilePath, dbClosed} key =
ifM (readTVarIO dbClosed) open (putStrLn "closeSQLiteStore: already opened")
openSQLiteStore :: SQLiteStore -> ScrubbedBytes -> Bool -> IO ()
openSQLiteStore st@SQLiteStore {dbClosed} key keepKey =
ifM (readTVarIO dbClosed) (openSQLiteStore_ st key keepKey) (putStrLn "openSQLiteStore: already opened")
openSQLiteStore_ :: SQLiteStore -> ScrubbedBytes -> Bool -> IO ()
openSQLiteStore_ SQLiteStore {dbConnection, dbFilePath, dbKey, dbClosed} key keepKey =
bracketOnError
(atomically $ takeTMVar dbConnection)
(atomically . tryPutTMVar dbConnection)
$ \DB.Connection {slow} -> do
DB.Connection {conn} <- connectDB dbFilePath key
atomically $ do
putTMVar dbConnection DB.Connection {conn, slow}
writeTVar dbClosed False
writeTVar dbKey $! storeKey key keepKey
reopenSQLiteStore :: SQLiteStore -> IO ()
reopenSQLiteStore st@SQLiteStore {dbKey, dbClosed} =
ifM (readTVarIO dbClosed) open (putStrLn "reopenSQLiteStore: already opened")
where
open =
bracketOnError
(atomically $ takeTMVar dbConnection)
(atomically . tryPutTMVar dbConnection)
$ \DB.Connection {slow} -> do
DB.Connection {conn} <- connectDB dbFilePath key
atomically $ do
putTMVar dbConnection DB.Connection {conn, slow}
writeTVar dbClosed False
readTVarIO dbKey >>= \case
Just key -> openSQLiteStore_ st key True
Nothing -> fail "reopenSQLiteStore: no key"
sqlString :: String -> Text
sqlString s = quote <> T.replace quote "''" (T.pack s) <> quote
keyString :: ScrubbedBytes -> Text
keyString = sqlString . safeDecodeUtf8 . BA.convert
sqlString :: Text -> Text
sqlString s = quote <> T.replace quote "''" s <> quote
where
quote = "'"
@@ -10,10 +10,13 @@ module Simplex.Messaging.Agent.Store.SQLite.Common
withTransaction',
withTransactionCtx,
dbBusyLoop,
storeKey,
)
where
import Control.Concurrent (threadDelay)
import Data.ByteArray (ScrubbedBytes)
import qualified Data.ByteArray as BA
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import Database.SQLite.Simple (SQLError)
import qualified Database.SQLite.Simple as SQL
@@ -23,9 +26,12 @@ import UnliftIO.Exception (bracket)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
storeKey :: ScrubbedBytes -> Bool -> Maybe ScrubbedBytes
storeKey key keepKey = if keepKey || BA.null key then Just key else Nothing
data SQLiteStore = SQLiteStore
{ dbFilePath :: FilePath,
dbEncrypted :: TVar Bool,
dbKey :: TVar (Maybe ScrubbedBytes),
dbConnection :: TMVar DB.Connection,
dbClosed :: TVar Bool,
dbNew :: Bool
+8 -1
View File
@@ -460,7 +460,14 @@ data SMPMsgMeta = SMPMsgMeta
msgTs :: SystemTime,
msgFlags :: MsgFlags
}
deriving (Show)
deriving (Eq, Show)
instance StrEncoding SMPMsgMeta where
strEncode SMPMsgMeta {msgId, msgTs, msgFlags} =
strEncode (msgId, msgTs, msgFlags)
strP = do
(msgId, msgTs, msgFlags) <- strP
pure SMPMsgMeta {msgId, msgTs, msgFlags}
rcvMessageMeta :: MsgId -> ClientRcvMsgBody -> SMPMsgMeta
rcvMessageMeta msgId = \case