Files
simplexmq/src/Simplex/Messaging/Agent/Store.hs

609 lines
20 KiB
Haskell

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
module Simplex.Messaging.Agent.Store where
import Control.Exception (Exception)
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.Kind (Type)
import Data.List (find)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Maybe (isJust)
import Data.Time (UTCTime)
import Data.Type.Equality
import GHC.Records (HasField)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (RatchetX448)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( MsgBody,
MsgFlags,
MsgId,
NotifierId,
NtfPrivateSignKey,
NtfPublicVerifyKey,
RcvDhSecret,
RcvNtfDhSecret,
RcvPrivateSignKey,
SndPrivateSignKey,
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util ((<$?>))
import Simplex.Messaging.Version
-- * Queue types
-- | A receive queue. SMP queue through which the agent receives messages from a sender.
data RcvQueue = RcvQueue
{ userId :: UserId,
connId :: ConnId,
server :: SMPServer,
-- | recipient queue ID
rcvId :: SMP.RecipientId,
-- | key used by the recipient to sign transmissions
rcvPrivateKey :: RcvPrivateSignKey,
-- | shared DH secret used to encrypt/decrypt message bodies from server to recipient
rcvDhSecret :: RcvDhSecret,
-- | private DH key related to public sent to sender out-of-band (to agree simple per-queue e2e)
e2ePrivKey :: C.PrivateKeyX25519,
-- | public sender's DH key and agreed shared DH secret for simple per-queue e2e
e2eDhSecret :: Maybe C.DhSecretX25519,
-- | sender queue ID
sndId :: SMP.SenderId,
-- | queue status
status :: QueueStatus,
-- | database queue ID (within connection)
dbQueueId :: Int64,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
primary :: Bool,
-- | database queue ID to replace, Nothing if this queue is not replacing another, `Just Nothing` is used for replacing old queues
dbReplaceQueueId :: Maybe Int64,
rcvSwchStatus :: Maybe RcvSwitchStatus,
-- | SMP client version
smpClientVersion :: Version,
-- | credentials used in context of notifications
clientNtfCreds :: Maybe ClientNtfCreds,
deleteErrors :: Int
}
deriving (Eq, Show)
rcvQueueInfo :: RcvQueue -> RcvQueueInfo
rcvQueueInfo rq@RcvQueue {server, rcvSwchStatus} =
RcvQueueInfo {rcvServer = server, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq}
canAbortRcvSwitch :: RcvQueue -> Bool
canAbortRcvSwitch = maybe False canAbort . rcvSwchStatus
where
canAbort = \case
RSSwitchStarted -> True
RSSendingQADD -> True
-- if switch is in RSSendingQUSE, a race condition with sender deleting the original queue is possible
RSSendingQUSE -> False
-- if switch is in RSReceivedMessage status, aborting switch (deleting new queue)
-- will break the connection because the sender would have original queue deleted
RSReceivedMessage -> False
data ClientNtfCreds = ClientNtfCreds
{ -- | key pair to be used by the notification server to sign transmissions
ntfPublicKey :: NtfPublicVerifyKey,
ntfPrivateKey :: NtfPrivateSignKey,
-- | queue ID to be used by the notification server for NSUB command
notifierId :: NotifierId,
-- | shared DH secret used to encrypt/decrypt notification metadata (NMsgMeta) from server to recipient
rcvNtfDhSecret :: RcvNtfDhSecret
}
deriving (Eq, Show)
-- | A send queue. SMP queue through which the agent sends messages to a recipient.
data SndQueue = SndQueue
{ userId :: UserId,
connId :: ConnId,
server :: SMPServer,
-- | sender queue ID
sndId :: SMP.SenderId,
-- | key pair used by the sender to sign transmissions
sndPublicKey :: Maybe C.APublicVerifyKey,
sndPrivateKey :: SndPrivateSignKey,
-- | DH public key used to negotiate per-queue e2e encryption
e2ePubKey :: Maybe C.PublicKeyX25519,
-- | shared DH secret agreed for simple per-queue e2e encryption
e2eDhSecret :: C.DhSecretX25519,
-- | queue status
status :: QueueStatus,
-- | database queue ID (within connection)
dbQueueId :: Int64,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
primary :: Bool,
-- | ID of the queue this one is replacing
dbReplaceQueueId :: Maybe Int64,
sndSwchStatus :: Maybe SndSwitchStatus,
-- | SMP client version
smpClientVersion :: Version
}
deriving (Eq, Show)
sndQueueInfo :: SndQueue -> SndQueueInfo
sndQueueInfo SndQueue {server, sndSwchStatus} =
SndQueueInfo {sndServer = server, sndSwitchStatus = sndSwchStatus}
instance SMPQueue RcvQueue where
qServer RcvQueue {server} = server
{-# INLINE qServer #-}
queueId RcvQueue {rcvId} = rcvId
{-# INLINE queueId #-}
instance SMPQueue SndQueue where
qServer SndQueue {server} = server
{-# INLINE qServer #-}
queueId SndQueue {sndId} = sndId
{-# INLINE queueId #-}
findQ :: SMPQueue q => (SMPServer, SMP.QueueId) -> NonEmpty q -> Maybe q
findQ = find . sameQueue
{-# INLINE findQ #-}
removeQ :: SMPQueue q => (SMPServer, SMP.QueueId) -> NonEmpty q -> Maybe (q, [q])
removeQ = removeQP . sameQueue
{-# INLINE removeQ #-}
removeQP :: (q -> Bool) -> NonEmpty q -> Maybe (q, [q])
removeQP p qs = case L.break p qs of
(_, []) -> Nothing
(qs1, q : qs2) -> Just (q, qs1 <> qs2)
sndAddress :: RcvQueue -> (SMPServer, SMP.SenderId)
sndAddress RcvQueue {server, sndId} = (server, sndId)
{-# INLINE sndAddress #-}
findRQ :: (SMPServer, SMP.SenderId) -> NonEmpty RcvQueue -> Maybe RcvQueue
findRQ sAddr = find $ sameQAddress sAddr . sndAddress
{-# INLINE findRQ #-}
switchingRQ :: NonEmpty RcvQueue -> Maybe RcvQueue
switchingRQ = find $ isJust . rcvSwchStatus
{-# INLINE switchingRQ #-}
updatedQs :: SMPQueueRec q => q -> NonEmpty q -> NonEmpty q
updatedQs q = L.map $ \q' -> if q.dbQueueId == q'.dbQueueId then q else q'
{-# INLINE updatedQs #-}
type SMPQueueRec q =
( SMPQueue q,
HasField "userId" q UserId,
HasField "connId" q ConnId,
HasField "dbQueueId" q Int64,
HasField "dbReplaceQueueId" q (Maybe Int64)
)
-- * Connection types
-- | Type of a connection.
data ConnType = CNew | CRcv | CSnd | CDuplex | CContact deriving (Eq, Show)
-- | Connection of a specific type.
--
-- - RcvConnection is a connection that only has a receive queue set up,
-- typically created by a recipient initiating a duplex connection.
--
-- - SndConnection is a connection that only has a send queue set up, typically
-- created by a sender joining a duplex connection through a recipient's invitation.
--
-- - DuplexConnection is a connection that has both receive and send queues set up,
-- typically created by upgrading a receive or a send connection with a missing queue.
data Connection (d :: ConnType) where
NewConnection :: ConnData -> Connection CNew
RcvConnection :: ConnData -> RcvQueue -> Connection CRcv
SndConnection :: ConnData -> SndQueue -> Connection CSnd
DuplexConnection :: ConnData -> NonEmpty RcvQueue -> NonEmpty SndQueue -> Connection CDuplex
ContactConnection :: ConnData -> RcvQueue -> Connection CContact
deriving instance Eq (Connection d)
deriving instance Show (Connection d)
toConnData :: Connection d -> ConnData
toConnData = \case
NewConnection cData -> cData
RcvConnection cData _ -> cData
SndConnection cData _ -> cData
DuplexConnection cData _ _ -> cData
ContactConnection cData _ -> cData
updateConnection :: ConnData -> Connection d -> Connection d
updateConnection cData = \case
NewConnection _ -> NewConnection cData
RcvConnection _ rq -> RcvConnection cData rq
SndConnection _ sq -> SndConnection cData sq
DuplexConnection _ rqs sqs -> DuplexConnection cData rqs sqs
ContactConnection _ rq -> ContactConnection cData rq
data SConnType :: ConnType -> Type where
SCNew :: SConnType CNew
SCRcv :: SConnType CRcv
SCSnd :: SConnType CSnd
SCDuplex :: SConnType CDuplex
SCContact :: SConnType CContact
connType :: SConnType c -> ConnType
connType SCNew = CNew
connType SCRcv = CRcv
connType SCSnd = CSnd
connType SCDuplex = CDuplex
connType SCContact = CContact
deriving instance Eq (SConnType d)
deriving instance Show (SConnType d)
instance TestEquality SConnType where
testEquality SCRcv SCRcv = Just Refl
testEquality SCSnd SCSnd = Just Refl
testEquality SCDuplex SCDuplex = Just Refl
testEquality SCContact SCContact = Just Refl
testEquality _ _ = Nothing
-- | Connection of an unknown type.
-- Used to refer to an arbitrary connection when retrieving from store.
data SomeConn = forall d. SomeConn (SConnType d) (Connection d)
instance Eq SomeConn where
SomeConn d c == SomeConn d' c' = case testEquality d d' of
Just Refl -> c == c'
_ -> False
deriving instance Show SomeConn
data ConnData = ConnData
{ connId :: ConnId,
userId :: UserId,
connAgentVersion :: Version,
enableNtfs :: Bool,
duplexHandshake :: Maybe Bool, -- added in agent protocol v2
lastExternalSndId :: PrevExternalSndId,
deleted :: Bool,
ratchetSyncState :: RatchetSyncState
}
deriving (Eq, Show)
-- this function should be mirrored in the clients
ratchetSyncAllowed :: ConnData -> Bool
ratchetSyncAllowed cData@ConnData {ratchetSyncState} =
ratchetSyncSupported' cData && (ratchetSyncState `elem` ([RSAllowed, RSRequired] :: [RatchetSyncState]))
ratchetSyncSupported' :: ConnData -> Bool
ratchetSyncSupported' ConnData {connAgentVersion} = connAgentVersion >= 3
messageRcptsSupported :: ConnData -> Bool
messageRcptsSupported ConnData {connAgentVersion} = connAgentVersion >= 4
-- this function should be mirrored in the clients
ratchetSyncSendProhibited :: ConnData -> Bool
ratchetSyncSendProhibited ConnData {ratchetSyncState} =
ratchetSyncState `elem` ([RSRequired, RSStarted, RSAgreed] :: [RatchetSyncState])
data PendingCommand = PendingCommand
{ corrId :: ACorrId,
userId :: UserId,
connId :: ConnId,
command :: AgentCommand
}
data AgentCmdType = ACClient | ACInternal
instance StrEncoding AgentCmdType where
strEncode = \case
ACClient -> "CLIENT"
ACInternal -> "INTERNAL"
strP =
A.takeTill (== ' ') >>= \case
"CLIENT" -> pure ACClient
"INTERNAL" -> pure ACInternal
_ -> fail "bad AgentCmdType"
data AgentCommand
= AClientCommand (APartyCmd 'Client)
| AInternalCommand InternalCommand
instance StrEncoding AgentCommand where
strEncode = \case
AClientCommand (APC _ cmd) -> strEncode (ACClient, Str $ serializeCommand cmd)
AInternalCommand cmd -> strEncode (ACInternal, cmd)
strP =
strP_ >>= \case
ACClient -> AClientCommand <$> ((\(ACmd _ e cmd) -> checkParty $ APC e cmd) <$?> dbCommandP)
ACInternal -> AInternalCommand <$> strP
data AgentCommandTag
= AClientCommandTag (APartyCmdTag 'Client)
| AInternalCommandTag InternalCommandTag
deriving (Show)
instance StrEncoding AgentCommandTag where
strEncode = \case
AClientCommandTag t -> strEncode (ACClient, t)
AInternalCommandTag t -> strEncode (ACInternal, t)
strP =
strP_ >>= \case
ACClient -> AClientCommandTag <$> strP
ACInternal -> AInternalCommandTag <$> strP
data InternalCommand
= ICAck SMP.RecipientId MsgId
| ICAckDel SMP.RecipientId MsgId InternalId
| ICAllowSecure SMP.RecipientId SMP.SndPublicVerifyKey
| ICDuplexSecure SMP.RecipientId SMP.SndPublicVerifyKey
| ICDeleteConn
| ICDeleteRcvQueue SMP.RecipientId
| ICQSecure SMP.RecipientId SMP.SndPublicVerifyKey
| ICQDelete SMP.RecipientId
data InternalCommandTag
= ICAck_
| ICAckDel_
| ICAllowSecure_
| ICDuplexSecure_
| ICDeleteConn_
| ICDeleteRcvQueue_
| ICQSecure_
| ICQDelete_
deriving (Show)
instance StrEncoding InternalCommand where
strEncode = \case
ICAck rId srvMsgId -> strEncode (ICAck_, rId, srvMsgId)
ICAckDel rId srvMsgId mId -> strEncode (ICAckDel_, rId, srvMsgId, mId)
ICAllowSecure rId sndKey -> strEncode (ICAllowSecure_, rId, sndKey)
ICDuplexSecure rId sndKey -> strEncode (ICDuplexSecure_, rId, sndKey)
ICDeleteConn -> strEncode ICDeleteConn_
ICDeleteRcvQueue rId -> strEncode (ICDeleteRcvQueue_, rId)
ICQSecure rId senderKey -> strEncode (ICQSecure_, rId, senderKey)
ICQDelete rId -> strEncode (ICQDelete_, rId)
strP =
strP >>= \case
ICAck_ -> ICAck <$> _strP <*> _strP
ICAckDel_ -> ICAckDel <$> _strP <*> _strP <*> _strP
ICAllowSecure_ -> ICAllowSecure <$> _strP <*> _strP
ICDuplexSecure_ -> ICDuplexSecure <$> _strP <*> _strP
ICDeleteConn_ -> pure ICDeleteConn
ICDeleteRcvQueue_ -> ICDeleteRcvQueue <$> _strP
ICQSecure_ -> ICQSecure <$> _strP <*> _strP
ICQDelete_ -> ICQDelete <$> _strP
instance StrEncoding InternalCommandTag where
strEncode = \case
ICAck_ -> "ACK"
ICAckDel_ -> "ACK_DEL"
ICAllowSecure_ -> "ALLOW_SECURE"
ICDuplexSecure_ -> "DUPLEX_SECURE"
ICDeleteConn_ -> "DELETE_CONN"
ICDeleteRcvQueue_ -> "DELETE_RCV_QUEUE"
ICQSecure_ -> "QSECURE"
ICQDelete_ -> "QDELETE"
strP =
A.takeTill (== ' ') >>= \case
"ACK" -> pure ICAck_
"ACK_DEL" -> pure ICAckDel_
"ALLOW_SECURE" -> pure ICAllowSecure_
"DUPLEX_SECURE" -> pure ICDuplexSecure_
"DELETE_CONN" -> pure ICDeleteConn_
"DELETE_RCV_QUEUE" -> pure ICDeleteRcvQueue_
"QSECURE" -> pure ICQSecure_
"QDELETE" -> pure ICQDelete_
_ -> fail "bad InternalCommandTag"
agentCommandTag :: AgentCommand -> AgentCommandTag
agentCommandTag = \case
AClientCommand cmd -> AClientCommandTag $ aPartyCmdTag cmd
AInternalCommand cmd -> AInternalCommandTag $ internalCmdTag cmd
internalCmdTag :: InternalCommand -> InternalCommandTag
internalCmdTag = \case
ICAck {} -> ICAck_
ICAckDel {} -> ICAckDel_
ICAllowSecure {} -> ICAllowSecure_
ICDuplexSecure {} -> ICDuplexSecure_
ICDeleteConn -> ICDeleteConn_
ICDeleteRcvQueue {} -> ICDeleteRcvQueue_
ICQSecure {} -> ICQSecure_
ICQDelete _ -> ICQDelete_
-- * Confirmation types
data NewConfirmation = NewConfirmation
{ connId :: ConnId,
senderConf :: SMPConfirmation,
ratchetState :: RatchetX448
}
data AcceptedConfirmation = AcceptedConfirmation
{ confirmationId :: ConfirmationId,
connId :: ConnId,
senderConf :: SMPConfirmation,
ratchetState :: RatchetX448,
ownConnInfo :: ConnInfo
}
-- * Invitations
data NewInvitation = NewInvitation
{ contactConnId :: ConnId,
connReq :: ConnectionRequestUri 'CMInvitation,
recipientConnInfo :: ConnInfo
}
data Invitation = Invitation
{ invitationId :: InvitationId,
contactConnId :: ConnId,
connReq :: ConnectionRequestUri 'CMInvitation,
recipientConnInfo :: ConnInfo,
ownConnInfo :: Maybe ConnInfo,
accepted :: Bool
}
-- * Message integrity validation types
-- | Corresponds to `last_external_snd_msg_id` in `connections` table
type PrevExternalSndId = Int64
-- | Corresponds to `last_rcv_msg_hash` in `connections` table
type PrevRcvMsgHash = MsgHash
-- | Corresponds to `last_snd_msg_hash` in `connections` table
type PrevSndMsgHash = MsgHash
-- * Message data containers
data RcvMsgData = RcvMsgData
{ msgMeta :: MsgMeta,
msgType :: AgentMessageType,
msgFlags :: MsgFlags,
msgBody :: MsgBody,
internalRcvId :: InternalRcvId,
internalHash :: MsgHash,
externalPrevSndHash :: MsgHash,
encryptedMsgHash :: MsgHash
}
data RcvMsg = RcvMsg
{ internalId :: InternalId,
msgMeta :: MsgMeta,
msgType :: AgentMessageType,
msgBody :: MsgBody,
internalHash :: MsgHash,
msgReceipt :: Maybe MsgReceipt, -- if this message is a delivery receipt
userAck :: Bool
}
data SndMsgData = SndMsgData
{ internalId :: InternalId,
internalSndId :: InternalSndId,
internalTs :: InternalTs,
msgType :: AgentMessageType,
msgFlags :: MsgFlags,
msgBody :: MsgBody,
internalHash :: MsgHash,
prevMsgHash :: MsgHash
}
data SndMsg = SndMsg
{ internalId :: InternalId,
internalSndId :: InternalSndId,
msgType :: AgentMessageType,
internalHash :: MsgHash,
msgReceipt :: Maybe MsgReceipt
}
data PendingMsgData = PendingMsgData
{ msgId :: InternalId,
msgType :: AgentMessageType,
msgFlags :: MsgFlags,
msgBody :: MsgBody,
msgRetryState :: Maybe RI2State,
internalTs :: InternalTs
}
deriving (Show)
-- internal Ids are newtypes to prevent mixing them up
newtype InternalRcvId = InternalRcvId {unRcvId :: Int64} deriving (Eq, Show)
type ExternalSndId = Int64
type ExternalSndTs = UTCTime
type BrokerId = MsgId
type BrokerTs = UTCTime
newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show)
-- | Base message data independent of direction.
data MsgBase = MsgBase
{ connId :: ConnId,
-- | Monotonically increasing id of a message per connection, internal to the agent.
-- Internal Id preserves ordering between both received and sent messages, and is needed
-- to track the order of the conversation (which can be different for the sender / receiver)
-- and address messages in commands. External [sender] Id cannot be used for this purpose
-- due to a possibility of implementation errors in different agents.
internalId :: InternalId,
internalTs :: InternalTs,
msgBody :: MsgBody,
-- | Hash of the message as computed by agent.
internalHash :: MsgHash
}
deriving (Eq, Show)
newtype InternalId = InternalId {unId :: Int64} deriving (Eq, Show)
instance StrEncoding InternalId where
strEncode = strEncode . unId
strP = InternalId <$> strP
type InternalTs = UTCTime
type AsyncCmdId = Int64
-- * Store errors
-- | Agent store error.
data StoreError
= -- | IO exceptions in store actions.
SEInternal ByteString
| -- | Failed to generate unique random ID
SEUniqueID
| -- | User ID not found
SEUserNotFound
| -- | Connection not found (or both queues absent).
SEConnNotFound
| -- | Server not found.
SEServerNotFound
| -- | Connection already used.
SEConnDuplicate
| -- | Confirmed snd queue already exists.
SESndQueueExists
| -- | Wrong connection type, e.g. "send" connection when "receive" or "duplex" is expected, or vice versa.
-- 'upgradeRcvConnToDuplex' and 'upgradeSndConnToDuplex' do not allow duplex connections - they would also return this error.
SEBadConnType ConnType
| -- | Confirmation not found.
SEConfirmationNotFound
| -- | Invitation not found
SEInvitationNotFound
| -- | Message not found
SEMsgNotFound
| -- | Command not found
SECmdNotFound
| -- | Currently not used. The intention was to pass current expected queue status in methods,
-- as we always know what it should be at any stage of the protocol,
-- and in case it does not match use this error.
SEBadQueueStatus
| -- | connection does not have associated double-ratchet state
SERatchetNotFound
| -- | connection does not have associated x3dh keys
SEX3dhKeysNotFound
| -- | Used to wrap agent errors inside store operations to avoid race conditions
SEAgentError AgentErrorType
| -- | XFTP Server not found.
SEXFTPServerNotFound
| -- | XFTP File not found.
SEFileNotFound
| -- | XFTP Deleted snd chunk replica not found.
SEDeletedSndChunkReplicaNotFound
deriving (Eq, Show, Exception)