mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-19 11:05:11 +00:00
remove message statuses and unused timestamps (#257)
This commit is contained in:
committed by
GitHub
parent
488398df9f
commit
02c023f939
@@ -60,7 +60,7 @@ CREATE TABLE messages (
|
||||
internal_ts TEXT NOT NULL,
|
||||
internal_rcv_id INTEGER,
|
||||
internal_snd_id INTEGER,
|
||||
msg_type BLOB NOT NULL, -- SMP_CONF?, HELLO, REPLY, DELETE
|
||||
msg_type BLOB NOT NULL, -- (H)ELLO, (R)EPLY, (D)ELETE. Should SMP confirmation be saved too?
|
||||
msg_body BLOB NOT NULL DEFAULT x'',
|
||||
PRIMARY KEY (conn_alias, internal_id),
|
||||
FOREIGN KEY (conn_alias, internal_rcv_id) REFERENCES rcv_messages
|
||||
@@ -76,8 +76,6 @@ CREATE TABLE rcv_messages (
|
||||
external_snd_id INTEGER NOT NULL,
|
||||
broker_id BLOB NOT NULL,
|
||||
broker_ts TEXT NOT NULL,
|
||||
rcv_status TEXT NOT NULL,
|
||||
ack_brocker_ts TEXT,
|
||||
internal_hash BLOB NOT NULL,
|
||||
external_prev_snd_hash BLOB NOT NULL,
|
||||
integrity BLOB NOT NULL,
|
||||
@@ -90,8 +88,6 @@ CREATE TABLE snd_messages (
|
||||
conn_alias BLOB NOT NULL,
|
||||
internal_snd_id INTEGER NOT NULL,
|
||||
internal_id INTEGER NOT NULL,
|
||||
snd_status TEXT NOT NULL,
|
||||
sent_ts TEXT,
|
||||
internal_hash BLOB NOT NULL,
|
||||
previous_msg_hash BLOB NOT NULL DEFAULT x'',
|
||||
PRIMARY KEY (conn_alias, internal_snd_id),
|
||||
|
||||
@@ -3,9 +3,7 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
|
||||
|
||||
@@ -17,7 +15,6 @@ import Crypto.Random (ChaChaDRG)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind (Type)
|
||||
import Data.Text (Text)
|
||||
import Data.Time (UTCTime)
|
||||
import Data.Type.Equality
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
@@ -66,7 +63,6 @@ class Monad m => MonadAgentStore s m where
|
||||
createSndMsg :: s -> ConnId -> SndMsgData -> m ()
|
||||
getPendingMsgData :: s -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody))
|
||||
getPendingMsgs :: s -> ConnId -> m [InternalId]
|
||||
getMsg :: s -> ConnId -> InternalId -> m Msg
|
||||
checkRcvMsg :: s -> ConnId -> InternalId -> m ()
|
||||
deleteMsg :: s -> ConnId -> InternalId -> m ()
|
||||
|
||||
@@ -210,8 +206,6 @@ type PrevRcvMsgHash = MsgHash
|
||||
-- | Corresponds to `last_snd_msg_hash` in `connections` table
|
||||
type PrevSndMsgHash = MsgHash
|
||||
|
||||
-- ? merge/replace these with RcvMsg and SndMsg
|
||||
|
||||
-- * Message data containers - used on Msg creation to reduce number of parameters
|
||||
|
||||
data RcvMsgData = RcvMsgData
|
||||
@@ -239,35 +233,6 @@ data PendingMsg = PendingMsg
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
-- * Message types
|
||||
|
||||
-- | A message in either direction that is stored by the agent.
|
||||
data Msg = MRcv RcvMsg | MSnd SndMsg
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- | A message received by the agent from a sender.
|
||||
data RcvMsg = RcvMsg
|
||||
{ msgBase :: MsgBase,
|
||||
internalRcvId :: InternalRcvId,
|
||||
-- | Id of the message at sender, corresponds to `internalSndId` from the sender's side.
|
||||
-- Sender Id is made sequential for detection of missing messages. For redundant / parallel queues,
|
||||
-- it also allows to keep track of duplicates and restore the original order before delivery to the client.
|
||||
externalSndId :: ExternalSndId,
|
||||
externalSndTs :: ExternalSndTs,
|
||||
-- | Id of the message at broker, although it is not sequential (to avoid metadata leakage for potential observer),
|
||||
-- it is needed to track repeated deliveries in case of connection loss - this logic is not implemented yet.
|
||||
brokerId :: BrokerId,
|
||||
brokerTs :: BrokerTs,
|
||||
rcvMsgStatus :: RcvMsgStatus,
|
||||
-- | Timestamp of acknowledgement to broker, corresponds to `Acknowledged` status.
|
||||
-- Don't confuse with `brokerTs` - timestamp created at broker after it receives the message from sender.
|
||||
ackBrokerTs :: AckBrokerTs,
|
||||
-- | Hash of previous message as received from sender - stored for integrity forensics.
|
||||
externalPrevSndHash :: MsgHash,
|
||||
msgIntegrity :: MsgIntegrity
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- internal Ids are newtypes to prevent mixing them up
|
||||
newtype InternalRcvId = InternalRcvId {unRcvId :: Int64} deriving (Eq, Show)
|
||||
|
||||
@@ -279,55 +244,8 @@ type BrokerId = MsgId
|
||||
|
||||
type BrokerTs = UTCTime
|
||||
|
||||
data RcvMsgStatus = RcvMsgReceived | RcvMsgAcknowledged
|
||||
deriving (Eq, Show)
|
||||
|
||||
serializeRcvMsgStatus :: RcvMsgStatus -> Text
|
||||
serializeRcvMsgStatus = \case
|
||||
RcvMsgReceived -> "rcvd"
|
||||
RcvMsgAcknowledged -> "ackd"
|
||||
|
||||
rcvMsgStatusT :: Text -> Maybe RcvMsgStatus
|
||||
rcvMsgStatusT = \case
|
||||
"rcvd" -> Just RcvMsgReceived
|
||||
"ackd" -> Just RcvMsgAcknowledged
|
||||
_ -> Nothing
|
||||
|
||||
type AckBrokerTs = UTCTime
|
||||
|
||||
type AckSenderTs = UTCTime
|
||||
|
||||
-- | A message sent by the agent to a recipient.
|
||||
data SndMsg = SndMsg
|
||||
{ msgBase :: MsgBase,
|
||||
-- | Id of the message sent / to be sent, as in its number in order of sending.
|
||||
internalSndId :: InternalSndId,
|
||||
sndMsgStatus :: SndMsgStatus,
|
||||
-- | Timestamp of the message received by broker, corresponds to `Sent` status.
|
||||
sentTs :: SentTs
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show)
|
||||
|
||||
data SndMsgStatus = SndMsgCreated | SndMsgSent
|
||||
deriving (Eq, Show)
|
||||
|
||||
serializeSndMsgStatus :: SndMsgStatus -> Text
|
||||
serializeSndMsgStatus = \case
|
||||
SndMsgCreated -> "created"
|
||||
SndMsgSent -> "sent"
|
||||
|
||||
sndMsgStatusT :: Text -> Maybe SndMsgStatus
|
||||
sndMsgStatusT = \case
|
||||
"created" -> Just SndMsgCreated
|
||||
"sent" -> Just SndMsgSent
|
||||
_ -> Nothing
|
||||
|
||||
type SentTs = UTCTime
|
||||
|
||||
type DeliveredTs = UTCTime
|
||||
|
||||
-- | Base message data independent of direction.
|
||||
data MsgBase = MsgBase
|
||||
{ connAlias :: ConnId,
|
||||
|
||||
@@ -468,10 +468,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
getPendingMsgs st connId =
|
||||
liftIO . withTransaction st $ \db ->
|
||||
map fromOnly
|
||||
<$> DB.query db "SELECT internal_id FROM snd_messages WHERE conn_alias = ? AND snd_status = ?" (connId, SndMsgCreated)
|
||||
|
||||
getMsg :: SQLiteStore -> ConnId -> InternalId -> m Msg
|
||||
getMsg _st _connId _id = throwError SENotImplemented
|
||||
<$> DB.query db "SELECT internal_id FROM snd_messages WHERE conn_alias = ?" (Only connId)
|
||||
|
||||
checkRcvMsg :: SQLiteStore -> ConnId -> InternalId -> m ()
|
||||
checkRcvMsg st connId msgId =
|
||||
@@ -512,18 +509,10 @@ instance ToField InternalId where toField (InternalId x) = toField x
|
||||
|
||||
instance FromField InternalId where fromField x = InternalId <$> fromField x
|
||||
|
||||
instance ToField RcvMsgStatus where toField = toField . serializeRcvMsgStatus
|
||||
|
||||
instance FromField RcvMsgStatus where fromField = fromTextField_ rcvMsgStatusT
|
||||
|
||||
instance ToField AMsgType where toField = toField . smpEncode
|
||||
|
||||
instance FromField AMsgType where fromField = blobFieldParser smpP
|
||||
|
||||
instance ToField SndMsgStatus where toField = toField . serializeSndMsgStatus
|
||||
|
||||
instance FromField SndMsgStatus where fromField = fromTextField_ sndMsgStatusT
|
||||
|
||||
instance ToField MsgIntegrity where toField = toField . serializeMsgIntegrity
|
||||
|
||||
instance FromField MsgIntegrity where fromField = blobFieldParser msgIntegrityP
|
||||
@@ -764,11 +753,11 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalH
|
||||
[sql|
|
||||
INSERT INTO rcv_messages
|
||||
( conn_alias, internal_rcv_id, internal_id, external_snd_id,
|
||||
broker_id, broker_ts, rcv_status,
|
||||
broker_id, broker_ts,
|
||||
internal_hash, external_prev_snd_hash, integrity)
|
||||
VALUES
|
||||
(:conn_alias,:internal_rcv_id,:internal_id,:external_snd_id,
|
||||
:broker_id,:broker_ts,:rcv_status,
|
||||
:broker_id,:broker_ts,
|
||||
:internal_hash,:external_prev_snd_hash,:integrity);
|
||||
|]
|
||||
[ ":conn_alias" := connId,
|
||||
@@ -777,7 +766,6 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalH
|
||||
":external_snd_id" := sndMsgId,
|
||||
":broker_id" := fst broker,
|
||||
":broker_ts" := snd broker,
|
||||
":rcv_status" := RcvMsgReceived,
|
||||
":internal_hash" := internalHash,
|
||||
":external_prev_snd_hash" := externalPrevSndHash,
|
||||
":integrity" := integrity
|
||||
@@ -857,14 +845,13 @@ insertSndMsgDetails_ dbConn connId SndMsgData {..} =
|
||||
dbConn
|
||||
[sql|
|
||||
INSERT INTO snd_messages
|
||||
( conn_alias, internal_snd_id, internal_id, snd_status, internal_hash, previous_msg_hash)
|
||||
( conn_alias, internal_snd_id, internal_id, internal_hash, previous_msg_hash)
|
||||
VALUES
|
||||
(:conn_alias,:internal_snd_id,:internal_id,:snd_status,:internal_hash,:previous_msg_hash);
|
||||
(:conn_alias,:internal_snd_id,:internal_id,:internal_hash,:previous_msg_hash);
|
||||
|]
|
||||
[ ":conn_alias" := connId,
|
||||
":internal_snd_id" := internalSndId,
|
||||
":internal_id" := internalId,
|
||||
":snd_status" := SndMsgCreated,
|
||||
":internal_hash" := internalHash,
|
||||
":previous_msg_hash" := prevMsgHash
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user