mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-25 22:52:15 +00:00
Merge branch 'master' into rcv-services
This commit is contained in:
@@ -173,7 +173,8 @@ library
|
||||
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
|
||||
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
|
||||
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
|
||||
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260115_service_certs
|
||||
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts
|
||||
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260411_service_certs
|
||||
else
|
||||
exposed-modules:
|
||||
Simplex.Messaging.Agent.Store.SQLite
|
||||
@@ -224,7 +225,8 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260115_service_certs
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260411_service_certs
|
||||
Simplex.Messaging.Agent.Store.SQLite.Util
|
||||
if flag(client_postgres) || flag(server_postgres)
|
||||
exposed-modules:
|
||||
|
||||
@@ -3240,18 +3240,28 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
|
||||
pure conn''
|
||||
| otherwise = pure conn'
|
||||
Right Nothing -> prohibited "msg: bad agent msg" >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
Left e@(AGENT A_DUPLICATE {}) -> do
|
||||
atomically $ incSMPServerStat c userId srv recvDuplicates
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
|
||||
| userAck -> ackDel internalId
|
||||
| otherwise ->
|
||||
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
|
||||
AgentMessage _ (A_MSG body) -> do
|
||||
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret' srvMsgId
|
||||
notify $ MSG msgMeta msgFlags body
|
||||
pure ACKPending
|
||||
_ -> ack
|
||||
| otherwise -> do
|
||||
attempts <- withStore' c $ \db -> incMsgRcvAttempts db connId internalId
|
||||
AgentConfig {rcvExpireCount, rcvExpireInterval} <- asks config
|
||||
let firstTs = snd $ recipient msgMeta
|
||||
brokerTs = snd $ broker msgMeta
|
||||
now <- liftIO getCurrentTime
|
||||
if attempts >= rcvExpireCount && diffUTCTime now firstTs >= rcvExpireInterval
|
||||
then do
|
||||
notify $ ERR (AGENT $ A_DUPLICATE $ Just DroppedMsg {brokerTs, attempts})
|
||||
ackDel internalId
|
||||
else
|
||||
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
|
||||
AgentMessage _ (A_MSG body) -> do
|
||||
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret' srvMsgId
|
||||
notify $ MSG msgMeta msgFlags body
|
||||
pure ACKPending
|
||||
_ -> ack
|
||||
_ -> checkDuplicateHash e encryptedMsgHash >> ack
|
||||
Left (AGENT (A_CRYPTO e)) -> do
|
||||
atomically $ incSMPServerStat c userId srv recvCryptoErrs
|
||||
|
||||
@@ -2234,7 +2234,7 @@ cryptoError :: C.CryptoError -> AgentErrorType
|
||||
cryptoError = \case
|
||||
C.CryptoLargeMsgError -> CMD LARGE "CryptoLargeMsgError"
|
||||
C.CryptoHeaderError _ -> AGENT A_MESSAGE -- parsing error
|
||||
C.CERatchetDuplicateMessage -> AGENT A_DUPLICATE
|
||||
C.CERatchetDuplicateMessage -> AGENT $ A_DUPLICATE Nothing
|
||||
C.AESDecryptError -> c DECRYPT_AES
|
||||
C.CBDecryptError -> c DECRYPT_CB
|
||||
C.CERatchetHeader -> c RATCHET_HEADER
|
||||
|
||||
@@ -173,6 +173,8 @@ data AgentConfig = AgentConfig
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath,
|
||||
rcvExpireCount :: Int,
|
||||
rcvExpireInterval :: NominalDiffTime,
|
||||
e2eEncryptVRange :: VersionRangeE2E,
|
||||
smpAgentVRange :: VersionRangeSMPA,
|
||||
smpClientVRange :: VersionRangeSMPC
|
||||
@@ -248,6 +250,8 @@ defaultAgentConfig =
|
||||
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
|
||||
privateKeyFile = "/etc/opt/simplex-agent/agent.key",
|
||||
certificateFile = "/etc/opt/simplex-agent/agent.crt",
|
||||
rcvExpireCount = 8,
|
||||
rcvExpireInterval = nominalDay,
|
||||
e2eEncryptVRange = supportedE2EEncryptVRange,
|
||||
smpAgentVRange = supportedSMPAgentVRange,
|
||||
smpClientVRange = supportedSMPClientVRange
|
||||
|
||||
@@ -143,6 +143,7 @@ module Simplex.Messaging.Agent.Protocol
|
||||
ConnectionErrorType (..),
|
||||
BrokerErrorType (..),
|
||||
SMPAgentError (..),
|
||||
DroppedMsg (..),
|
||||
AgentCryptoError (..),
|
||||
cryptoErrToSyncState,
|
||||
ATransmission,
|
||||
@@ -798,6 +799,12 @@ data MsgMeta = MsgMeta
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
data DroppedMsg = DroppedMsg
|
||||
{ brokerTs :: UTCTime,
|
||||
attempts :: Int
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
data SMPConfirmation = SMPConfirmation
|
||||
{ -- | sender's public key to use for authentication of sender's commands at the recepient's server
|
||||
senderKey :: Maybe SndPublicAuthKey,
|
||||
@@ -2050,12 +2057,13 @@ data SMPAgentError
|
||||
A_LINK {linkErr :: String}
|
||||
| -- | cannot decrypt message
|
||||
A_CRYPTO {cryptoErr :: AgentCryptoError}
|
||||
| -- | duplicate message - this error is detected by ratchet decryption - this message will be ignored and not shown
|
||||
-- it may also indicate a loss of ratchet synchronization (when only one message is sent via copied ratchet)
|
||||
A_DUPLICATE
|
||||
| -- | duplicate message - this error is detected by ratchet decryption - this message will be ignored and not shown.
|
||||
-- it may also indicate a loss of ratchet synchronization (when only one message is sent via copied ratchet).
|
||||
-- when message is dropped after too many reception attempts, DroppedMsg is included.
|
||||
A_DUPLICATE {droppedMsg_ :: Maybe DroppedMsg}
|
||||
| -- | error in the message to add/delete/etc queue in connection
|
||||
A_QUEUE {queueErr :: String}
|
||||
deriving (Eq, Read, Show, Exception)
|
||||
deriving (Eq, Show, Exception)
|
||||
|
||||
data AgentCryptoError
|
||||
= -- | AES decryption error
|
||||
@@ -2165,6 +2173,8 @@ $(J.deriveJSON (sumTypeJSON id) ''ConnectionErrorType)
|
||||
|
||||
$(J.deriveJSON (sumTypeJSON id) ''AgentCryptoError)
|
||||
|
||||
$(J.deriveJSON defaultJSON ''DroppedMsg)
|
||||
|
||||
$(J.deriveJSON (sumTypeJSON id) ''SMPAgentError)
|
||||
|
||||
$(J.deriveJSON (sumTypeJSON id) ''AgentErrorType)
|
||||
|
||||
@@ -140,6 +140,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
|
||||
setMsgUserAck,
|
||||
getRcvMsg,
|
||||
getLastMsg,
|
||||
incMsgRcvAttempts,
|
||||
checkRcvMsgHashExists,
|
||||
getRcvMsgBrokerTs,
|
||||
deleteMsg,
|
||||
@@ -1226,6 +1227,19 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity,
|
||||
msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_
|
||||
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck}
|
||||
|
||||
incMsgRcvAttempts :: DB.Connection -> ConnId -> InternalId -> IO Int
|
||||
incMsgRcvAttempts db connId (InternalId msgId) =
|
||||
fromOnly . head
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
UPDATE rcv_messages
|
||||
SET receive_attempts = receive_attempts + 1
|
||||
WHERE conn_id = ? AND internal_id = ?
|
||||
RETURNING receive_attempts
|
||||
|]
|
||||
(connId, msgId)
|
||||
|
||||
checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
|
||||
checkRcvMsgHashExists db connId hash =
|
||||
maybeFirstRow' False fromOnlyBI $
|
||||
|
||||
@@ -11,7 +11,8 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitati
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260115_service_certs
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260411_service_certs
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
schemaMigrations :: [(String, Text, Maybe Text)]
|
||||
@@ -23,7 +24,8 @@ schemaMigrations =
|
||||
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
|
||||
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
|
||||
("20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables),
|
||||
("20260115_service_certs", m20260115_service_certs, Just down_m20260115_service_certs)
|
||||
("20260410_receive_attempts", m20260410_receive_attempts, Just down_m20260410_receive_attempts),
|
||||
("20260411_service_certs", m20260411_service_certs, Just down_m20260411_service_certs)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts where
|
||||
|
||||
import Data.Text (Text)
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
m20260410_receive_attempts :: Text
|
||||
m20260410_receive_attempts =
|
||||
[r|
|
||||
ALTER TABLE rcv_messages ADD COLUMN receive_attempts SMALLINT NOT NULL DEFAULT 0;
|
||||
|]
|
||||
|
||||
down_m20260410_receive_attempts :: Text
|
||||
down_m20260410_receive_attempts =
|
||||
[r|
|
||||
ALTER TABLE rcv_messages DROP COLUMN receive_attempts;
|
||||
|]
|
||||
@@ -1,14 +1,14 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260115_service_certs where
|
||||
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260411_service_certs where
|
||||
|
||||
import Data.Text (Text)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
m20260115_service_certs :: Text
|
||||
m20260115_service_certs =
|
||||
m20260411_service_certs :: Text
|
||||
m20260411_service_certs =
|
||||
createXorHashFuncs <> [r|
|
||||
CREATE TABLE client_services(
|
||||
user_id BIGINT NOT NULL REFERENCES users ON UPDATE RESTRICT ON DELETE CASCADE,
|
||||
@@ -92,8 +92,8 @@ AFTER UPDATE ON rcv_queues
|
||||
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_update();
|
||||
|]
|
||||
|
||||
down_m20260115_service_certs :: Text
|
||||
down_m20260115_service_certs =
|
||||
down_m20260411_service_certs :: Text
|
||||
down_m20260411_service_certs =
|
||||
[r|
|
||||
DROP TRIGGER tr_rcv_queue_insert ON rcv_queues;
|
||||
DROP TRIGGER tr_rcv_queue_delete ON rcv_queues;
|
||||
@@ -527,7 +527,8 @@ CREATE TABLE smp_agent_test_protocol_schema.rcv_messages (
|
||||
external_prev_snd_hash bytea NOT NULL,
|
||||
integrity bytea NOT NULL,
|
||||
user_ack smallint DEFAULT 0,
|
||||
rcv_queue_id bigint NOT NULL
|
||||
rcv_queue_id bigint NOT NULL,
|
||||
receive_attempts smallint DEFAULT 0 NOT NULL
|
||||
);
|
||||
|
||||
|
||||
|
||||
@@ -47,7 +47,8 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitation
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260115_service_certs
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260411_service_certs
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
schemaMigrations :: [(String, Query, Maybe Query)]
|
||||
@@ -95,7 +96,8 @@ schemaMigrations =
|
||||
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
|
||||
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
|
||||
("m20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables),
|
||||
("m20260115_service_certs", m20260115_service_certs, Just down_m20260115_service_certs)
|
||||
("m20260410_receive_attempts", m20260410_receive_attempts, Just down_m20260410_receive_attempts),
|
||||
("m20260411_service_certs", m20260411_service_certs, Just down_m20260411_service_certs)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20260410_receive_attempts :: Query
|
||||
m20260410_receive_attempts =
|
||||
[sql|
|
||||
ALTER TABLE rcv_messages ADD COLUMN receive_attempts INTEGER NOT NULL DEFAULT 0;
|
||||
|]
|
||||
|
||||
down_m20260410_receive_attempts :: Query
|
||||
down_m20260410_receive_attempts =
|
||||
[sql|
|
||||
ALTER TABLE rcv_messages DROP COLUMN receive_attempts;
|
||||
|]
|
||||
@@ -1,12 +1,12 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260115_service_certs where
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260411_service_certs where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20260115_service_certs :: Query
|
||||
m20260115_service_certs =
|
||||
m20260411_service_certs :: Query
|
||||
m20260411_service_certs =
|
||||
[sql|
|
||||
CREATE TABLE client_services(
|
||||
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
|
||||
@@ -76,8 +76,8 @@ BEGIN
|
||||
END;
|
||||
|]
|
||||
|
||||
down_m20260115_service_certs :: Query
|
||||
down_m20260115_service_certs =
|
||||
down_m20260411_service_certs :: Query
|
||||
down_m20260411_service_certs =
|
||||
[sql|
|
||||
DROP TRIGGER tr_rcv_queue_insert;
|
||||
DROP TRIGGER tr_rcv_queue_delete;
|
||||
@@ -120,6 +120,7 @@ CREATE TABLE rcv_messages(
|
||||
integrity BLOB NOT NULL,
|
||||
user_ack INTEGER NULL DEFAULT 0,
|
||||
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
|
||||
receive_attempts INTEGER NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY(conn_id, internal_rcv_id),
|
||||
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
|
||||
ON DELETE CASCADE
|
||||
|
||||
@@ -406,6 +406,7 @@ functionalAPITests ps = do
|
||||
it "should expire multiple messages" $ testExpireManyMessages ps
|
||||
it "should expire one message if quota is exceeded" $ testExpireMessageQuota ps
|
||||
it "should expire multiple messages if quota is exceeded" $ testExpireManyMessagesQuota ps
|
||||
it "should drop message after too many receive attempts" $ testDropMsgAfterRcvAttempts ps
|
||||
#if !defined(dbPostgres)
|
||||
-- TODO [postgres] restore from outdated db backup (we use copyFile/renameFile for sqlite)
|
||||
describe "Ratchet synchronization" $ do
|
||||
@@ -2105,6 +2106,38 @@ testExpireManyMessagesQuota (t, msType) = withSmpServerConfigOn t cfg' testPort
|
||||
where
|
||||
cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {msgQueueQuota = 1, maxJournalMsgCount = 2}
|
||||
|
||||
testDropMsgAfterRcvAttempts :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
|
||||
testDropMsgAfterRcvAttempts ps =
|
||||
withSmpServerStoreLogOn ps testPort $ \_ -> do
|
||||
let rcvCfg = agentCfg {rcvExpireCount = 2, rcvExpireInterval = 1}
|
||||
alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
|
||||
bob <- getSMPAgentClient' 2 rcvCfg initAgentServers testDB2
|
||||
(aliceId, bobId) <- runRight $ makeConnection alice bob
|
||||
-- alice sends, bob receives but does NOT ack
|
||||
runRight_ $ do
|
||||
2 <- sendMessage alice bobId SMP.noMsgFlags "hello"
|
||||
get alice ##> ("", bobId, SENT 2)
|
||||
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
|
||||
-- bob disconnects without acking
|
||||
disposeAgentClient bob
|
||||
threadDelay 500000
|
||||
-- bob reconnects, agent sees duplicate, counter=1
|
||||
bob2 <- getSMPAgentClient' 3 rcvCfg initAgentServers testDB2
|
||||
runRight_ $ do
|
||||
subscribeConnection bob2 aliceId
|
||||
get bob2 =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
|
||||
-- bob disconnects again without acking
|
||||
disposeAgentClient bob2
|
||||
-- wait for rcvExpireInterval (1 second)
|
||||
threadDelay 500000
|
||||
-- bob reconnects, agent sees duplicate, counter=2, interval exceeded -> drops
|
||||
bob3 <- getSMPAgentClient' 4 rcvCfg initAgentServers testDB2
|
||||
runRight_ $ do
|
||||
subscribeConnection bob3 aliceId
|
||||
get bob3 =##> \case ("", c, ERR (AGENT (A_DUPLICATE (Just DroppedMsg {})))) -> c == aliceId; _ -> False
|
||||
disposeAgentClient bob3
|
||||
disposeAgentClient alice
|
||||
|
||||
testRatchetSync :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
|
||||
testRatchetSync ps = withAgentClients2 $ \alice bob ->
|
||||
withSmpServerStoreMsgLogOn ps testPort $ \_ -> do
|
||||
@@ -3228,7 +3261,7 @@ phase c connId d p statsExpectation =
|
||||
d `shouldBe` d'
|
||||
p `shouldBe` p'
|
||||
statsExpectation stats
|
||||
ERR (AGENT A_DUPLICATE) -> phase c connId d p statsExpectation
|
||||
ERR (AGENT A_DUPLICATE {}) -> phase c connId d p statsExpectation
|
||||
r -> do
|
||||
liftIO . putStrLn $ "expected: " <> show p <> ", received: " <> show r
|
||||
SWITCH {} <- pure r
|
||||
|
||||
Reference in New Issue
Block a user