From 67d38090ed614f1a7c45dd0a52f5dc5b110d5a72 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sun, 8 Sep 2024 15:45:45 +0100 Subject: [PATCH 1/7] xrcp: use SHA3-256 in hybrid key agreement (#1302) --- protocol/xrcp.md | 4 ++-- src/Simplex/Messaging/Crypto/SNTRUP761.hs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/protocol/xrcp.md b/protocol/xrcp.md index 9f7187e66..2ed2c8d62 100644 --- a/protocol/xrcp.md +++ b/protocol/xrcp.md @@ -250,7 +250,7 @@ In pseudo-code: ``` // session 1 hostHelloSecret(1) = dhSecret(1) -sessionSecret(1) = sha256(dhSecret(1) || kemSecret(1)) // to encrypt session 1 data, incl. controller hello +sessionSecret(1) = sha3-256(dhSecret(1) || kemSecret(1)) // to encrypt session 1 data, incl. controller hello dhSecret(1) = dh(hostHelloDhKey(1), controllerInvitationDhKey(1)) kemCiphertext(1) = enc(kemSecret(1), kemEncKey(1)) // kemEncKey is included in host HELLO, kemCiphertext - in controller HELLO @@ -262,7 +262,7 @@ dhSecret(n') = dh(hostHelloDhKey(n - 1), controllerDhKey(n)) // session n hostHelloSecret(n) = dhSecret(n) -sessionSecret(n) = sha256(dhSecret(n) || kemSecret(n)) // to encrypt session n data, incl. controller hello +sessionSecret(n) = sha3-256(dhSecret(n) || kemSecret(n)) // to encrypt session n data, incl. controller hello dhSecret(n) = dh(hostHelloDhKey(n), controllerDhKey(n)) // controllerDhKey(n) is either from invitation or from multicast announcement kemCiphertext(n) = enc(kemSecret(n), kemEncKey(n)) diff --git a/src/Simplex/Messaging/Crypto/SNTRUP761.hs b/src/Simplex/Messaging/Crypto/SNTRUP761.hs index 99b2771f6..6f903804e 100644 --- a/src/Simplex/Messaging/Crypto/SNTRUP761.hs +++ b/src/Simplex/Messaging/Crypto/SNTRUP761.hs @@ -4,7 +4,7 @@ module Simplex.Messaging.Crypto.SNTRUP761 where -import Crypto.Hash (Digest, SHA256, hash) +import Crypto.Hash (Digest, SHA3_256, hash) import Data.ByteArray (ScrubbedBytes) import qualified Data.ByteArray as BA import Data.ByteString (ByteString) @@ -28,4 +28,4 @@ kcbEncrypt (KEMHybridSecret k) = sbEncrypt_ k kemHybridSecret :: PublicKeyX25519 -> PrivateKeyX25519 -> KEMSharedKey -> KEMHybridSecret kemHybridSecret k pk (KEMSharedKey kem) = let DhSecretX25519 dh = C.dh' k pk - in KEMHybridSecret $ BA.convert (hash $ BA.convert dh <> kem :: Digest SHA256) + in KEMHybridSecret $ BA.convert (hash $ BA.convert dh <> kem :: Digest SHA3_256) From 344a295845ceea6a8a926e3f4c10fe79bcf05abe Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sun, 8 Sep 2024 16:50:22 +0100 Subject: [PATCH 2/7] agent: error when user record is not in database (#1303) --- src/Simplex/Messaging/Agent/Client.hs | 1 + src/Simplex/Messaging/Agent/Protocol.hs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 9bd469ec5..c2f1c8aa6 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1930,6 +1930,7 @@ withStoreBatch' c actions = withStoreBatch c (fmap (fmap Right) . actions) storeError :: StoreError -> AgentErrorType storeError = \case SEConnNotFound -> CONN NOT_FOUND + SEUserNotFound -> NO_USER SERatchetNotFound -> CONN NOT_FOUND SEConnDuplicate -> CONN DUPLICATE SEBadConnType CRcv -> CONN SIMPLEX diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 12c29e6a0..178d87109 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -1338,6 +1338,8 @@ data AgentErrorType CMD {cmdErr :: CommandErrorType, errContext :: String} | -- | connection errors CONN {connErr :: ConnectionErrorType} + | -- | user not found in database + NO_USER | -- | SMP protocol errors forwarded to agent clients SMP {serverAddress :: String, smpErr :: ErrorType} | -- | NTF protocol errors forwarded to agent clients From dab1980d79b35634bea9a259b633bd06ed8d5ebf Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 9 Sep 2024 08:08:16 +0100 Subject: [PATCH 3/7] xftp: report receive file error with redirected file ID, when redirect is present (#1304) * xftp: report receive file error with redirected file ID, when redirect is present * fix test --- src/Simplex/FileTransfer/Agent.hs | 21 +++++++++++---------- src/Simplex/Messaging/Agent/Store/SQLite.hs | 13 +++++++------ tests/AgentTests/SQLiteTests.hs | 2 +- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 115ca6946..aabe3ff28 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -45,7 +45,7 @@ import Data.List (foldl', partition, sortOn) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (mapMaybe) +import Data.Maybe (fromMaybe, mapMaybe) import qualified Data.Set as S import Data.Text (Text) import Data.Time.Clock (getCurrentTime) @@ -190,8 +190,9 @@ runXFTPRcvWorker c srv Worker {doWork} = do runXFTPOperation :: AgentConfig -> AM () runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} = withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case - (RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas") - (fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do + (RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _, redirectEntityId_) -> + rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) (INTERNAL "chunk has no replicas") + (fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays, redirectEntityId_) -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do liftIO $ waitWhileSuspended c @@ -202,7 +203,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do where retryLoop loop e replicaDelay = do flip catchAgentError (\_ -> pure ()) $ do - when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e + when (serverHostError e) $ notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFWARN e) liftIO $ closeXFTPServerClient c userId server digest withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay liftIO $ assertAgentForeground c @@ -211,7 +212,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do atomically . incXFTPServerStat c userId srv $ case e of XFTP _ XFTP.AUTH -> downloadAuthErrs _ -> downloadErrs - rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) e + rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) e downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM () downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED @@ -262,11 +263,11 @@ retryOnError name loop done e = do then loop else done -rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> AgentErrorType -> AM () -rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath err = do +rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe RcvFileId -> Maybe FilePath -> AgentErrorType -> AM () +rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ tmpPath err = do lift $ forM_ tmpPath (removePath <=< toFSFilePath) withStore' c $ \db -> updateRcvFileError db rcvFileId (show err) - notify c rcvFileEntityId $ RFERR err + notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFERR err) runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM () runXFTPRcvLocalWorker c Worker {doWork} = do @@ -279,8 +280,8 @@ runXFTPRcvLocalWorker c Worker {doWork} = do runXFTPOperation :: AgentConfig -> AM () runXFTPOperation AgentConfig {rcvFilesTTL} = withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $ - \f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> - decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath + \f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath, redirect} -> + decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId (redirectEntityId <$> redirect) tmpPath decryptFile :: RcvFile -> AM () decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do let CryptoFile savePath cfArgs = saveFile diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 07074e08f..d3eae9354 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -2525,7 +2525,7 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO () deleteRcvFile' db rcvFileId = DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId) -getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool))) +getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool, Maybe RcvFileId))) getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd) where @@ -2549,7 +2549,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d LIMIT 1 |] (host, port, keyHash, RFSReceiving, cutoffTs) - getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool)) + getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool, Maybe RcvFileId)) getChunkData (rcvFileChunkReplicaId, _fileId) = firstRow toChunk SEFileNotFound $ DB.query @@ -2558,7 +2558,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d SELECT f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries, - f.approved_relays + f.approved_relays, f.redirect_entity_id FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id @@ -2567,8 +2567,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d |] (Only rcvFileChunkReplicaId) where - toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. Only Bool) -> (RcvFileChunk, Bool) - toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (Only approvedRelays)) = + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. (Bool, Maybe RcvFileId)) -> (RcvFileChunk, Bool, Maybe RcvFileId) + toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (approvedRelays, redirectEntityId_)) = ( RcvFileChunk { rcvFileId, rcvFileEntityId, @@ -2581,7 +2581,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d chunkTmpPath, replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] }, - approvedRelays + approvedRelays, + redirectEntityId_ ) getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile)) diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 4a8d80dd4..22023cc96 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -741,7 +741,7 @@ testGetNextRcvChunkToDownload st = do show e `shouldContain` "ConversionFailed" DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] - Right (Just (RcvFileChunk {rcvFileEntityId}, _)) <- getNextRcvChunkToDownload db xftpServer1 86400 + Right (Just (RcvFileChunk {rcvFileEntityId}, _, Nothing)) <- getNextRcvChunkToDownload db xftpServer1 86400 rcvFileEntityId `shouldBe` fId2 testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation From 092ed088caef431b49dfd53f09603346422f77f9 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:03:17 +0400 Subject: [PATCH 4/7] ntf: support for multiple messages encoding (#1305) --- src/Simplex/Messaging/Agent.hs | 12 ++++++------ src/Simplex/Messaging/Notifications/Server.hs | 2 +- .../Messaging/Notifications/Server/Push/APNS.hs | 17 +++++++++++++++-- tests/AgentTests/NotificationTests.hs | 4 +++- tests/NtfServerTests.hs | 9 +++++---- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 554c39249..c45c47ab6 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -174,7 +174,7 @@ import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..), NtfTokenId) -import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) +import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), pnMessagesP) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (BrokerMsg, Cmd (..), ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolType (..), ProtocolTypeI (..), SMPMsgMeta, SParty (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, sndAuthKeySMPClientVersion) @@ -334,7 +334,7 @@ createConnection :: AgentClient -> UserId -> Bool -> SConnectionMode c -> Maybe createConnection c userId enableNtfs = withAgentEnv c .:: newConn c userId "" enableNtfs {-# INLINE createConnection #-} --- | Changes the user id associated with a connection +-- | Changes the user id associated with a connection changeConnectionUser :: AgentClient -> UserId -> ConnId -> UserId -> AE () changeConnectionUser c oldUserId connId newUserId = withAgentEnv c $ changeConnectionUser' c oldUserId connId newUserId {-# INLINE changeConnectionUser #-} @@ -1020,7 +1020,7 @@ subscribeConnections' c connIds = do SomeConn _ conn -> do let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete ConnData {connId} = toConnData conn - atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) + atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) resumeDelivery :: Map ConnId SomeConn -> AM () resumeDelivery conns = do conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery @@ -1065,7 +1065,7 @@ getNotificationMessage' c nonce encNtfInfo = do withStore' c getActiveNtfToken >>= \case Just NtfToken {ntfDhSecret = Just dhSecret} -> do ntfData <- agentCbDecrypt dhSecret nonce encNtfInfo - PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} <- liftEither (parse strP (INTERNAL "error parsing PNMessageData") ntfData) + PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} :| _ <- liftEither (parse pnMessagesP (INTERNAL "error parsing PNMessageData") ntfData) (ntfConnId, rcvNtfDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue) ntfMsgMeta <- (eitherToMaybe . smpDecode <$> agentCbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta) `catchAgentError` \_ -> pure Nothing msgMeta <- getConnectionMessage' c ntfConnId @@ -1103,8 +1103,8 @@ sendMessagesB_ c reqs connIds = withConnLocks c connIds "sendMessages" $ do where getConn_ :: DB.Connection -> TVar (Maybe (Either AgentErrorType SomeConn)) -> MsgReq -> IO (Either AgentErrorType (MsgReq, SomeConn)) getConn_ db prev req@(connId, _, _, _) = - (req,) <$$> - if B.null connId + (req,) + <$$> if B.null connId then fromMaybe (Left $ INTERNAL "sendMessagesB_: empty prev connId") <$> readTVarIO prev else do conn <- first storeError <$> getConn db connId diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 763c45de6..2f4093ac3 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -221,7 +221,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge liftIO $ updatePeriodStats (activeSubs stats) ntfId atomically $ findNtfSubscriptionToken st smpQueue - >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) + >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage (PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} :| []))) incNtfStat ntfReceived Right SMP.END -> whenM (atomically $ activeClientSession' ca sessionId srv) $ diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 2632ff4b4..00d3c6a81 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -28,12 +28,16 @@ import Data.Aeson (ToJSON, (.=)) import qualified Data.Aeson as J import qualified Data.Aeson.Encoding as JE import qualified Data.Aeson.TH as JQ +import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (first) import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Builder (lazyByteString) import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Int (Int64) +import Data.List.NonEmpty (NonEmpty (..)) +import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import Data.Maybe (isNothing) import Data.Text (Text) @@ -103,11 +107,20 @@ readECPrivateKey f = do data PushNotification = PNVerification NtfRegCode - | PNMessage PNMessageData + | PNMessage (NonEmpty PNMessageData) | -- | PNAlert Text PNCheckMessages deriving (Show) +-- List of PNMessageData uses semicolon-separated encoding instead of strEncode, +-- because strEncode of NonEmpty list uses comma for separator, +-- and encoding of PNMessageData's smpQueue has comma in list of hosts +encodePNMessages :: NonEmpty PNMessageData -> ByteString +encodePNMessages = B.intercalate ";" . map strEncode . L.toList + +pnMessagesP :: A.Parser (NonEmpty PNMessageData) +pnMessagesP = L.fromList <$> strP `A.sepBy1` A.char ';' + data PNMessageData = PNMessageData { smpQueue :: SMPQueueNtf, ntfTs :: SystemTime, @@ -285,7 +298,7 @@ apnsNotification NtfTknData {tknDhSecret} nonce paddedLen = \case encrypt code $ \code' -> apn APNSBackground {contentAvailable = 1} . Just $ J.object ["nonce" .= nonce, "verification" .= code'] PNMessage pnMessageData -> - encrypt (strEncode pnMessageData) $ \ntfData -> + encrypt (encodePNMessages pnMessageData) $ \ntfData -> apn apnMutableContent . Just $ J.object ["nonce" .= nonce, "message" .= ntfData] -- PNAlert text -> Right $ apn (apnAlert $ APNSAlertText text) Nothing PNCheckMessages -> Right $ apn APNSBackground {contentAvailable = 1} . Just $ J.object ["checkMessages" .= True] diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index c45c37124..5f019540f 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -49,6 +49,7 @@ import Data.Bifunctor (bimap, first) import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.List.NonEmpty (NonEmpty (..)) import Data.Text.Encoding (encodeUtf8) import Database.SQLite.Simple.QQ (sql) import NtfClient @@ -66,6 +67,7 @@ import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..)) import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Types (NtfTknAction (..), NtfToken (..)) +import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..)) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) @@ -872,7 +874,7 @@ messageNotificationData :: HasCallStack => AgentClient -> TBQueue APNSMockReques messageNotificationData c apnsQ = do (nonce, message) <- messageNotification apnsQ NtfToken {ntfDhSecret = Just dhSecret} <- getNtfTokenData c - Right pnMsgData <- liftEither . first INTERNAL $ Right . strDecode =<< first show (C.cbDecrypt dhSecret nonce message) + Right (pnMsgData :| _) <- liftEither . first INTERNAL $ Right . parseAll pnMessagesP =<< first show (C.cbDecrypt dhSecret nonce message) pure pnMsgData noNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO () diff --git a/tests/NtfServerTests.hs b/tests/NtfServerTests.hs index c4b97f18c..b46988b28 100644 --- a/tests/NtfServerTests.hs +++ b/tests/NtfServerTests.hs @@ -17,6 +17,7 @@ import qualified Data.Aeson.Types as JT import Data.Bifunctor (first) import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Char8 (ByteString) +import Data.List.NonEmpty (NonEmpty (..)) import Data.Text.Encoding (encodeUtf8) import NtfClient import SMPClient as SMP @@ -136,8 +137,8 @@ testNotificationSubscription (ATransport t) = Right nonce' = C.cbNonce <$> ntfData' .-> "nonce" Right message = ntfData' .-> "message" Right ntfDataDecrypted = C.cbDecrypt dhSecret nonce' message - Right APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer, notifierId}, nmsgNonce, encNMsgMeta} = - parse strP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted + Right (APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer, notifierId}, nmsgNonce, encNMsgMeta} :| _) = + parse pnMessagesP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted Right nMsgMeta = C.cbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta Right NMsgMeta {msgId, msgTs} = parse smpP (AP.INTERNAL "error parsing NMsgMeta") nMsgMeta smpServer `shouldBe` srv @@ -169,8 +170,8 @@ testNotificationSubscription (ATransport t) = Right nonce3 = C.cbNonce <$> ntfData3 .-> "nonce" Right message3 = ntfData3 .-> "message" Right ntfDataDecrypted3 = C.cbDecrypt dhSecret nonce3 message3 - Right APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer = smpServer3, notifierId = notifierId3}} = - parse strP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted3 + Right (APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer = smpServer3, notifierId = notifierId3}} :| _) = + parse pnMessagesP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted3 smpServer3 `shouldBe` srv notifierId3 `shouldBe` nId send3 APNSRespOk From 946e16339e16e026f51185ebfb48c3a0c5a5b2e1 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:42:14 +0400 Subject: [PATCH 5/7] agent: process last notification from list (#1307) --- src/Simplex/Messaging/Agent.hs | 3 ++- tests/AgentTests/NotificationTests.hs | 6 +++--- tests/NtfServerTests.hs | 11 +++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index c45c47ab6..2ead387ca 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1065,7 +1065,8 @@ getNotificationMessage' c nonce encNtfInfo = do withStore' c getActiveNtfToken >>= \case Just NtfToken {ntfDhSecret = Just dhSecret} -> do ntfData <- agentCbDecrypt dhSecret nonce encNtfInfo - PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} :| _ <- liftEither (parse pnMessagesP (INTERNAL "error parsing PNMessageData") ntfData) + pnMsgs <- liftEither (parse pnMessagesP (INTERNAL "error parsing PNMessageData") ntfData) + let PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = L.last pnMsgs (ntfConnId, rcvNtfDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue) ntfMsgMeta <- (eitherToMaybe . smpDecode <$> agentCbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta) `catchAgentError` \_ -> pure Nothing msgMeta <- getConnectionMessage' c ntfConnId diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 5f019540f..62429a456 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -49,7 +49,7 @@ import Data.Bifunctor (bimap, first) import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.List.NonEmpty (NonEmpty (..)) +import qualified Data.List.NonEmpty as L import Data.Text.Encoding (encodeUtf8) import Database.SQLite.Simple.QQ (sql) import NtfClient @@ -874,8 +874,8 @@ messageNotificationData :: HasCallStack => AgentClient -> TBQueue APNSMockReques messageNotificationData c apnsQ = do (nonce, message) <- messageNotification apnsQ NtfToken {ntfDhSecret = Just dhSecret} <- getNtfTokenData c - Right (pnMsgData :| _) <- liftEither . first INTERNAL $ Right . parseAll pnMessagesP =<< first show (C.cbDecrypt dhSecret nonce message) - pure pnMsgData + Right pnMsgs <- liftEither . first INTERNAL $ Right . parseAll pnMessagesP =<< first show (C.cbDecrypt dhSecret nonce message) + pure $ L.last pnMsgs noNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO () noNotification apnsQ = do diff --git a/tests/NtfServerTests.hs b/tests/NtfServerTests.hs index b46988b28..e3349e9a3 100644 --- a/tests/NtfServerTests.hs +++ b/tests/NtfServerTests.hs @@ -17,7 +17,7 @@ import qualified Data.Aeson.Types as JT import Data.Bifunctor (first) import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Char8 (ByteString) -import Data.List.NonEmpty (NonEmpty (..)) +import qualified Data.List.NonEmpty as L import Data.Text.Encoding (encodeUtf8) import NtfClient import SMPClient as SMP @@ -36,7 +36,6 @@ import ServerTests import qualified Simplex.Messaging.Agent.Protocol as AP import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding -import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS import qualified Simplex.Messaging.Notifications.Server.Push.APNS as APNS @@ -137,8 +136,8 @@ testNotificationSubscription (ATransport t) = Right nonce' = C.cbNonce <$> ntfData' .-> "nonce" Right message = ntfData' .-> "message" Right ntfDataDecrypted = C.cbDecrypt dhSecret nonce' message - Right (APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer, notifierId}, nmsgNonce, encNMsgMeta} :| _) = - parse pnMessagesP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted + Right pnMsgs1 = parse pnMessagesP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted + APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer, notifierId}, nmsgNonce, encNMsgMeta} = L.last pnMsgs1 Right nMsgMeta = C.cbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta Right NMsgMeta {msgId, msgTs} = parse smpP (AP.INTERNAL "error parsing NMsgMeta") nMsgMeta smpServer `shouldBe` srv @@ -170,8 +169,8 @@ testNotificationSubscription (ATransport t) = Right nonce3 = C.cbNonce <$> ntfData3 .-> "nonce" Right message3 = ntfData3 .-> "message" Right ntfDataDecrypted3 = C.cbDecrypt dhSecret nonce3 message3 - Right (APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer = smpServer3, notifierId = notifierId3}} :| _) = - parse pnMessagesP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted3 + Right pnMsgs2 = parse pnMessagesP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted3 + APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer = smpServer3, notifierId = notifierId3}} = L.last pnMsgs2 smpServer3 `shouldBe` srv notifierId3 `shouldBe` nId send3 APNSRespOk From 75712641ee97d1f0af5de3f7f3bc658c4cb7b8e2 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 9 Sep 2024 14:07:32 +0100 Subject: [PATCH 6/7] rfc: stabilize iOS notifications (#1221) --- rfcs/2024-07-06-ios-notifications.md | 18 ++++++++++++++++++ rfcs/{ => done}/2023-12-29-pqdr.md | 0 rfcs/{ => done}/2024-03-03-pqdr-version.md | 0 rfcs/{ => done}/2024-06-14-fast-connection.md | 0 4 files changed, 18 insertions(+) create mode 100644 rfcs/2024-07-06-ios-notifications.md rename rfcs/{ => done}/2023-12-29-pqdr.md (100%) rename rfcs/{ => done}/2024-03-03-pqdr-version.md (100%) rename rfcs/{ => done}/2024-06-14-fast-connection.md (100%) diff --git a/rfcs/2024-07-06-ios-notifications.md b/rfcs/2024-07-06-ios-notifications.md new file mode 100644 index 000000000..c60d7baf3 --- /dev/null +++ b/rfcs/2024-07-06-ios-notifications.md @@ -0,0 +1,18 @@ +# iOS notifications stability + +## Problem + +iOS notifications may fail to deliver for several reasons, but there are two important reasons that we could address: +- when notification server is not subscribed to SMP server(s), the notifications can be dropped - it can happen because either notification server restarts or becuase SMP server restarted and some messages are received before notification server resubscribed. We lose approximately 3% of notifications because of this reason. +- when user device is offline or has low power condition, Apple does not deliver notification, but puts them to storage. If while the notification is in storage a new one arrives it would overwrite the previous notification. If it was the message to the same message queue, the client will download messages anyway, up to a limit, but if the message was to another queue, it will not be delivered until the app is opened. Apple delivers about 88% of notifications that should be delivered (not accounting for uninstalled apps), the rest is replaced with the newer notifications. + +## Solution + +The first problem can be solved by preserving notifications for a limited time (say 1 hour) in case there is no subscription to notification from notification server. At the very least, they can be preserved in SMP server memory but can also be stored to a file on restart, similar to messages, and be delivered when notification server resubscribes. It is sufficient to store one notification per messaging queue. + +The second problem is both more damaging and more complex to solve. The solution could be to always deliver several last notifications to different queues in one packet (Apple allows up to ~4-5kb notification size, and we are sending packets of fixed size 512 bytes, so we could fit up to 8-10 of them in each notification). + +Every time a client receives such batch of notifications if can: +- check if that notification was already received in the previous batch. +- if it was received, it would be ignored, otherwise it would be processed. +- process them one by one, started from the most recent one while the time allows. diff --git a/rfcs/2023-12-29-pqdr.md b/rfcs/done/2023-12-29-pqdr.md similarity index 100% rename from rfcs/2023-12-29-pqdr.md rename to rfcs/done/2023-12-29-pqdr.md diff --git a/rfcs/2024-03-03-pqdr-version.md b/rfcs/done/2024-03-03-pqdr-version.md similarity index 100% rename from rfcs/2024-03-03-pqdr-version.md rename to rfcs/done/2024-03-03-pqdr-version.md diff --git a/rfcs/2024-06-14-fast-connection.md b/rfcs/done/2024-06-14-fast-connection.md similarity index 100% rename from rfcs/2024-06-14-fast-connection.md rename to rfcs/done/2024-06-14-fast-connection.md From 990dcec3481035f49658331c021a510bba2237c5 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 9 Sep 2024 14:53:11 +0100 Subject: [PATCH 7/7] smp server: add created/updated/used date to queues to manage expiration (#1306) * smp server: add created/updated/used date to queues to manage expiration, all: make Map updates strict in value * remove strict * remove time precision * diff * style * only update when time changed --- src/Simplex/Messaging/Server.hs | 27 +++++-- src/Simplex/Messaging/Server/QueueStore.hs | 19 ++++- .../Messaging/Server/QueueStore/STM.hs | 27 ++++--- src/Simplex/Messaging/Server/StoreLog.hs | 73 +++++++++++++++---- 4 files changed, 112 insertions(+), 34 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index cffd1d6df..e2d5edc4e 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -986,8 +986,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> Just <$> case command of SKEY sKey -> (corrId,entId,) <$> case qr_ of - Just QueueRec {sndSecure, recipientId} - | sndSecure -> secureQueue_ "SKEY" recipientId sKey + Just qr@QueueRec {sndSecure} + | sndSecure -> secureQueue_ "SKEY" qr sKey | otherwise -> pure $ ERR AUTH Nothing -> pure $ ERR INTERNAL SEND flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody @@ -1010,7 +1010,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s GET -> withQueue getMessage ACK msgId -> withQueue (`acknowledgeMsg` msgId) KEY sKey -> (corrId,entId,) <$> case qr_ of - Just QueueRec {recipientId} -> secureQueue_ "KEY" recipientId sKey + Just qr -> secureQueue_ "KEY" qr sKey Nothing -> pure $ ERR INTERNAL NKEY nKey dhKey -> addQueueNotifier_ st nKey dhKey NDEL -> deleteQueueNotifier_ st @@ -1021,6 +1021,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> SenderCanSecure -> M (Transmission BrokerMsg) createQueue st recipientKey dhKey subMode sndSecure = time "NEW" $ do (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random + updatedAt <- Just <$> liftIO getSystemDate let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure} qRec (recipientId, senderId) = @@ -1032,7 +1033,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s senderKey = Nothing, notifier = Nothing, status = QueueActive, - sndSecure + sndSecure, + updatedAt } (corrId,entId,) <$> addQueueRetry 3 qik qRec where @@ -1061,9 +1063,10 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s n <- asks $ queueIdBytes . config liftM2 (,) (randomId n) (randomId n) - secureQueue_ :: T.Text -> RecipientId -> SndPublicAuthKey -> M BrokerMsg - secureQueue_ name rId sKey = time name $ do + secureQueue_ :: T.Text -> QueueRec -> SndPublicAuthKey -> M BrokerMsg + secureQueue_ name qr@QueueRec {recipientId = rId} sKey = time name $ do withLog $ \s -> logSecureQueue s rId sKey + updateQueueDate qr st <- asks queueStore stats <- asks serverStats incStat $ qSecured stats @@ -1172,7 +1175,17 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s pure r withQueue :: (QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) - withQueue action = maybe (pure $ err AUTH) action qr_ + withQueue action = case qr_ of + Just qr -> updateQueueDate qr >> action qr + Nothing -> pure $ err INTERNAL + + updateQueueDate :: QueueRec -> M () + updateQueueDate QueueRec {updatedAt, recipientId = rId} = do + t <- liftIO getSystemDate + when (Just t /= updatedAt) $ do + withLog $ \s -> logUpdateQueueTime s rId t + st <- asks queueStore + liftIO $ updateQueueTime st rId t subscribeNotifications :: M (Transmission BrokerMsg) subscribeNotifications = do diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 8d5bd8fff..3f7da8d29 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -1,10 +1,13 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} module Simplex.Messaging.Server.QueueStore where +import Data.Int (Int64) +import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol @@ -16,7 +19,8 @@ data QueueRec = QueueRec senderKey :: !(Maybe SndPublicAuthKey), sndSecure :: !SenderCanSecure, notifier :: !(Maybe NtfCreds), - status :: !ServerQueueStatus + status :: !ServerQueueStatus, + updatedAt :: !(Maybe RoundedSystemTime) } deriving (Show) @@ -34,3 +38,16 @@ instance StrEncoding NtfCreds where pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show) + +newtype RoundedSystemTime = RoundedSystemTime Int64 + deriving (Eq, Ord, Show) + +instance StrEncoding RoundedSystemTime where + strEncode (RoundedSystemTime t) = strEncode t + strP = RoundedSystemTime <$> strP + +getRoundedSystemTime :: Int64 -> IO RoundedSystemTime +getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` prec) * prec) <$> getSystemTime + +getSystemDate :: IO RoundedSystemTime +getSystemDate = getRoundedSystemTime 86400 diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 3a1385269..da9dc4bb3 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -19,6 +19,7 @@ module Simplex.Messaging.Server.QueueStore.STM addQueueNotifier, deleteQueueNotifier, suspendQueue, + updateQueueTime, deleteQueue, ) where @@ -65,8 +66,8 @@ getQueue QueueStore {queues, senders, notifiers} party qId = SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues) secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> IO (Either ErrorType QueueRec) -secureQueue QueueStore {queues} rId sKey = - atomically $ withQueue rId queues $ \qVar -> +secureQueue QueueStore {queues} rId sKey = toResult <$> do + TM.lookupIO rId queues $>>= \qVar -> atomically $ readTVar qVar >>= \q -> case senderKey q of Just k -> pure $ if sKey == k then Just q else Nothing _ -> @@ -74,26 +75,30 @@ secureQueue QueueStore {queues} rId sKey = in writeTVar qVar q' $> Just q' addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> IO (Either ErrorType QueueRec) -addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = atomically $ do - ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ +addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do + ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ withQueue rId queues $ \qVar -> do q <- readTVar qVar forM_ (notifier q) $ (`TM.delete` notifiers) . notifierId - writeTVar qVar $! q {notifier = Just ntfCreds} + let !q' = q {notifier = Just ntfCreds} + writeTVar qVar q' TM.insert nId rId notifiers - pure $ Just q + pure q' deleteQueueNotifier :: QueueStore -> RecipientId -> IO (Either ErrorType ()) deleteQueueNotifier QueueStore {queues, notifiers} rId = - atomically $ withQueue rId queues $ \qVar -> do + withQueue rId queues $ \qVar -> do q <- readTVar qVar forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers writeTVar qVar $! q {notifier = Nothing} - pure $ Just () suspendQueue :: QueueStore -> RecipientId -> IO (Either ErrorType ()) suspendQueue QueueStore {queues} rId = - atomically $ withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just () + withQueue rId queues (`modifyTVar'` \q -> q {status = QueueOff}) + +updateQueueTime :: QueueStore -> RecipientId -> RoundedSystemTime -> IO () +updateQueueTime QueueStore {queues} rId t = + void $ withQueue rId queues (`modifyTVar'` \q -> q {updatedAt = Just t}) deleteQueue :: QueueStore -> RecipientId -> IO (Either ErrorType QueueRec) deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do @@ -108,5 +113,5 @@ deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do toResult :: Maybe a -> Either ErrorType a toResult = maybe (Left AUTH) Right -withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM (Maybe a)) -> STM (Either ErrorType a) -withQueue rId queues f = toResult <$> TM.lookup rId queues $>>= f +withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM a) -> IO (Either ErrorType a) +withQueue rId queues f = toResult <$> TM.lookupIO rId queues >>= atomically . mapM f diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 94a340d94..c47b06eb5 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -20,12 +20,14 @@ module Simplex.Messaging.Server.StoreLog logSuspendQueue, logDeleteQueue, logDeleteNotifier, + logUpdateQueueTime, readWriteStoreLog, ) where import Control.Applicative (optional, (<|>)) import Control.Monad (foldM, unless, when) +import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) @@ -33,7 +35,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol -import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..)) +import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Util (ifM) import System.Directory (doesFileExist, renameFile) @@ -52,9 +54,19 @@ data StoreLogRecord | SuspendQueue QueueId | DeleteQueue QueueId | DeleteNotifier QueueId + | UpdateTime QueueId RoundedSystemTime + +data SLRTag + = CreateQueue_ + | SecureQueue_ + | AddNotifier_ + | SuspendQueue_ + | DeleteQueue_ + | DeleteNotifier_ + | UpdateTime_ instance StrEncoding QueueRec where - strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier} = + strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} = B.unwords [ "rid=" <> strEncode recipientId, "rk=" <> strEncode recipientKey, @@ -64,8 +76,10 @@ instance StrEncoding QueueRec where ] <> if sndSecure then " sndSecure=" <> strEncode sndSecure else "" <> maybe "" notifierStr notifier + <> maybe "" updatedAtStr updatedAt where notifierStr ntfCreds = " notifier=" <> strEncode ntfCreds + updatedAtStr t = " updated_at=" <> strEncode t strP = do recipientId <- "rid=" *> strP_ @@ -75,24 +89,49 @@ instance StrEncoding QueueRec where senderKey <- "sk=" *> strP sndSecure <- (" sndSecure=" *> strP) <|> pure False notifier <- optional $ " notifier=" *> strP - pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive} + updatedAt <- optional $ " updated_at=" *> strP + pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt} + +instance StrEncoding SLRTag where + strEncode = \case + CreateQueue_ -> "CREATE" + SecureQueue_ -> "SECURE" + AddNotifier_ -> "NOTIFIER" + SuspendQueue_ -> "SUSPEND" + DeleteQueue_ -> "DELETE" + DeleteNotifier_ -> "NDELETE" + UpdateTime_ -> "TIME" + + strP = + A.takeTill (== ' ') >>= \case + "CREATE" -> pure CreateQueue_ + "SECURE" -> pure SecureQueue_ + "NOTIFIER" -> pure AddNotifier_ + "SUSPEND" -> pure SuspendQueue_ + "DELETE" -> pure DeleteQueue_ + "NDELETE" -> pure DeleteNotifier_ + "TIME" -> pure UpdateTime_ + s -> fail $ "invalid log record tag: " <> B.unpack s instance StrEncoding StoreLogRecord where strEncode = \case - CreateQueue q -> strEncode (Str "CREATE", q) - SecureQueue rId sKey -> strEncode (Str "SECURE", rId, sKey) - AddNotifier rId ntfCreds -> strEncode (Str "NOTIFIER", rId, ntfCreds) - SuspendQueue rId -> strEncode (Str "SUSPEND", rId) - DeleteQueue rId -> strEncode (Str "DELETE", rId) - DeleteNotifier rId -> strEncode (Str "NDELETE", rId) + CreateQueue q -> strEncode (CreateQueue_, q) + SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey) + AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds) + SuspendQueue rId -> strEncode (SuspendQueue_, rId) + DeleteQueue rId -> strEncode (DeleteQueue_, rId) + DeleteNotifier rId -> strEncode (DeleteNotifier_, rId) + UpdateTime rId t -> strEncode (UpdateTime_, rId, t) strP = - "CREATE " *> (CreateQueue <$> strP) - <|> "SECURE " *> (SecureQueue <$> strP_ <*> strP) - <|> "NOTIFIER " *> (AddNotifier <$> strP_ <*> strP) - <|> "SUSPEND " *> (SuspendQueue <$> strP) - <|> "DELETE " *> (DeleteQueue <$> strP) - <|> "NDELETE " *> (DeleteNotifier <$> strP) + strP_ >>= \case + CreateQueue_ -> CreateQueue <$> strP + SecureQueue_ -> SecureQueue <$> strP_ <*> strP + AddNotifier_ -> AddNotifier <$> strP_ <*> strP + SuspendQueue_ -> SuspendQueue <$> strP + DeleteQueue_ -> DeleteQueue <$> strP + DeleteNotifier_ -> DeleteNotifier <$> strP + UpdateTime_ -> UpdateTime <$> strP_ <*> strP openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode) openWriteStoreLog f = do @@ -138,6 +177,9 @@ logDeleteQueue s = writeStoreLogRecord s . DeleteQueue logDeleteNotifier :: StoreLog 'WriteMode -> QueueId -> IO () logDeleteNotifier s = writeStoreLogRecord s . DeleteNotifier +logUpdateQueueTime :: StoreLog 'WriteMode -> QueueId -> RoundedSystemTime -> IO () +logUpdateQueueTime s qId t = writeStoreLogRecord s $ UpdateTime qId t + readWriteStoreLog :: FilePath -> IO (Map RecipientId QueueRec, StoreLog 'WriteMode) readWriteStoreLog f = do qs <- ifM (doesFileExist f) readQS (pure M.empty) @@ -169,5 +211,6 @@ readQueues f = foldM processLine M.empty . LB.lines =<< LB.readFile f SuspendQueue qId -> M.adjust (\q -> q {status = QueueOff}) qId m DeleteQueue qId -> M.delete qId m DeleteNotifier qId -> M.adjust (\q -> q {notifier = Nothing}) qId m + UpdateTime qId t -> M.adjust (\q -> q {updatedAt = Just t}) qId m printError :: String -> IO () printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s