From a98708d206458c0f990917e9eecef7185aeb5f8b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 24 Aug 2022 17:23:28 +0100 Subject: [PATCH] store functions --- rfcs/2022-08-14-queue-rotation.md | 2 +- src/Simplex/Messaging/Agent.hs | 2 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 49 ++++++++++++++++----- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/rfcs/2022-08-14-queue-rotation.md b/rfcs/2022-08-14-queue-rotation.md index d85837c55..e794ec5c4 100644 --- a/rfcs/2022-08-14-queue-rotation.md +++ b/rfcs/2022-08-14-queue-rotation.md @@ -40,7 +40,7 @@ A ->> S ->> B: QNEW (R'): address of the new queue B ->> R ->> A: QKEYS (R'): sender's key for the new queue (to avoid the race of SMP confirmation for the initial exchange) B ->> R ->> A: continue sending new messages to the old queue A ->> R': secure queue -A ->> S ->> B: QREADY (R'): instruction to use new queue +A ->> S ->> B: QREADY (R'): notify sender that the queue is secured B ->> R' ->> A: QHELLO: to validate that the sender can send messages to the new queue before switching to it A ->> S ->> B: QSWITCH (R'): instruction to start using the new queue B ->> R' ->> A: the first message received to the new queue before the old one is drained and deleted should not be processed, it should be stored in the agent memory (and not acknowledged) and only processed once the old queue is drained. diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 1bc082c2a..7baafeb32 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1173,7 +1173,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm | otherwise -> ignored _ -> ignored ignored = pure "END from disconnected client - ignored" - LEN 0 -> do + SMP.LEN 0 -> do -- load nextRq -- currRcvQueueDrained c rq nextRq pure () diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 9fe8da42e..e595a5e93 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -378,8 +378,31 @@ setRcvQueueNtfCreds db connId clientNtfCreds = Nothing -> (Nothing, Nothing, Nothing, Nothing) getNextRcvQueue :: DB.Connection -> Maybe Int64 -> IO (Maybe RcvQueue) -getNextRcvQueue _db = \case - Just _rqId -> pure Nothing +getNextRcvQueue db = \case + Just rqId -> + listToMaybe . map rcvQueue + <$> DB.query + db + [sql| + SELECT q.host, q.port, s.key_hash, + q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, + q.rcv_queue_action, q.rcv_queue_action_ts, q.next_rcv_queue_id, + q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret, + q.smp_client_version, q.created_at, q.updated_at + FROM rcv_queues q + INNER JOIN servers s ON q.host = s.host AND q.port = s.port + WHERE q.rcv_queue_id = ? AND q.next_rcv_queue = ? + |] + (rqId, True) + where + rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rqAction_, rqActionTs_, dbNextRcvQueueId) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (smpClientVersion_, createdAt, updatedAt)) = + let server = toSMPServer srvRow + smpClientVersion = fromMaybe 1 smpClientVersion_ + rcvQueueAction = (,) <$> rqAction_ <*> rqActionTs_ + clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of + (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} + _ -> Nothing + in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt} _ -> pure Nothing type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version) @@ -1100,26 +1123,32 @@ upsertNtfServer_ db ProtocolServer {host, port, keyHash} = do -- * createRcvConn helpers insertRcvQueue_ :: DB.Connection -> ConnId -> RcvQueue -> IO () -insertRcvQueue_ dbConn connId RcvQueue {..} = do +insertRcvQueue_ db connId RcvQueue {..} = do + qId <- newQueueId_ <$> DB.query_ db "SELECT rcv_queue_id FROM rcv_queues ORDER BY rcv_queue_id DESC LIMIT 1" DB.execute - dbConn + db [sql| INSERT INTO rcv_queues - (host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?); + (rcv_queue_id, host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?); |] - ((host server, port server, rcvId, connId) :. (rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (smpClientVersion, createdAt, updatedAt)) + ((qId, host server, port server, rcvId, connId) :. (rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (smpClientVersion, createdAt, updatedAt)) -- * createSndConn helpers insertSndQueue_ :: DB.Connection -> ConnId -> SndQueue -> IO () -insertSndQueue_ dbConn connId SndQueue {..} = do +insertSndQueue_ db connId SndQueue {..} = do + qId <- newQueueId_ <$> DB.query_ db "SELECT snd_queue_id FROM snd_queues ORDER BY snd_queue_id DESC LIMIT 1" DB.execute - dbConn + db [sql| INSERT INTO snd_queues - (host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?); + (snd_queue_id, host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?); |] - ((host server, port server, sndId, connId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (smpClientVersion, createdAt, updatedAt)) + ((qId, host server, port server, sndId, connId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (smpClientVersion, createdAt, updatedAt)) + +newQueueId_ :: [Only (Maybe Int64)] -> Int64 +newQueueId_ [] = 1 +newQueueId_ (Only maxId_ : _) = maybe 1 (+ 1) maxId_ -- * getConn helpers