From 0846a2ddb6ad66a32acce369544ea79179a73cc9 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 23 Aug 2022 14:31:21 +0100 Subject: [PATCH] add next queue IDs --- simplexmq.cabal | 1 + src/Simplex/Messaging/Agent.hs | 11 ++++-- src/Simplex/Messaging/Agent/Client.hs | 1 + src/Simplex/Messaging/Agent/Store.hs | 4 ++ src/Simplex/Messaging/Agent/Store/SQLite.hs | 38 +++++++++++-------- .../Agent/Store/SQLite/Migrations.hs | 4 +- .../Store/SQLite/Migrations/agent_schema.sql | 6 +++ src/Simplex/Messaging/Server/MsgStore/STM.hs | 2 +- src/Simplex/Messaging/Server/QueueStore.hs | 4 +- src/Simplex/Messaging/Server/StoreLog.hs | 2 +- tests/AgentTests/SQLiteTests.hs | 4 ++ 11 files changed, 52 insertions(+), 25 deletions(-) diff --git a/simplexmq.cabal b/simplexmq.cabal index c185f10cc..8410ce723 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -52,6 +52,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220822_queue_rotation Simplex.Messaging.Client Simplex.Messaging.Client.Agent Simplex.Messaging.Crypto diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e3378faba..d732ecb1a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -746,10 +746,12 @@ getConnectionServers' c connId = connServers <$> withStore c (`getConn` connId) where connServers :: SomeConn -> ConnectionStats connServers = \case - SomeConn _ (RcvConnection _ RcvQueue {server}) -> ConnectionStats {rcvServers = [server], sndServers = []} - SomeConn _ (SndConnection _ SndQueue {server}) -> ConnectionStats {rcvServers = [], sndServers = [server]} - SomeConn _ (DuplexConnection _ RcvQueue {server = s1} SndQueue {server = s2}) -> ConnectionStats {rcvServers = [s1], sndServers = [s2]} - SomeConn _ (ContactConnection _ RcvQueue {server}) -> ConnectionStats {rcvServers = [server], sndServers = []} + SomeConn _ (RcvConnection _ rq) -> ConnectionStats {rcvServers = rcvSrvs rq, sndServers = []} + SomeConn _ (SndConnection _ sq) -> ConnectionStats {rcvServers = [], sndServers = sndSrvs sq} + SomeConn _ (DuplexConnection _ rq sq) -> ConnectionStats {rcvServers = rcvSrvs rq, sndServers = sndSrvs sq} + SomeConn _ (ContactConnection _ rq) -> ConnectionStats {rcvServers = rcvSrvs rq, sndServers = []} + rcvSrvs RcvQueue {server} = [server] + sndSrvs SndQueue {server} = [server] -- | Change servers to be used for creating new queues, in Reader monad setSMPServers' :: AgentMonad m => AgentClient -> NonEmpty SMPServer -> m () @@ -1300,5 +1302,6 @@ newSndQueue_ a (Compatible (SMPQueueInfo smpClientVersion SMPQueueAddress {smpSe e2eDhSecret = C.dh' rcvE2ePubDhKey e2ePrivKey, e2ePubKey = Just e2ePubKey, status = New, + dbNextSndQueueId = Nothing, smpClientVersion } diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index a3d382d6e..494ead0f2 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -500,6 +500,7 @@ newRcvQueue_ a c srv vRange = do e2eDhSecret = Nothing, sndId = Just sndId, status = New, + dbNextRcvQueueId = Nothing, smpClientVersion = maxVersion vRange, clientNtfCreds = Nothing } diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 40f727d62..1644aca58 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -52,6 +52,8 @@ data RcvQueue = RcvQueue sndId :: Maybe SMP.SenderId, -- | queue status status :: QueueStatus, + -- | database ID of the new queue created for this queue to switch to (queue rotation) + dbNextRcvQueueId :: Maybe Int64, -- | SMP client version smpClientVersion :: Version, -- | credentials used in context of notifications @@ -84,6 +86,8 @@ data SndQueue = SndQueue e2eDhSecret :: C.DhSecretX25519, -- | queue status status :: QueueStatus, + -- | database ID of the new queue created for this queue to switch to (queue rotation) + dbNextSndQueueId :: Maybe Int64, -- | SMP client version smpClientVersion :: Version } diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 3e10a1def..7a87c6c6f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -265,10 +265,11 @@ createRcvConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandsh insertRcvQueue_ db connId q createSndConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> SndQueue -> IO (Either StoreError ConnId) -createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandshake} q@SndQueue {server} = +createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandshake} q@SndQueue {server} = do createConn_ gVar cData $ \connId -> do upsertServer_ db server DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?, ?, ?, ?, ?)" (connId, SCMInvitation, connAgentVersion, enableNtfs, duplexHandshake) + -- TODO add queue ID in insertSndQueue_ insertSndQueue_ db connId q getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError SomeConn) @@ -297,6 +298,7 @@ upgradeRcvConnToDuplex db connId sq@SndQueue {server} = getConn db connId $>>= \case (SomeConn _ RcvConnection {}) -> do upsertServer_ db server + -- TODO save with queue ID insertSndQueue_ db connId sq pure $ Right () (SomeConn c _) -> pure . Left . SEBadConnType $ connType c @@ -1128,46 +1130,50 @@ getConnData dbConn connId' = connData [(connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake)] = Just (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake}, cMode) connData _ = Nothing +type ServerRow = (NonEmpty TransportHost, String, C.KeyHash) + +toSMPServer :: ServerRow -> SMPServer +toSMPServer (host, port, keyHash) = SMPServer host port keyHash + getRcvQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe RcvQueue) getRcvQueueByConnId_ dbConn connId = listToMaybe . map rcvQueue <$> DB.query dbConn [sql| - SELECT s.key_hash, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, - q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, q.smp_client_version, + SELECT q.host, q.port, s.key_hash, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, + q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, q.next_rcv_queue_id, q.smp_client_version, q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret FROM rcv_queues q INNER JOIN servers s ON q.host = s.host AND q.port = s.port - WHERE q.conn_id = ?; + WHERE q.conn_id = ? AND (q.next_rcv_queue = ? OR q.next_rcv_queue IS NULL) |] - (Only connId) + (connId, False) where - rcvQueue ((keyHash, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) = - let server = SMPServer host port keyHash + rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbNextRcvQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) = + let server = toSMPServer srvRow smpClientVersion = fromMaybe 1 smpClientVersion_ clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} _ -> Nothing - in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, smpClientVersion, clientNtfCreds} + in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbNextRcvQueueId, smpClientVersion, clientNtfCreds} getSndQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe SndQueue) getSndQueueByConnId_ dbConn connId = - sndQueue + listToMaybe . map sndQueue <$> DB.query dbConn [sql| - SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.smp_client_version + SELECT q.host, q.port, s.key_hash, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.next_snd_queue_id, q.smp_client_version FROM snd_queues q INNER JOIN servers s ON q.host = s.host AND q.port = s.port - WHERE q.conn_id = ?; + WHERE q.conn_id = ? AND (q.next_snd_queue = ? OR q.next_snd_queue IS NULL) |] - (Only connId) + (connId, False) where - sndQueue [(keyHash, host, port, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, smpClientVersion)] = - let server = SMPServer host port keyHash - in Just SndQueue {server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, smpClientVersion} - sndQueue _ = Nothing + sndQueue (srvRow :. (sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbNextSndQueueId, smpClientVersion)) = + let server = toSMPServer srvRow + in SndQueue {server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbNextSndQueueId, smpClientVersion} -- * updateRcvIds helpers diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 12e552876..d6681975a 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -36,6 +36,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220608_v2 import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220822_queue_rotation import Simplex.Messaging.Encoding.String import Simplex.Messaging.Transport.Client (TransportHost) @@ -50,7 +51,8 @@ schemaMigrations = ("20220607_v2", m20220608_v2), ("m20220625_v2_ntf_mode", m20220625_v2_ntf_mode), ("m20220811_onion_hosts", m20220811_onion_hosts), - ("m20220817_connection_ntfs", m20220817_connection_ntfs) + ("m20220817_connection_ntfs", m20220817_connection_ntfs), + ("m20220822_queue_rotation", m20220822_queue_rotation) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index d88a37a0b..df18d55ee 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -41,6 +41,9 @@ CREATE TABLE rcv_queues( ntf_private_key BLOB, ntf_id BLOB, rcv_ntf_dh_secret BLOB, + rcv_queue_id INTEGER NULL, + next_rcv_queue INTEGER DEFAULT 0, + next_rcv_queue_id INTEGER NULL, PRIMARY KEY(host, port, rcv_id), FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE, @@ -58,6 +61,9 @@ CREATE TABLE snd_queues( smp_client_version INTEGER NOT NULL DEFAULT 1, snd_public_key BLOB, e2e_pub_key BLOB, + snd_queue_id INTEGER NULL, + next_snd_queue INTEGER DEFAULT 0, + next_snd_queue_id INTEGER NULL, PRIMARY KEY(host, port, snd_id), FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index ad4a0dd0c..9dab5926f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -9,7 +9,7 @@ module Simplex.Messaging.Server.MsgStore.STM where -import Control.Concurrent.STM.TBQueue (flushTBQueue) +import Control.Concurrent.STM.TBQueue (flushTBQueue, lengthTBQueue) import Control.Monad (when) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index b8a2f10ad..c05a0d3a6 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -15,7 +15,7 @@ data QueueRec = QueueRec senderId :: SenderId, senderKey :: Maybe SndPublicVerifyKey, notifier :: Maybe NtfCreds, - status :: QueueStatus + status :: ServerQueueStatus } deriving (Eq, Show) @@ -32,7 +32,7 @@ instance StrEncoding NtfCreds where (notifierId, notifierKey, rcvNtfDhSecret) <- strP pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -data QueueStatus = QueueActive | QueueOff deriving (Eq, Show) +data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show) class MonadQueueStore s m where addQueue :: s -> QueueRec -> m (Either ErrorType ()) diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 1fe33b72d..70b3532e1 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -37,7 +37,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol -import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), QueueStatus (..)) +import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..)) import Simplex.Messaging.Transport (trimCR) import System.Directory (doesFileExist) import System.IO diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 6a79fdea2..9a974108a 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -161,6 +161,7 @@ rcvQueue1 = e2eDhSecret = Nothing, sndId = Just "2345", status = New, + dbNextRcvQueueId = Nothing, smpClientVersion = 1, clientNtfCreds = Nothing } @@ -175,6 +176,7 @@ sndQueue1 = e2ePubKey = Nothing, e2eDhSecret = testDhSecret, status = New, + dbNextSndQueueId = Nothing, smpClientVersion = 1 } @@ -308,6 +310,7 @@ testUpgradeRcvConnToDuplex = e2ePubKey = Nothing, e2eDhSecret = testDhSecret, status = New, + dbNextSndQueueId = Nothing, smpClientVersion = 1 } upgradeRcvConnToDuplex db "conn1" anotherSndQueue @@ -331,6 +334,7 @@ testUpgradeSndConnToDuplex = e2eDhSecret = Nothing, sndId = Just "4567", status = New, + dbNextRcvQueueId = Nothing, smpClientVersion = 1, clientNtfCreds = Nothing }