Files
simplexmq/src/Simplex/Messaging/Agent/Store.hs
T
2026-03-03 21:16:46 +00:00

831 lines
26 KiB
Haskell

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
module Simplex.Messaging.Agent.Store
( RcvQueue,
NewRcvQueue,
StoredRcvQueue (..),
RcvQueueSub (..),
ClientNtfCreds (..),
InvShortLink (..),
SndQueue,
NewSndQueue,
StoredSndQueue (..),
SMPQueueRec (..),
SomeRcvQueue (..),
ConnType (..),
Connection' (..),
Connection,
SConnType (..),
SomeConn' (..),
SomeConn,
SomeConnSub,
ConnData (..),
NoticeId,
PendingCommand (..),
AgentCmdType (..),
AgentCommand (..),
AgentCommandTag (..),
InternalCommand (..),
InternalCommandTag (..),
NewConfirmation (..),
AcceptedConfirmation (..),
NewInvitation (..),
Invitation (..),
PrevExternalSndId,
PrevRcvMsgHash,
PrevSndMsgHash,
RcvMsgData (..),
RcvMsg (..),
SndMsgData (..),
SndMsgPrepData (..),
SndMsg (..),
PendingMsgData (..),
PendingMsgPrepData (..),
InternalRcvId (..),
ExternalSndId,
ExternalSndTs,
BrokerId,
BrokerTs,
InternalSndId (..),
MsgBase (..),
InternalId (..),
InternalTs,
AsyncCmdId,
StoreError (..),
AnyStoreError (..),
ServiceAssoc,
createStore,
rcvQueueSub,
rcvSMPQueueAddress,
canAbortRcvSwitch,
findQ,
removeQ,
removeQP,
sndAddress,
findRQ,
switchingRQ,
updatedQs,
toConnData,
updateConnection,
connType,
ratchetSyncAllowed,
ratchetSyncSendProhibited,
agentCommandTag,
internalCmdTag,
) where
import Control.Exception (Exception (..))
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.Kind (Type)
import Data.List (find)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Maybe (isJust)
import Data.Time (UTCTime)
import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.Store.Common
import Simplex.Messaging.Agent.Store.DB (SQLError)
import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Agent.Store.Interface (createDBStore)
import Simplex.Messaging.Agent.Store.Migrations.App (appMigrations)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..), MigrationError (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (MsgEncryptKeyX448, PQEncryption, PQSupport, RatchetX448)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( MsgBody,
MsgFlags,
MsgId,
NotifierId,
NtfPrivateAuthKey,
NtfPublicAuthKey,
QueueMode,
RcvDhSecret,
RcvNtfDhSecret,
RcvPrivateAuthKey,
SndPrivateAuthKey,
VersionSMPC,
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (AnyError (..), bshow)
createStore :: DBOpts -> MigrationConfig -> IO (Either MigrationError DBStore)
createStore dbOpts = createDBStore dbOpts appMigrations
-- * Queue types
type RcvQueue = StoredRcvQueue 'DBStored
type NewRcvQueue = StoredRcvQueue 'DBNew
-- | A receive queue. SMP queue through which the agent receives messages from a sender.
data StoredRcvQueue (q :: DBStored) = RcvQueue
{ userId :: UserId,
connId :: ConnId,
server :: SMPServer,
-- | recipient queue ID
rcvId :: SMP.RecipientId,
-- | key used by the recipient to authorize transmissions
rcvPrivateKey :: RcvPrivateAuthKey,
-- | shared DH secret used to encrypt/decrypt message bodies from server to recipient
rcvDhSecret :: RcvDhSecret,
-- | private DH key related to public sent to sender out-of-band (to agree simple per-queue e2e)
e2ePrivKey :: C.PrivateKeyX25519,
-- | public sender's DH key and agreed shared DH secret for simple per-queue e2e
e2eDhSecret :: Maybe C.DhSecretX25519,
-- | sender queue ID
sndId :: SMP.SenderId,
-- | sender can secure the queue
queueMode :: Maybe QueueMode,
-- | short link ID and credentials
shortLink :: Maybe ShortLinkCreds,
-- | associated client service
rcvServiceAssoc :: ServiceAssoc,
-- | queue status
status :: QueueStatus,
-- | to enable notifications for this queue - this field is duplicated from ConnData
enableNtfs :: Bool,
-- | client notice
clientNoticeId :: Maybe NoticeId,
-- | database queue ID (within connection)
dbQueueId :: DBEntityId' q,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
primary :: Bool,
-- | database queue ID to replace, Nothing if this queue is not replacing another, `Just Nothing` is used for replacing old queues
dbReplaceQueueId :: Maybe Int64,
rcvSwchStatus :: Maybe RcvSwitchStatus,
-- | SMP client version
smpClientVersion :: VersionSMPC,
-- | credentials used in context of notifications
clientNtfCreds :: Maybe ClientNtfCreds,
deleteErrors :: Int
}
deriving (Show)
data RcvQueueSub = RcvQueueSub
{ userId :: UserId,
connId :: ConnId,
server :: SMPServer,
rcvId :: SMP.RecipientId,
rcvPrivateKey :: RcvPrivateAuthKey,
status :: QueueStatus,
enableNtfs :: Bool,
clientNoticeId :: Maybe NoticeId,
dbQueueId :: Int64,
primary :: Bool,
dbReplaceQueueId :: Maybe Int64
}
deriving (Show)
rcvQueueSub :: RcvQueue -> RcvQueueSub
rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, clientNoticeId, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} =
RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, clientNoticeId, dbQueueId, primary, dbReplaceQueueId}
type ServiceAssoc = Bool
rcvSMPQueueAddress :: RcvQueue -> SMPQueueAddress
rcvSMPQueueAddress RcvQueue {server, sndId, e2ePrivKey, queueMode} =
SMPQueueAddress server sndId (C.publicKey e2ePrivKey) queueMode
canAbortRcvSwitch :: RcvQueue -> Bool
canAbortRcvSwitch = maybe False canAbort . rcvSwchStatus
where
canAbort = \case
RSSwitchStarted -> True
RSSendingQADD -> True
-- if switch is in RSSendingQUSE, a race condition with sender deleting the original queue is possible
RSSendingQUSE -> False
-- if switch is in RSReceivedMessage status, aborting switch (deleting new queue)
-- will break the connection because the sender would have original queue deleted
RSReceivedMessage -> False
data ClientNtfCreds = ClientNtfCreds
{ -- | key pair to be used by the notification server to authorize transmissions
ntfPublicKey :: NtfPublicAuthKey,
ntfPrivateKey :: NtfPrivateAuthKey,
-- | queue ID to be used by the notification server for NSUB command
notifierId :: NotifierId,
-- | shared DH secret used to encrypt/decrypt notification metadata (NMsgMeta) from server to recipient
rcvNtfDhSecret :: RcvNtfDhSecret
}
deriving (Show)
-- This record is stored in inv_short_links table.
-- It is needed only for 1-time invitation links because of "secure-on-read" property of link data,
-- that prevents undetected access to link data from link observers.
data InvShortLink = InvShortLink
{ server :: SMPServer,
linkId :: SMP.LinkId,
linkKey :: LinkKey,
sndPrivateKey :: SndPrivateAuthKey, -- stored to allow retries
sndId :: Maybe SMP.SenderId
}
deriving (Show)
type SndQueue = StoredSndQueue 'DBStored
type NewSndQueue = StoredSndQueue 'DBNew
-- | A send queue. SMP queue through which the agent sends messages to a recipient.
data StoredSndQueue (q :: DBStored) = SndQueue
{ userId :: UserId,
connId :: ConnId,
server :: SMPServer,
-- | sender queue ID
sndId :: SMP.SenderId,
-- | sender can secure the queue
queueMode :: Maybe QueueMode,
-- | sender key used to authorize transmissions
sndPrivateKey :: SndPrivateAuthKey,
-- | DH public key used to negotiate per-queue e2e encryption
e2ePubKey :: Maybe C.PublicKeyX25519,
-- | shared DH secret agreed for simple per-queue e2e encryption
e2eDhSecret :: C.DhSecretX25519,
-- | queue status
status :: QueueStatus,
-- | database queue ID (within connection)
dbQueueId :: DBEntityId' q,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
primary :: Bool,
-- | ID of the queue this one is replacing
dbReplaceQueueId :: Maybe Int64,
sndSwchStatus :: Maybe SndSwitchStatus,
-- | SMP client version
smpClientVersion :: VersionSMPC
}
deriving (Show)
instance SMPQueue RcvQueue where
qServer RcvQueue {server} = server
{-# INLINE qServer #-}
queueId RcvQueue {rcvId} = rcvId
{-# INLINE queueId #-}
instance SMPQueue NewRcvQueue where
qServer RcvQueue {server} = server
{-# INLINE qServer #-}
queueId RcvQueue {rcvId} = rcvId
{-# INLINE queueId #-}
instance SMPQueue RcvQueueSub where
qServer RcvQueueSub {server} = server
{-# INLINE qServer #-}
queueId RcvQueueSub {rcvId} = rcvId
{-# INLINE queueId #-}
instance SMPQueue SndQueue where
qServer SndQueue {server} = server
{-# INLINE qServer #-}
queueId SndQueue {sndId} = sndId
{-# INLINE queueId #-}
findQ :: SMPQueue q => (SMPServer, SMP.QueueId) -> NonEmpty q -> Maybe q
findQ = find . sameQueue
{-# INLINE findQ #-}
removeQ :: SMPQueue q => (SMPServer, SMP.QueueId) -> NonEmpty q -> Maybe (q, [q])
removeQ = removeQP . sameQueue
{-# INLINE removeQ #-}
removeQP :: (q -> Bool) -> NonEmpty q -> Maybe (q, [q])
removeQP p qs = case L.break p qs of
(_, []) -> Nothing
(qs1, q : qs2) -> Just (q, qs1 <> qs2)
sndAddress :: RcvQueue -> (SMPServer, SMP.SenderId)
sndAddress RcvQueue {server, sndId} = (server, sndId)
{-# INLINE sndAddress #-}
findRQ :: (SMPServer, SMP.SenderId) -> NonEmpty RcvQueue -> Maybe RcvQueue
findRQ sAddr = find $ sameQAddress sAddr . sndAddress
{-# INLINE findRQ #-}
switchingRQ :: NonEmpty RcvQueue -> Maybe RcvQueue
switchingRQ = find $ isJust . rcvSwchStatus
{-# INLINE switchingRQ #-}
updatedQs :: SMPQueueRec q => q -> NonEmpty q -> NonEmpty q
updatedQs q = L.map $ \q' -> if dbQId q == dbQId q' then q else q'
{-# INLINE updatedQs #-}
class SMPQueue q => SMPQueueRec q where
qUserId :: q -> UserId
qConnId :: q -> ConnId
dbQId :: q -> Int64
qPrimary :: q -> Bool
dbReplaceQId :: q -> Maybe Int64
instance SMPQueueRec RcvQueue where
qUserId RcvQueue {userId} = userId
{-# INLINE qUserId #-}
qConnId RcvQueue {connId} = connId
{-# INLINE qConnId #-}
dbQId RcvQueue {dbQueueId = DBEntityId qId} = qId
{-# INLINE dbQId #-}
qPrimary RcvQueue {primary} = primary
{-# INLINE qPrimary #-}
dbReplaceQId RcvQueue {dbReplaceQueueId} = dbReplaceQueueId
{-# INLINE dbReplaceQId #-}
instance SMPQueueRec RcvQueueSub where
qUserId RcvQueueSub {userId} = userId
{-# INLINE qUserId #-}
qConnId RcvQueueSub {connId} = connId
{-# INLINE qConnId #-}
dbQId RcvQueueSub {dbQueueId} = dbQueueId
{-# INLINE dbQId #-}
qPrimary RcvQueueSub {primary} = primary
{-# INLINE qPrimary #-}
dbReplaceQId RcvQueueSub {dbReplaceQueueId} = dbReplaceQueueId
{-# INLINE dbReplaceQId #-}
instance SMPQueueRec SndQueue where
qUserId SndQueue {userId} = userId
{-# INLINE qUserId #-}
qConnId SndQueue {connId} = connId
{-# INLINE qConnId #-}
dbQId SndQueue {dbQueueId = DBEntityId qId} = qId
{-# INLINE dbQId #-}
qPrimary SndQueue {primary} = primary
{-# INLINE qPrimary #-}
dbReplaceQId SndQueue {dbReplaceQueueId} = dbReplaceQueueId
{-# INLINE dbReplaceQId #-}
class SMPQueueRec q => SomeRcvQueue q where
rcvAuthKey :: q -> RcvPrivateAuthKey
instance SomeRcvQueue RcvQueue where
rcvAuthKey RcvQueue {rcvPrivateKey} = rcvPrivateKey
{-# INLINE rcvAuthKey #-}
instance SomeRcvQueue RcvQueueSub where
rcvAuthKey RcvQueueSub {rcvPrivateKey} = rcvPrivateKey
{-# INLINE rcvAuthKey #-}
-- * Connection types
-- | Type of a connection.
data ConnType = CNew | CRcv | CSnd | CDuplex | CContact deriving (Eq, Show)
-- | Connection of a specific type.
--
-- - RcvConnection is a connection that only has a receive queue set up,
-- typically created by a recipient initiating a duplex connection.
--
-- - SndConnection is a connection that only has a send queue set up, typically
-- created by a sender joining a duplex connection through a recipient's invitation.
--
-- - DuplexConnection is a connection that has both receive and send queues set up,
-- typically created by upgrading a receive or a send connection with a missing queue.
data Connection' (d :: ConnType) rq sq where
NewConnection :: ConnData -> Connection' CNew rq sq
RcvConnection :: ConnData -> rq -> Connection' CRcv rq sq
SndConnection :: ConnData -> sq -> Connection' CSnd rq sq
DuplexConnection :: ConnData -> NonEmpty rq -> NonEmpty sq -> Connection' CDuplex rq sq
ContactConnection :: ConnData -> rq -> Connection' CContact rq sq
deriving instance (Show rq, Show sq) => Show (Connection' d rq sq)
type Connection d = Connection' d RcvQueue SndQueue
toConnData :: Connection' d rq sq -> ConnData
toConnData = \case
NewConnection cData -> cData
RcvConnection cData _ -> cData
SndConnection cData _ -> cData
DuplexConnection cData _ _ -> cData
ContactConnection cData _ -> cData
updateConnection :: ConnData -> Connection' d rq sq -> Connection' d rq sq
updateConnection cData = \case
NewConnection _ -> NewConnection cData
RcvConnection _ rq -> RcvConnection cData rq
SndConnection _ sq -> SndConnection cData sq
DuplexConnection _ rqs sqs -> DuplexConnection cData rqs sqs
ContactConnection _ rq -> ContactConnection cData rq
data SConnType :: ConnType -> Type where
SCNew :: SConnType CNew
SCRcv :: SConnType CRcv
SCSnd :: SConnType CSnd
SCDuplex :: SConnType CDuplex
SCContact :: SConnType CContact
connType :: SConnType c -> ConnType
connType SCNew = CNew
connType SCRcv = CRcv
connType SCSnd = CSnd
connType SCDuplex = CDuplex
connType SCContact = CContact
deriving instance Show (SConnType d)
instance TestEquality SConnType where
testEquality SCRcv SCRcv = Just Refl
testEquality SCSnd SCSnd = Just Refl
testEquality SCDuplex SCDuplex = Just Refl
testEquality SCContact SCContact = Just Refl
testEquality _ _ = Nothing
-- | Connection of an unknown type.
-- Used to refer to an arbitrary connection when retrieving from store.
data SomeConn' rq sq = forall d. SomeConn (SConnType d) (Connection' d rq sq)
deriving instance (Show rq, Show sq) => Show (SomeConn' rq sq)
type SomeConn = SomeConn' RcvQueue SndQueue
type SomeConnSub = SomeConn' RcvQueueSub SndQueue
data ConnData = ConnData
{ connId :: ConnId,
userId :: UserId,
connAgentVersion :: VersionSMPA,
enableNtfs :: Bool,
lastExternalSndId :: PrevExternalSndId,
deleted :: Bool,
ratchetSyncState :: RatchetSyncState,
pqSupport :: PQSupport
}
deriving (Eq, Show)
type NoticeId = Int64
-- this function should be mirrored in the clients
ratchetSyncAllowed :: ConnData -> Bool
ratchetSyncAllowed ConnData {ratchetSyncState, connAgentVersion} =
connAgentVersion >= ratchetSyncSMPAgentVersion && (ratchetSyncState `elem` ([RSAllowed, RSRequired] :: [RatchetSyncState]))
-- this function should be mirrored in the clients
ratchetSyncSendProhibited :: ConnData -> Bool
ratchetSyncSendProhibited ConnData {ratchetSyncState} =
ratchetSyncState `elem` ([RSRequired, RSStarted, RSAgreed] :: [RatchetSyncState])
data PendingCommand = PendingCommand
{ cmdId :: AsyncCmdId,
corrId :: ACorrId,
userId :: UserId,
connId :: ConnId,
command :: AgentCommand
}
data AgentCmdType = ACClient | ACInternal
instance StrEncoding AgentCmdType where
strEncode = \case
ACClient -> "CLIENT"
ACInternal -> "INTERNAL"
strP =
A.takeTill (== ' ') >>= \case
"CLIENT" -> pure ACClient
"INTERNAL" -> pure ACInternal
_ -> fail "bad AgentCmdType"
data AgentCommand
= AClientCommand ACommand
| AInternalCommand InternalCommand
instance StrEncoding AgentCommand where
strEncode = \case
AClientCommand cmd -> strEncode (ACClient, Str $ serializeCommand cmd)
AInternalCommand cmd -> strEncode (ACInternal, cmd)
strP =
strP_ >>= \case
ACClient -> AClientCommand <$> dbCommandP
ACInternal -> AInternalCommand <$> strP
data AgentCommandTag
= AClientCommandTag ACommandTag
| AInternalCommandTag InternalCommandTag
deriving (Show)
instance StrEncoding AgentCommandTag where
strEncode = \case
AClientCommandTag t -> strEncode (ACClient, t)
AInternalCommandTag t -> strEncode (ACInternal, t)
strP =
strP_ >>= \case
ACClient -> AClientCommandTag <$> strP
ACInternal -> AInternalCommandTag <$> strP
data InternalCommand
= ICAck SMP.RecipientId MsgId
| ICAckDel SMP.RecipientId MsgId InternalId
| ICAllowSecure SMP.RecipientId (Maybe SMP.SndPublicAuthKey)
| ICDuplexSecure SMP.RecipientId SMP.SndPublicAuthKey
| ICDeleteConn
| ICDeleteRcvQueue SMP.RecipientId
| ICQSecure SMP.RecipientId SMP.SndPublicAuthKey
| ICQDelete SMP.RecipientId
data InternalCommandTag
= ICAck_
| ICAckDel_
| ICAllowSecure_
| ICDuplexSecure_
| ICDeleteConn_
| ICDeleteRcvQueue_
| ICQSecure_
| ICQDelete_
deriving (Show)
instance StrEncoding InternalCommand where
strEncode = \case
ICAck rId srvMsgId -> strEncode (ICAck_, rId, srvMsgId)
ICAckDel rId srvMsgId mId -> strEncode (ICAckDel_, rId, srvMsgId, mId)
ICAllowSecure rId sndKey -> strEncode (ICAllowSecure_, rId, sndKey)
ICDuplexSecure rId sndKey -> strEncode (ICDuplexSecure_, rId, sndKey)
ICDeleteConn -> strEncode ICDeleteConn_
ICDeleteRcvQueue rId -> strEncode (ICDeleteRcvQueue_, rId)
ICQSecure rId senderKey -> strEncode (ICQSecure_, rId, senderKey)
ICQDelete rId -> strEncode (ICQDelete_, rId)
strP =
strP >>= \case
ICAck_ -> ICAck <$> _strP <*> _strP
ICAckDel_ -> ICAckDel <$> _strP <*> _strP <*> _strP
ICAllowSecure_ -> ICAllowSecure <$> _strP <*> _strP
ICDuplexSecure_ -> ICDuplexSecure <$> _strP <*> _strP
ICDeleteConn_ -> pure ICDeleteConn
ICDeleteRcvQueue_ -> ICDeleteRcvQueue <$> _strP
ICQSecure_ -> ICQSecure <$> _strP <*> _strP
ICQDelete_ -> ICQDelete <$> _strP
instance StrEncoding InternalCommandTag where
strEncode = \case
ICAck_ -> "ACK"
ICAckDel_ -> "ACK_DEL"
ICAllowSecure_ -> "ALLOW_SECURE"
ICDuplexSecure_ -> "DUPLEX_SECURE"
ICDeleteConn_ -> "DELETE_CONN"
ICDeleteRcvQueue_ -> "DELETE_RCV_QUEUE"
ICQSecure_ -> "QSECURE"
ICQDelete_ -> "QDELETE"
strP =
A.takeTill (== ' ') >>= \case
"ACK" -> pure ICAck_
"ACK_DEL" -> pure ICAckDel_
"ALLOW_SECURE" -> pure ICAllowSecure_
"DUPLEX_SECURE" -> pure ICDuplexSecure_
"DELETE_CONN" -> pure ICDeleteConn_
"DELETE_RCV_QUEUE" -> pure ICDeleteRcvQueue_
"QSECURE" -> pure ICQSecure_
"QDELETE" -> pure ICQDelete_
_ -> fail "bad InternalCommandTag"
agentCommandTag :: AgentCommand -> AgentCommandTag
agentCommandTag = \case
AClientCommand cmd -> AClientCommandTag $ aCommandTag cmd
AInternalCommand cmd -> AInternalCommandTag $ internalCmdTag cmd
internalCmdTag :: InternalCommand -> InternalCommandTag
internalCmdTag = \case
ICAck {} -> ICAck_
ICAckDel {} -> ICAckDel_
ICAllowSecure {} -> ICAllowSecure_
ICDuplexSecure {} -> ICDuplexSecure_
ICDeleteConn -> ICDeleteConn_
ICDeleteRcvQueue {} -> ICDeleteRcvQueue_
ICQSecure {} -> ICQSecure_
ICQDelete _ -> ICQDelete_
-- * Confirmation types
data NewConfirmation = NewConfirmation
{ connId :: ConnId,
senderConf :: SMPConfirmation,
ratchetState :: RatchetX448
}
data AcceptedConfirmation = AcceptedConfirmation
{ confirmationId :: ConfirmationId,
connId :: ConnId,
senderConf :: SMPConfirmation,
ratchetState :: RatchetX448,
ownConnInfo :: ConnInfo
}
-- * Invitations
data NewInvitation = NewInvitation
{ contactConnId :: ConnId,
connReq :: ConnectionRequestUri 'CMInvitation,
recipientConnInfo :: ConnInfo
}
data Invitation = Invitation
{ invitationId :: InvitationId,
contactConnId_ :: Maybe ConnId,
connReq :: ConnectionRequestUri 'CMInvitation,
recipientConnInfo :: ConnInfo,
ownConnInfo :: Maybe ConnInfo,
accepted :: Bool
}
-- * Message integrity validation types
-- | Corresponds to `last_external_snd_msg_id` in `connections` table
type PrevExternalSndId = Int64
-- | Corresponds to `last_rcv_msg_hash` in `connections` table
type PrevRcvMsgHash = MsgHash
-- | Corresponds to `last_snd_msg_hash` in `connections` table
type PrevSndMsgHash = MsgHash
-- * Message data containers
data RcvMsgData = RcvMsgData
{ msgMeta :: MsgMeta,
msgType :: AgentMessageType,
msgFlags :: MsgFlags,
msgBody :: MsgBody,
internalRcvId :: InternalRcvId,
internalHash :: MsgHash,
externalPrevSndHash :: MsgHash,
encryptedMsgHash :: MsgHash
}
data RcvMsg = RcvMsg
{ internalId :: InternalId,
msgMeta :: MsgMeta,
msgType :: AgentMessageType,
msgBody :: MsgBody,
internalHash :: MsgHash,
msgReceipt :: Maybe MsgReceipt, -- if this message is a delivery receipt
userAck :: Bool
}
data SndMsgData = SndMsgData
{ internalId :: InternalId,
internalSndId :: InternalSndId,
internalTs :: InternalTs,
msgType :: AgentMessageType,
msgFlags :: MsgFlags,
msgBody :: MsgBody,
pqEncryption :: PQEncryption,
internalHash :: MsgHash,
prevMsgHash :: MsgHash,
sndMsgPrepData_ :: Maybe SndMsgPrepData
}
data SndMsgPrepData = SndMsgPrepData
{ encryptKey :: MsgEncryptKeyX448,
paddedLen :: Int,
sndMsgBodyId :: Int64
}
deriving (Show)
data SndMsg = SndMsg
{ internalId :: InternalId,
internalSndId :: InternalSndId,
msgType :: AgentMessageType,
internalHash :: MsgHash,
msgReceipt :: Maybe MsgReceipt
}
data PendingMsgData = PendingMsgData
{ msgId :: InternalId,
msgType :: AgentMessageType,
msgFlags :: MsgFlags,
msgBody :: MsgBody,
pqEncryption :: PQEncryption,
msgRetryState :: Maybe RI2State,
internalTs :: InternalTs,
internalSndId :: InternalSndId,
prevMsgHash :: PrevSndMsgHash,
pendingMsgPrepData_ :: Maybe PendingMsgPrepData
}
deriving (Show)
data PendingMsgPrepData = PendingMsgPrepData
{ encryptKey :: MsgEncryptKeyX448,
paddedLen :: Int,
sndMsgBody :: AMessage
}
deriving (Show)
-- internal Ids are newtypes to prevent mixing them up
newtype InternalRcvId = InternalRcvId {unRcvId :: Int64} deriving (Eq, Show)
type ExternalSndId = Int64
type ExternalSndTs = UTCTime
type BrokerId = MsgId
type BrokerTs = UTCTime
newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show)
-- | Base message data independent of direction.
data MsgBase = MsgBase
{ connId :: ConnId,
-- | Monotonically increasing id of a message per connection, internal to the agent.
-- Internal Id preserves ordering between both received and sent messages, and is needed
-- to track the order of the conversation (which can be different for the sender / receiver)
-- and address messages in commands. External [sender] Id cannot be used for this purpose
-- due to a possibility of implementation errors in different agents.
internalId :: InternalId,
internalTs :: InternalTs,
msgBody :: MsgBody,
-- | Hash of the message as computed by agent.
internalHash :: MsgHash
}
deriving (Eq, Show)
newtype InternalId = InternalId {unId :: Int64} deriving (Eq, Show)
instance StrEncoding InternalId where
strEncode = strEncode . unId
strP = InternalId <$> strP
type InternalTs = UTCTime
type AsyncCmdId = Int64
-- * Store errors
-- | Agent store error.
data StoreError
= -- | IO exceptions in store actions.
SEInternal ByteString
| -- | Database busy
SEDatabaseBusy ByteString
| -- | Failed to generate unique random ID
SEUniqueID
| -- | User ID not found
SEUserNotFound
| -- | Connection not found (or both queues absent).
SEConnNotFound
| -- | Server not found.
SEServerNotFound
| -- | Connection already used.
SEConnDuplicate
| -- | Confirmed snd queue already exists.
SESndQueueExists
| -- | Wrong connection type, e.g. "send" connection when "receive" or "duplex" is expected, or vice versa.
-- 'upgradeRcvConnToDuplex' and 'upgradeSndConnToDuplex' do not allow duplex connections - they would also return this error.
SEBadConnType String ConnType
| -- | Confirmation not found.
SEConfirmationNotFound
| -- | Invitation not found
SEInvitationNotFound String InvitationId
| -- | Message not found
SEMsgNotFound String
| -- | Command not found
SECmdNotFound
| -- | Currently not used. The intention was to pass current expected queue status in methods,
-- as we always know what it should be at any stage of the protocol,
-- and in case it does not match use this error.
SEBadQueueStatus
| -- | connection does not have associated double-ratchet state
SERatchetNotFound
| -- | connection does not have associated x3dh keys
SEX3dhKeysNotFound
| -- | Used to wrap agent errors inside store operations to avoid race conditions
SEAgentError AgentErrorType
| -- | XFTP Server not found.
SEXFTPServerNotFound
| -- | XFTP File not found.
SEFileNotFound
| -- | XFTP Deleted snd chunk replica not found.
SEDeletedSndChunkReplicaNotFound
| -- | Error when reading work item that suspends worker - do not use!
SEWorkItemError {errContext :: String}
| -- | Servers stats not found.
SEServersStatsNotFound
deriving (Eq, Show, Exception)
instance AnyError StoreError where
fromSomeException e = SEInternal $ case fromException e of
Just (e' :: SQLError) -> bshow e'
Nothing -> bshow e
class (Show e, AnyError e) => AnyStoreError e where
isWorkItemError :: e -> Bool
mkWorkItemError :: String -> e
instance AnyStoreError StoreError where
isWorkItemError = \case
SEWorkItemError {} -> True
_ -> False
mkWorkItemError errContext = SEWorkItemError {errContext}