add next queue IDs

This commit is contained in:
Evgeny Poberezkin
2022-08-23 14:31:21 +01:00
parent 799963b9d3
commit 0846a2ddb6
11 changed files with 52 additions and 25 deletions
+1
View File
@@ -52,6 +52,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220822_queue_rotation
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
Simplex.Messaging.Crypto
+7 -4
View File
@@ -746,10 +746,12 @@ getConnectionServers' c connId = connServers <$> withStore c (`getConn` connId)
where
connServers :: SomeConn -> ConnectionStats
connServers = \case
SomeConn _ (RcvConnection _ RcvQueue {server}) -> ConnectionStats {rcvServers = [server], sndServers = []}
SomeConn _ (SndConnection _ SndQueue {server}) -> ConnectionStats {rcvServers = [], sndServers = [server]}
SomeConn _ (DuplexConnection _ RcvQueue {server = s1} SndQueue {server = s2}) -> ConnectionStats {rcvServers = [s1], sndServers = [s2]}
SomeConn _ (ContactConnection _ RcvQueue {server}) -> ConnectionStats {rcvServers = [server], sndServers = []}
SomeConn _ (RcvConnection _ rq) -> ConnectionStats {rcvServers = rcvSrvs rq, sndServers = []}
SomeConn _ (SndConnection _ sq) -> ConnectionStats {rcvServers = [], sndServers = sndSrvs sq}
SomeConn _ (DuplexConnection _ rq sq) -> ConnectionStats {rcvServers = rcvSrvs rq, sndServers = sndSrvs sq}
SomeConn _ (ContactConnection _ rq) -> ConnectionStats {rcvServers = rcvSrvs rq, sndServers = []}
rcvSrvs RcvQueue {server} = [server]
sndSrvs SndQueue {server} = [server]
-- | Change servers to be used for creating new queues, in Reader monad
setSMPServers' :: AgentMonad m => AgentClient -> NonEmpty SMPServer -> m ()
@@ -1300,5 +1302,6 @@ newSndQueue_ a (Compatible (SMPQueueInfo smpClientVersion SMPQueueAddress {smpSe
e2eDhSecret = C.dh' rcvE2ePubDhKey e2ePrivKey,
e2ePubKey = Just e2ePubKey,
status = New,
dbNextSndQueueId = Nothing,
smpClientVersion
}
+1
View File
@@ -500,6 +500,7 @@ newRcvQueue_ a c srv vRange = do
e2eDhSecret = Nothing,
sndId = Just sndId,
status = New,
dbNextRcvQueueId = Nothing,
smpClientVersion = maxVersion vRange,
clientNtfCreds = Nothing
}
+4
View File
@@ -52,6 +52,8 @@ data RcvQueue = RcvQueue
sndId :: Maybe SMP.SenderId,
-- | queue status
status :: QueueStatus,
-- | database ID of the new queue created for this queue to switch to (queue rotation)
dbNextRcvQueueId :: Maybe Int64,
-- | SMP client version
smpClientVersion :: Version,
-- | credentials used in context of notifications
@@ -84,6 +86,8 @@ data SndQueue = SndQueue
e2eDhSecret :: C.DhSecretX25519,
-- | queue status
status :: QueueStatus,
-- | database ID of the new queue created for this queue to switch to (queue rotation)
dbNextSndQueueId :: Maybe Int64,
-- | SMP client version
smpClientVersion :: Version
}
+22 -16
View File
@@ -265,10 +265,11 @@ createRcvConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandsh
insertRcvQueue_ db connId q
createSndConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> SndQueue -> IO (Either StoreError ConnId)
createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandshake} q@SndQueue {server} =
createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandshake} q@SndQueue {server} = do
createConn_ gVar cData $ \connId -> do
upsertServer_ db server
DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?, ?, ?, ?, ?)" (connId, SCMInvitation, connAgentVersion, enableNtfs, duplexHandshake)
-- TODO add queue ID in insertSndQueue_
insertSndQueue_ db connId q
getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError SomeConn)
@@ -297,6 +298,7 @@ upgradeRcvConnToDuplex db connId sq@SndQueue {server} =
getConn db connId $>>= \case
(SomeConn _ RcvConnection {}) -> do
upsertServer_ db server
-- TODO save with queue ID
insertSndQueue_ db connId sq
pure $ Right ()
(SomeConn c _) -> pure . Left . SEBadConnType $ connType c
@@ -1128,46 +1130,50 @@ getConnData dbConn connId' =
connData [(connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake)] = Just (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake}, cMode)
connData _ = Nothing
type ServerRow = (NonEmpty TransportHost, String, C.KeyHash)
toSMPServer :: ServerRow -> SMPServer
toSMPServer (host, port, keyHash) = SMPServer host port keyHash
getRcvQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe RcvQueue)
getRcvQueueByConnId_ dbConn connId =
listToMaybe . map rcvQueue
<$> DB.query
dbConn
[sql|
SELECT s.key_hash, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, q.smp_client_version,
SELECT q.host, q.port, s.key_hash, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, q.next_rcv_queue_id, q.smp_client_version,
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret
FROM rcv_queues q
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
WHERE q.conn_id = ?;
WHERE q.conn_id = ? AND (q.next_rcv_queue = ? OR q.next_rcv_queue IS NULL)
|]
(Only connId)
(connId, False)
where
rcvQueue ((keyHash, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) =
let server = SMPServer host port keyHash
rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbNextRcvQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) =
let server = toSMPServer srvRow
smpClientVersion = fromMaybe 1 smpClientVersion_
clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
_ -> Nothing
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, smpClientVersion, clientNtfCreds}
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbNextRcvQueueId, smpClientVersion, clientNtfCreds}
getSndQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe SndQueue)
getSndQueueByConnId_ dbConn connId =
sndQueue
listToMaybe . map sndQueue
<$> DB.query
dbConn
[sql|
SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.smp_client_version
SELECT q.host, q.port, s.key_hash, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.next_snd_queue_id, q.smp_client_version
FROM snd_queues q
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
WHERE q.conn_id = ?;
WHERE q.conn_id = ? AND (q.next_snd_queue = ? OR q.next_snd_queue IS NULL)
|]
(Only connId)
(connId, False)
where
sndQueue [(keyHash, host, port, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, smpClientVersion)] =
let server = SMPServer host port keyHash
in Just SndQueue {server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, smpClientVersion}
sndQueue _ = Nothing
sndQueue (srvRow :. (sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbNextSndQueueId, smpClientVersion)) =
let server = toSMPServer srvRow
in SndQueue {server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbNextSndQueueId, smpClientVersion}
-- * updateRcvIds helpers
@@ -36,6 +36,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220608_v2
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220822_queue_rotation
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -50,7 +51,8 @@ schemaMigrations =
("20220607_v2", m20220608_v2),
("m20220625_v2_ntf_mode", m20220625_v2_ntf_mode),
("m20220811_onion_hosts", m20220811_onion_hosts),
("m20220817_connection_ntfs", m20220817_connection_ntfs)
("m20220817_connection_ntfs", m20220817_connection_ntfs),
("m20220822_queue_rotation", m20220822_queue_rotation)
]
-- | The list of migrations in ascending order by date
@@ -41,6 +41,9 @@ CREATE TABLE rcv_queues(
ntf_private_key BLOB,
ntf_id BLOB,
rcv_ntf_dh_secret BLOB,
rcv_queue_id INTEGER NULL,
next_rcv_queue INTEGER DEFAULT 0,
next_rcv_queue_id INTEGER NULL,
PRIMARY KEY(host, port, rcv_id),
FOREIGN KEY(host, port) REFERENCES servers
ON DELETE RESTRICT ON UPDATE CASCADE,
@@ -58,6 +61,9 @@ CREATE TABLE snd_queues(
smp_client_version INTEGER NOT NULL DEFAULT 1,
snd_public_key BLOB,
e2e_pub_key BLOB,
snd_queue_id INTEGER NULL,
next_snd_queue INTEGER DEFAULT 0,
next_snd_queue_id INTEGER NULL,
PRIMARY KEY(host, port, snd_id),
FOREIGN KEY(host, port) REFERENCES servers
ON DELETE RESTRICT ON UPDATE CASCADE
+1 -1
View File
@@ -9,7 +9,7 @@
module Simplex.Messaging.Server.MsgStore.STM where
import Control.Concurrent.STM.TBQueue (flushTBQueue)
import Control.Concurrent.STM.TBQueue (flushTBQueue, lengthTBQueue)
import Control.Monad (when)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
+2 -2
View File
@@ -15,7 +15,7 @@ data QueueRec = QueueRec
senderId :: SenderId,
senderKey :: Maybe SndPublicVerifyKey,
notifier :: Maybe NtfCreds,
status :: QueueStatus
status :: ServerQueueStatus
}
deriving (Eq, Show)
@@ -32,7 +32,7 @@ instance StrEncoding NtfCreds where
(notifierId, notifierKey, rcvNtfDhSecret) <- strP
pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret}
data QueueStatus = QueueActive | QueueOff deriving (Eq, Show)
data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show)
class MonadQueueStore s m where
addQueue :: s -> QueueRec -> m (Either ErrorType ())
+1 -1
View File
@@ -37,7 +37,7 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), QueueStatus (..))
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..))
import Simplex.Messaging.Transport (trimCR)
import System.Directory (doesFileExist)
import System.IO
+4
View File
@@ -161,6 +161,7 @@ rcvQueue1 =
e2eDhSecret = Nothing,
sndId = Just "2345",
status = New,
dbNextRcvQueueId = Nothing,
smpClientVersion = 1,
clientNtfCreds = Nothing
}
@@ -175,6 +176,7 @@ sndQueue1 =
e2ePubKey = Nothing,
e2eDhSecret = testDhSecret,
status = New,
dbNextSndQueueId = Nothing,
smpClientVersion = 1
}
@@ -308,6 +310,7 @@ testUpgradeRcvConnToDuplex =
e2ePubKey = Nothing,
e2eDhSecret = testDhSecret,
status = New,
dbNextSndQueueId = Nothing,
smpClientVersion = 1
}
upgradeRcvConnToDuplex db "conn1" anotherSndQueue
@@ -331,6 +334,7 @@ testUpgradeSndConnToDuplex =
e2eDhSecret = Nothing,
sndId = Just "4567",
status = New,
dbNextRcvQueueId = Nothing,
smpClientVersion = 1,
clientNtfCreds = Nothing
}