mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-20 00:05:25 +00:00
update DB fields presentations: port, QueueStatus, RcvMsgStatus, SndMsgStatus (#244)
* make port nullable * make conversions of enum types to/from text explicit; remove unused statuses/fields * Update src/Simplex/Messaging/Agent/Store.hs * rename RcvMsgStatus constructors Co-authored-by: Efim Poberezkin <8711996+efim-poberezkin@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
c8178e7f1f
commit
5e3f66a4cb
@@ -1,6 +1,6 @@
|
||||
CREATE TABLE servers (
|
||||
host TEXT NOT NULL,
|
||||
port TEXT NOT NULL,
|
||||
port TEXT,
|
||||
key_hash BLOB,
|
||||
PRIMARY KEY (host, port)
|
||||
) WITHOUT ROWID;
|
||||
@@ -18,7 +18,7 @@ CREATE TABLE connections (
|
||||
|
||||
CREATE TABLE rcv_queues (
|
||||
host TEXT NOT NULL,
|
||||
port TEXT NOT NULL,
|
||||
port TEXT,
|
||||
rcv_id BLOB NOT NULL,
|
||||
conn_alias BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
|
||||
rcv_private_key BLOB NOT NULL,
|
||||
@@ -37,7 +37,7 @@ CREATE TABLE rcv_queues (
|
||||
|
||||
CREATE TABLE snd_queues (
|
||||
host TEXT NOT NULL,
|
||||
port TEXT NOT NULL,
|
||||
port TEXT,
|
||||
snd_id BLOB NOT NULL,
|
||||
conn_alias BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
|
||||
snd_private_key BLOB NOT NULL,
|
||||
@@ -73,7 +73,6 @@ CREATE TABLE rcv_messages (
|
||||
broker_ts TEXT NOT NULL,
|
||||
rcv_status TEXT NOT NULL,
|
||||
ack_brocker_ts TEXT,
|
||||
ack_sender_ts TEXT,
|
||||
internal_hash BLOB NOT NULL,
|
||||
external_prev_snd_hash BLOB NOT NULL,
|
||||
integrity BLOB NOT NULL,
|
||||
@@ -88,7 +87,6 @@ CREATE TABLE snd_messages (
|
||||
internal_id INTEGER NOT NULL,
|
||||
snd_status TEXT NOT NULL,
|
||||
sent_ts TEXT,
|
||||
delivered_ts TEXT,
|
||||
internal_hash BLOB NOT NULL,
|
||||
previous_msg_hash BLOB NOT NULL DEFAULT x'',
|
||||
PRIMARY KEY (conn_alias, internal_snd_id),
|
||||
|
||||
@@ -92,6 +92,8 @@ module Simplex.Messaging.Agent.Protocol
|
||||
connReqP',
|
||||
msgIntegrityP,
|
||||
agentErrorTypeP,
|
||||
serializeQueueStatus,
|
||||
queueStatusT,
|
||||
|
||||
-- * TCP transport functions
|
||||
tPut,
|
||||
@@ -532,6 +534,23 @@ data QueueStatus
|
||||
Disabled
|
||||
deriving (Eq, Show, Read)
|
||||
|
||||
serializeQueueStatus :: QueueStatus -> Text
|
||||
serializeQueueStatus = \case
|
||||
New -> "new"
|
||||
Confirmed -> "confirmed"
|
||||
Secured -> "secured"
|
||||
Active -> "active"
|
||||
Disabled -> "disabled"
|
||||
|
||||
queueStatusT :: Text -> Maybe QueueStatus
|
||||
queueStatusT = \case
|
||||
"new" -> Just New
|
||||
"confirmed" -> Just Confirmed
|
||||
"secured" -> Just Secured
|
||||
"active" -> Just Active
|
||||
"disabled" -> Just Disabled
|
||||
_ -> Nothing
|
||||
|
||||
type AgentMsgId = Int64
|
||||
|
||||
-- | Result of received message integrity validation.
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
|
||||
|
||||
@@ -15,6 +17,7 @@ import Crypto.Random (ChaChaDRG)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind (Type)
|
||||
import Data.Text (Text)
|
||||
import Data.Time (UTCTime)
|
||||
import Data.Type.Equality
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
@@ -257,13 +260,9 @@ data RcvMsg = RcvMsg
|
||||
brokerId :: BrokerId,
|
||||
brokerTs :: BrokerTs,
|
||||
rcvMsgStatus :: RcvMsgStatus,
|
||||
-- | Timestamp of acknowledgement to broker, corresponds to `AcknowledgedToBroker` status.
|
||||
-- Do not mix up with `brokerTs` - timestamp created at broker after it receives the message from sender.
|
||||
-- | Timestamp of acknowledgement to broker, corresponds to `Acknowledged` status.
|
||||
-- Don't confuse with `brokerTs` - timestamp created at broker after it receives the message from sender.
|
||||
ackBrokerTs :: AckBrokerTs,
|
||||
-- | Timestamp of acknowledgement to sender, corresponds to `AcknowledgedToSender` status.
|
||||
-- Do not mix up with `externalSndTs` - timestamp created at sender before sending,
|
||||
-- which in its turn corresponds to `internalTs` in sending agent.
|
||||
ackSenderTs :: AckSenderTs,
|
||||
-- | Hash of previous message as received from sender - stored for integrity forensics.
|
||||
externalPrevSndHash :: MsgHash,
|
||||
msgIntegrity :: MsgIntegrity
|
||||
@@ -281,12 +280,20 @@ type BrokerId = MsgId
|
||||
|
||||
type BrokerTs = UTCTime
|
||||
|
||||
data RcvMsgStatus
|
||||
= Received
|
||||
| AcknowledgedToBroker
|
||||
| AcknowledgedToSender
|
||||
data RcvMsgStatus = RcvMsgReceived | RcvMsgAcknowledged
|
||||
deriving (Eq, Show)
|
||||
|
||||
serializeRcvMsgStatus :: RcvMsgStatus -> Text
|
||||
serializeRcvMsgStatus = \case
|
||||
RcvMsgReceived -> "rcvd"
|
||||
RcvMsgAcknowledged -> "ackd"
|
||||
|
||||
rcvMsgStatusT :: Text -> Maybe RcvMsgStatus
|
||||
rcvMsgStatusT = \case
|
||||
"rcvd" -> Just RcvMsgReceived
|
||||
"ackd" -> Just RcvMsgAcknowledged
|
||||
_ -> Nothing
|
||||
|
||||
type AckBrokerTs = UTCTime
|
||||
|
||||
type AckSenderTs = UTCTime
|
||||
@@ -298,20 +305,26 @@ data SndMsg = SndMsg
|
||||
internalSndId :: InternalSndId,
|
||||
sndMsgStatus :: SndMsgStatus,
|
||||
-- | Timestamp of the message received by broker, corresponds to `Sent` status.
|
||||
sentTs :: SentTs,
|
||||
-- | Timestamp of the message received by recipient, corresponds to `Delivered` status.
|
||||
deliveredTs :: DeliveredTs
|
||||
sentTs :: SentTs
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show)
|
||||
|
||||
data SndMsgStatus
|
||||
= SndMsgCreated
|
||||
| SndMsgSent
|
||||
| SndMsgDelivered
|
||||
data SndMsgStatus = SndMsgCreated | SndMsgSent
|
||||
deriving (Eq, Show)
|
||||
|
||||
serializeSndMsgStatus :: SndMsgStatus -> Text
|
||||
serializeSndMsgStatus = \case
|
||||
SndMsgCreated -> "created"
|
||||
SndMsgSent -> "sent"
|
||||
|
||||
sndMsgStatusT :: Text -> Maybe SndMsgStatus
|
||||
sndMsgStatusT = \case
|
||||
"created" -> Just SndMsgCreated
|
||||
"sent" -> Just SndMsgSent
|
||||
_ -> Nothing
|
||||
|
||||
type SentTs = UTCTime
|
||||
|
||||
type DeliveredTs = UTCTime
|
||||
|
||||
@@ -37,7 +37,6 @@ import Data.ByteString.Base64 (encode)
|
||||
import Data.Char (toLower)
|
||||
import Data.Functor (($>))
|
||||
import Data.List (find)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
@@ -48,7 +47,6 @@ import Database.SQLite.Simple.Internal (Field (..))
|
||||
import Database.SQLite.Simple.Ok (Ok (Ok))
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import Database.SQLite.Simple.ToField (ToField (..))
|
||||
import Network.Socket (ServiceName)
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration)
|
||||
@@ -62,7 +60,6 @@ import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath (takeDirectory)
|
||||
import System.IO (hFlush, stdout)
|
||||
import Text.Read (readMaybe)
|
||||
import qualified UnliftIO.Exception as E
|
||||
|
||||
-- * SQLite Store implementation
|
||||
@@ -195,7 +192,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
FROM rcv_queues q
|
||||
WHERE q.host = :host AND q.port = :port AND q.rcv_id = :rcv_id;
|
||||
|]
|
||||
[":host" := host, ":port" := serializePort_ port, ":rcv_id" := rcvId]
|
||||
[":host" := host, ":port" := port, ":rcv_id" := rcvId]
|
||||
>>= \case
|
||||
[Only connId] -> getConn_ db connId
|
||||
_ -> pure $ Left SEConnNotFound
|
||||
@@ -241,7 +238,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
SET status = :status
|
||||
WHERE host = :host AND port = :port AND rcv_id = :rcv_id;
|
||||
|]
|
||||
[":status" := status, ":host" := host, ":port" := serializePort_ port, ":rcv_id" := rcvId]
|
||||
[":status" := status, ":host" := host, ":port" := port, ":rcv_id" := rcvId]
|
||||
|
||||
setRcvQueueConfirmedE2E :: SQLiteStore -> RcvQueue -> C.PublicKeyX25519 -> C.DhSecretX25519 -> m ()
|
||||
setRcvQueueConfirmedE2E st RcvQueue {rcvId, server = SMPServer {host, port}} e2eSndPubKey e2eDhSecret =
|
||||
@@ -259,7 +256,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
":e2e_snd_pub_key" := e2eSndPubKey,
|
||||
":e2e_dh_secret" := e2eDhSecret,
|
||||
":host" := host,
|
||||
":port" := serializePort_ port,
|
||||
":port" := port,
|
||||
":rcv_id" := rcvId
|
||||
]
|
||||
|
||||
@@ -274,7 +271,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
SET status = :status
|
||||
WHERE host = :host AND port = :port AND snd_id = :snd_id;
|
||||
|]
|
||||
[":status" := status, ":host" := host, ":port" := serializePort_ port, ":snd_id" := sndId]
|
||||
[":status" := status, ":host" := host, ":port" := port, ":snd_id" := sndId]
|
||||
|
||||
createConfirmation :: SQLiteStore -> TVar ChaChaDRG -> NewConfirmation -> m ConfirmationId
|
||||
createConfirmation st gVar NewConfirmation {connId, senderConf = SMPConfirmation {senderKey, e2ePubKey, connInfo}} =
|
||||
@@ -516,22 +513,13 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
SET rcv_status = ?, ack_brocker_ts = datetime('now')
|
||||
WHERE conn_alias = ? AND internal_id = ?
|
||||
|]
|
||||
(AcknowledgedToBroker, connId, msgId)
|
||||
(RcvMsgAcknowledged, connId, msgId)
|
||||
|
||||
-- * Auxiliary helpers
|
||||
|
||||
-- ? replace with ToField? - it's easy to forget to use this
|
||||
serializePort_ :: Maybe ServiceName -> ServiceName
|
||||
serializePort_ = fromMaybe "_"
|
||||
instance ToField QueueStatus where toField = toField . serializeQueueStatus
|
||||
|
||||
deserializePort_ :: ServiceName -> Maybe ServiceName
|
||||
deserializePort_ "_" = Nothing
|
||||
deserializePort_ port = Just port
|
||||
|
||||
-- TODO make status conversion explicit
|
||||
instance ToField QueueStatus where toField = toField . show
|
||||
|
||||
instance FromField QueueStatus where fromField = fromTextField_ $ readMaybe . T.unpack
|
||||
instance FromField QueueStatus where fromField = fromTextField_ queueStatusT
|
||||
|
||||
instance ToField InternalRcvId where toField (InternalRcvId x) = toField x
|
||||
|
||||
@@ -545,11 +533,13 @@ instance ToField InternalId where toField (InternalId x) = toField x
|
||||
|
||||
instance FromField InternalId where fromField x = InternalId <$> fromField x
|
||||
|
||||
-- TODO make status conversion explicit
|
||||
instance ToField RcvMsgStatus where toField = toField . show
|
||||
instance ToField RcvMsgStatus where toField = toField . serializeRcvMsgStatus
|
||||
|
||||
-- TODO make status conversion explicit
|
||||
instance ToField SndMsgStatus where toField = toField . show
|
||||
instance FromField RcvMsgStatus where fromField = fromTextField_ rcvMsgStatusT
|
||||
|
||||
instance ToField SndMsgStatus where toField = toField . serializeSndMsgStatus
|
||||
|
||||
instance FromField SndMsgStatus where fromField = fromTextField_ sndMsgStatusT
|
||||
|
||||
instance ToField MsgIntegrity where toField = toField . serializeMsgIntegrity
|
||||
|
||||
@@ -615,7 +605,6 @@ instance (ToField a, ToField b, ToField c, ToField d, ToField e, ToField f,
|
||||
|
||||
upsertServer_ :: DB.Connection -> SMPServer -> IO ()
|
||||
upsertServer_ dbConn SMPServer {host, port, keyHash} = do
|
||||
let port_ = serializePort_ port
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
[sql|
|
||||
@@ -625,14 +614,13 @@ upsertServer_ dbConn SMPServer {host, port, keyHash} = do
|
||||
port=excluded.port,
|
||||
key_hash=excluded.key_hash;
|
||||
|]
|
||||
[":host" := host, ":port" := port_, ":key_hash" := keyHash]
|
||||
[":host" := host, ":port" := port, ":key_hash" := keyHash]
|
||||
|
||||
-- * createRcvConn helpers
|
||||
|
||||
insertRcvQueue_ :: DB.Connection -> ConnId -> RcvQueue -> IO ()
|
||||
insertRcvQueue_ dbConn connId RcvQueue {..} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
e2eSndPubKey = fst <$> e2eShared :: Maybe C.PublicKeyX25519
|
||||
let e2eSndPubKey = fst <$> e2eShared :: Maybe C.PublicKeyX25519
|
||||
e2eDhSecret = snd <$> e2eShared :: Maybe C.DhSecretX25519
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
@@ -643,7 +631,7 @@ insertRcvQueue_ dbConn connId RcvQueue {..} = do
|
||||
(:host,:port,:rcv_id,:conn_alias,:rcv_private_key,:rcv_dh_secret,:e2e_priv_key,:e2e_snd_pub_key,:e2e_dh_secret,:snd_id,:status);
|
||||
|]
|
||||
[ ":host" := host server,
|
||||
":port" := port_,
|
||||
":port" := port server,
|
||||
":rcv_id" := rcvId,
|
||||
":conn_alias" := connId,
|
||||
":rcv_private_key" := rcvPrivateKey,
|
||||
@@ -659,7 +647,6 @@ insertRcvQueue_ dbConn connId RcvQueue {..} = do
|
||||
|
||||
insertSndQueue_ :: DB.Connection -> ConnId -> SndQueue -> IO ()
|
||||
insertSndQueue_ dbConn connId SndQueue {..} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
[sql|
|
||||
@@ -669,7 +656,7 @@ insertSndQueue_ dbConn connId SndQueue {..} = do
|
||||
(:host,:port,:snd_id,:conn_alias,:snd_private_key,:e2e_pub_key,:e2e_dh_secret,:status);
|
||||
|]
|
||||
[ ":host" := host server,
|
||||
":port" := port_,
|
||||
":port" := port server,
|
||||
":snd_id" := sndId,
|
||||
":conn_alias" := connId,
|
||||
":snd_private_key" := sndPrivateKey,
|
||||
@@ -717,7 +704,7 @@ getRcvQueueByConnAlias_ dbConn connId =
|
||||
(Only connId)
|
||||
where
|
||||
rcvQueue [(keyHash, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eSndPubKey, e2eDhSecret, sndId, status)] =
|
||||
let server = SMPServer host (deserializePort_ port) keyHash
|
||||
let server = SMPServer host port keyHash
|
||||
e2eShared = (,) <$> e2eSndPubKey <*> e2eDhSecret
|
||||
in Just RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eShared, sndId, status}
|
||||
rcvQueue _ = Nothing
|
||||
@@ -737,7 +724,7 @@ getSndQueueByConnAlias_ dbConn connId =
|
||||
(Only connId)
|
||||
where
|
||||
sndQueue [(keyHash, host, port, sndId, sndPrivateKey, e2ePubKey, e2eDhSecret, status)] =
|
||||
let server = SMPServer host (deserializePort_ port) keyHash
|
||||
let server = SMPServer host port keyHash
|
||||
in Just SndQueue {server, sndId, sndPrivateKey, e2ePubKey, e2eDhSecret, status}
|
||||
sndQueue _ = Nothing
|
||||
|
||||
@@ -799,11 +786,11 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalH
|
||||
[sql|
|
||||
INSERT INTO rcv_messages
|
||||
( conn_alias, internal_rcv_id, internal_id, external_snd_id,
|
||||
broker_id, broker_ts, rcv_status, ack_brocker_ts, ack_sender_ts,
|
||||
broker_id, broker_ts, rcv_status,
|
||||
internal_hash, external_prev_snd_hash, integrity)
|
||||
VALUES
|
||||
(:conn_alias,:internal_rcv_id,:internal_id,:external_snd_id,
|
||||
:broker_id,:broker_ts,:rcv_status, NULL, NULL,
|
||||
:broker_id,:broker_ts,:rcv_status,
|
||||
:internal_hash,:external_prev_snd_hash,:integrity);
|
||||
|]
|
||||
[ ":conn_alias" := connId,
|
||||
@@ -812,7 +799,7 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalH
|
||||
":external_snd_id" := sndMsgId,
|
||||
":broker_id" := fst broker,
|
||||
":broker_ts" := snd broker,
|
||||
":rcv_status" := Received,
|
||||
":rcv_status" := RcvMsgReceived,
|
||||
":internal_hash" := internalHash,
|
||||
":external_prev_snd_hash" := externalPrevSndHash,
|
||||
":integrity" := integrity
|
||||
@@ -891,9 +878,9 @@ insertSndMsgDetails_ dbConn connId SndMsgData {..} =
|
||||
dbConn
|
||||
[sql|
|
||||
INSERT INTO snd_messages
|
||||
( conn_alias, internal_snd_id, internal_id, snd_status, sent_ts, delivered_ts, internal_hash, previous_msg_hash)
|
||||
( conn_alias, internal_snd_id, internal_id, snd_status, internal_hash, previous_msg_hash)
|
||||
VALUES
|
||||
(:conn_alias,:internal_snd_id,:internal_id,:snd_status, NULL, NULL,:internal_hash,:previous_msg_hash);
|
||||
(:conn_alias,:internal_snd_id,:internal_id,:snd_status,:internal_hash,:previous_msg_hash);
|
||||
|]
|
||||
[ ":conn_alias" := connId,
|
||||
":internal_snd_id" := internalSndId,
|
||||
|
||||
Reference in New Issue
Block a user