agent: drop message after N reception attempts (#1762)

* agent: drop message after N reception attempts

* test

* increase count for message expiration

* fix migration

* update schema

---------

Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
This commit is contained in:
Evgeny
2026-04-11 16:24:30 +01:00
committed by GitHub
parent 97802a30fc
commit 34c0909c1a
12 changed files with 131 additions and 16 deletions
+2
View File
@@ -173,6 +173,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
@@ -223,6 +224,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts
Simplex.Messaging.Agent.Store.SQLite.Util
if flag(client_postgres) || flag(server_postgres)
exposed-modules:
+18 -8
View File
@@ -3158,18 +3158,28 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
pure conn''
| otherwise = pure conn'
Right Nothing -> prohibited "msg: bad agent msg" >> ack
Left e@(AGENT A_DUPLICATE) -> do
Left e@(AGENT A_DUPLICATE {}) -> do
atomically $ incSMPServerStat c userId srv recvDuplicates
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
| userAck -> ackDel internalId
| otherwise ->
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret' srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
_ -> ack
| otherwise -> do
attempts <- withStore' c $ \db -> incMsgRcvAttempts db connId internalId
AgentConfig {rcvExpireCount, rcvExpireInterval} <- asks config
let firstTs = snd $ recipient msgMeta
brokerTs = snd $ broker msgMeta
now <- liftIO getCurrentTime
if attempts >= rcvExpireCount && diffUTCTime now firstTs >= rcvExpireInterval
then do
notify $ ERR (AGENT $ A_DUPLICATE $ Just DroppedMsg {brokerTs, attempts})
ackDel internalId
else
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret' srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
_ -> ack
_ -> checkDuplicateHash e encryptedMsgHash >> ack
Left (AGENT (A_CRYPTO e)) -> do
atomically $ incSMPServerStat c userId srv recvCryptoErrs
+1 -1
View File
@@ -2105,7 +2105,7 @@ cryptoError :: C.CryptoError -> AgentErrorType
cryptoError = \case
C.CryptoLargeMsgError -> CMD LARGE "CryptoLargeMsgError"
C.CryptoHeaderError _ -> AGENT A_MESSAGE -- parsing error
C.CERatchetDuplicateMessage -> AGENT A_DUPLICATE
C.CERatchetDuplicateMessage -> AGENT $ A_DUPLICATE Nothing
C.AESDecryptError -> c DECRYPT_AES
C.CBDecryptError -> c DECRYPT_CB
C.CERatchetHeader -> c RATCHET_HEADER
@@ -172,6 +172,8 @@ data AgentConfig = AgentConfig
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
certificateFile :: FilePath,
rcvExpireCount :: Int,
rcvExpireInterval :: NominalDiffTime,
e2eEncryptVRange :: VersionRangeE2E,
smpAgentVRange :: VersionRangeSMPA,
smpClientVRange :: VersionRangeSMPC
@@ -247,6 +249,8 @@ defaultAgentConfig =
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
privateKeyFile = "/etc/opt/simplex-agent/agent.key",
certificateFile = "/etc/opt/simplex-agent/agent.crt",
rcvExpireCount = 8,
rcvExpireInterval = nominalDay,
e2eEncryptVRange = supportedE2EEncryptVRange,
smpAgentVRange = supportedSMPAgentVRange,
smpClientVRange = supportedSMPClientVRange
+14 -4
View File
@@ -146,6 +146,7 @@ module Simplex.Messaging.Agent.Protocol
ConnectionErrorType (..),
BrokerErrorType (..),
SMPAgentError (..),
DroppedMsg (..),
AgentCryptoError (..),
cryptoErrToSyncState,
ATransmission,
@@ -788,6 +789,12 @@ data MsgMeta = MsgMeta
}
deriving (Eq, Show)
data DroppedMsg = DroppedMsg
{ brokerTs :: UTCTime,
attempts :: Int
}
deriving (Eq, Show)
data SMPConfirmation = SMPConfirmation
{ -- | sender's public key to use for authentication of sender's commands at the recepient's server
senderKey :: Maybe SndPublicAuthKey,
@@ -2050,12 +2057,13 @@ data SMPAgentError
A_LINK {linkErr :: String}
| -- | cannot decrypt message
A_CRYPTO {cryptoErr :: AgentCryptoError}
| -- | duplicate message - this error is detected by ratchet decryption - this message will be ignored and not shown
-- it may also indicate a loss of ratchet synchronization (when only one message is sent via copied ratchet)
A_DUPLICATE
| -- | duplicate message - this error is detected by ratchet decryption - this message will be ignored and not shown.
-- it may also indicate a loss of ratchet synchronization (when only one message is sent via copied ratchet).
-- when message is dropped after too many reception attempts, DroppedMsg is included.
A_DUPLICATE {droppedMsg_ :: Maybe DroppedMsg}
| -- | error in the message to add/delete/etc queue in connection
A_QUEUE {queueErr :: String}
deriving (Eq, Read, Show, Exception)
deriving (Eq, Show, Exception)
data AgentCryptoError
= -- | AES decryption error
@@ -2165,6 +2173,8 @@ $(J.deriveJSON (sumTypeJSON id) ''ConnectionErrorType)
$(J.deriveJSON (sumTypeJSON id) ''AgentCryptoError)
$(J.deriveJSON defaultJSON ''DroppedMsg)
$(J.deriveJSON (sumTypeJSON id) ''SMPAgentError)
$(J.deriveJSON (sumTypeJSON id) ''AgentErrorType)
@@ -127,6 +127,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
setMsgUserAck,
getRcvMsg,
getLastMsg,
incMsgRcvAttempts,
checkRcvMsgHashExists,
getRcvMsgBrokerTs,
deleteMsg,
@@ -1110,6 +1111,19 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity,
msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck}
incMsgRcvAttempts :: DB.Connection -> ConnId -> InternalId -> IO Int
incMsgRcvAttempts db connId (InternalId msgId) =
fromOnly . head
<$> DB.query
db
[sql|
UPDATE rcv_messages
SET receive_attempts = receive_attempts + 1
WHERE conn_id = ? AND internal_id = ?
RETURNING receive_attempts
|]
(connId, msgId)
checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkRcvMsgHashExists db connId hash =
maybeFirstRow' False fromOnlyBI $
@@ -11,6 +11,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitati
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Text, Maybe Text)]
@@ -21,7 +22,8 @@ schemaMigrations =
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables)
("20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables),
("20260410_receive_attempts", m20260410_receive_attempts, Just down_m20260410_receive_attempts)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,19 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts where
import Data.Text (Text)
import Text.RawString.QQ (r)
m20260410_receive_attempts :: Text
m20260410_receive_attempts =
[r|
ALTER TABLE rcv_messages ADD COLUMN receive_attempts SMALLINT NOT NULL DEFAULT 0;
|]
down_m20260410_receive_attempts :: Text
down_m20260410_receive_attempts =
[r|
ALTER TABLE rcv_messages DROP COLUMN receive_attempts;
|]
@@ -47,6 +47,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitation
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Query, Maybe Query)]
@@ -93,7 +94,8 @@ schemaMigrations =
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("m20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables)
("m20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables),
("m20260410_receive_attempts", m20260410_receive_attempts, Just down_m20260410_receive_attempts)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,18 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20260410_receive_attempts :: Query
m20260410_receive_attempts =
[sql|
ALTER TABLE rcv_messages ADD COLUMN receive_attempts INTEGER NOT NULL DEFAULT 0;
|]
down_m20260410_receive_attempts :: Query
down_m20260410_receive_attempts =
[sql|
ALTER TABLE rcv_messages DROP COLUMN receive_attempts;
|]
@@ -119,6 +119,7 @@ CREATE TABLE rcv_messages(
integrity BLOB NOT NULL,
user_ack INTEGER NULL DEFAULT 0,
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
receive_attempts INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY(conn_id, internal_rcv_id),
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
ON DELETE CASCADE
+34 -1
View File
@@ -408,6 +408,7 @@ functionalAPITests ps = do
it "should expire multiple messages" $ testExpireManyMessages ps
it "should expire one message if quota is exceeded" $ testExpireMessageQuota ps
it "should expire multiple messages if quota is exceeded" $ testExpireManyMessagesQuota ps
it "should drop message after too many receive attempts" $ testDropMsgAfterRcvAttempts ps
#if !defined(dbPostgres)
-- TODO [postgres] restore from outdated db backup (we use copyFile/renameFile for sqlite)
describe "Ratchet synchronization" $ do
@@ -2101,6 +2102,38 @@ testExpireManyMessagesQuota (t, msType) = withSmpServerConfigOn t cfg' testPort
where
cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {msgQueueQuota = 1, maxJournalMsgCount = 2}
testDropMsgAfterRcvAttempts :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testDropMsgAfterRcvAttempts ps =
withSmpServerStoreLogOn ps testPort $ \_ -> do
let rcvCfg = agentCfg {rcvExpireCount = 2, rcvExpireInterval = 1}
alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
bob <- getSMPAgentClient' 2 rcvCfg initAgentServers testDB2
(aliceId, bobId) <- runRight $ makeConnection alice bob
-- alice sends, bob receives but does NOT ack
runRight_ $ do
2 <- sendMessage alice bobId SMP.noMsgFlags "hello"
get alice ##> ("", bobId, SENT 2)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
-- bob disconnects without acking
disposeAgentClient bob
threadDelay 500000
-- bob reconnects, agent sees duplicate, counter=1
bob2 <- getSMPAgentClient' 3 rcvCfg initAgentServers testDB2
runRight_ $ do
subscribeConnection bob2 aliceId
get bob2 =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
-- bob disconnects again without acking
disposeAgentClient bob2
-- wait for rcvExpireInterval (1 second)
threadDelay 500000
-- bob reconnects, agent sees duplicate, counter=2, interval exceeded -> drops
bob3 <- getSMPAgentClient' 4 rcvCfg initAgentServers testDB2
runRight_ $ do
subscribeConnection bob3 aliceId
get bob3 =##> \case ("", c, ERR (AGENT (A_DUPLICATE (Just DroppedMsg {})))) -> c == aliceId; _ -> False
disposeAgentClient bob3
disposeAgentClient alice
testRatchetSync :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testRatchetSync ps = withAgentClients2 $ \alice bob ->
withSmpServerStoreMsgLogOn ps testPort $ \_ -> do
@@ -3224,7 +3257,7 @@ phase c connId d p statsExpectation =
d `shouldBe` d'
p `shouldBe` p'
statsExpectation stats
ERR (AGENT A_DUPLICATE) -> phase c connId d p statsExpectation
ERR (AGENT A_DUPLICATE {}) -> phase c connId d p statsExpectation
r -> do
liftIO . putStrLn $ "expected: " <> show p <> ", received: " <> show r
SWITCH {} <- pure r