mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 18:35:59 +00:00
agent sqlite: rework schema, make methods transactional, enable foreign keys (#34)
* agent sqlite: rework schema * agent sqlite: explicitly mark primary keys as NOT NULL * agent sqlite: adjust connections and queues fkeys * agent sqlite: remove ack_mode from queues tables * [WIP] agent sqlite: refactor methods * agent sqlite: implement transactional createRcvConn * add comment * agent sqlite: remove ConnAlias from createRcvConn signature * agent sqlite: implement transactional createSndConn * agent sqlite: remove monadic stack from store util methods * agent sqlite: refactor getConn * agent sqlite: rename conn -> dbConn * agent sqlite: move transactional logic to utils * agent sqlite: remove addServer from store interface * fix comment * agent sqlite: refactor getRcvQueue * agent sqlite: refactor deleteConn * agent sqlite: remove old deleteConn * agent sqlite: enable FKs * agent sqlite: refactor methods upgrading connection to duplex * agent sqlite: uncomment not implemented methods * agent sqlite: rename methods upgrading connection to duplex * use liftEither * agent sqlite: refactor update queue status methods * agent sqlite: refactor createMsg * clean up * fix compilation errors in src * fix existing tests * clean up tests * agent sqlite: test that foreign keys are enabled * change private members naming * tests: expect specific error code * clean up * agent sqlite: consistently separate lifts from logic to their own lines
This commit is contained in:
@@ -15,31 +15,31 @@ module Simplex.Messaging.Agent
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Logger.Simple (logInfo, showText)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Crypto.Random (MonadRandom)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding
|
||||
import Data.Text.Encoding (decodeUtf8)
|
||||
import Data.Time.Clock
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Util (SQLiteStore)
|
||||
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore)
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Client (SMPServerTransmission)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server (randomBytes)
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport (putLn, runTCPServer)
|
||||
import Simplex.Messaging.Types (CorrId (..), MsgBody, PrivateKey, SenderKey)
|
||||
import UnliftIO.Async
|
||||
import System.IO (Handle)
|
||||
import UnliftIO.Async (race_)
|
||||
import UnliftIO.Exception (SomeException)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.IO
|
||||
import UnliftIO.STM
|
||||
|
||||
runSMPAgent :: (MonadRandom m, MonadUnliftIO m) => AgentConfig -> m ()
|
||||
@@ -127,15 +127,15 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(rq, qInfo) <- newReceiveQueue c server connAlias
|
||||
withStore $ \st -> createRcvConn st connAlias rq
|
||||
withStore $ \st -> createRcvConn st rq
|
||||
respond $ INV qInfo
|
||||
|
||||
joinConnection :: SMPQueueInfo -> ReplyMode -> m ()
|
||||
joinConnection qInfo@(SMPQueueInfo srv _ _) replyMode = do
|
||||
-- TODO create connection alias if not passed
|
||||
-- make connAlias Maybe?
|
||||
(sq, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> createSndConn st connAlias sq
|
||||
(sq, senderKey) <- newSendQueue qInfo connAlias
|
||||
withStore $ \st -> createSndConn st sq
|
||||
connectToSendQueue c sq senderKey
|
||||
case replyMode of
|
||||
ReplyOn -> sendReplyQInfo srv sq
|
||||
@@ -192,7 +192,7 @@ processCommand c@AgentClient {sndQ} (corrId, connAlias, cmd) =
|
||||
sendReplyQInfo :: SMPServer -> SendQueue -> m ()
|
||||
sendReplyQInfo srv sq = do
|
||||
(rq, qInfo) <- newReceiveQueue c srv connAlias
|
||||
withStore $ \st -> addRcvQueue st connAlias rq
|
||||
withStore $ \st -> upgradeSndConnToDuplex st connAlias rq
|
||||
sendAgentMessage c sq $ REPLY qInfo
|
||||
|
||||
respond :: ACommand 'Agent -> m ()
|
||||
@@ -208,7 +208,7 @@ subscriber c@AgentClient {msgQ} = forever $ do
|
||||
|
||||
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SMPServerTransmission -> m ()
|
||||
processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
(connAlias, rq@ReceiveQueue {decryptKey, status}) <- withStore $ \st -> getReceiveQueue st srv rId
|
||||
rq@ReceiveQueue {connAlias, decryptKey, status} <- withStore $ \st -> getRcvQueue st srv rId
|
||||
case cmd of
|
||||
SMP.MSG srvMsgId srvTs msgBody -> do
|
||||
-- TODO deduplicate with previously received
|
||||
@@ -222,10 +222,10 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
-- Commands CONF and LET are not implemented yet
|
||||
-- They are probably not needed in v0.2?
|
||||
-- TODO notification that connection confirmed?
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Confirmed
|
||||
withStore $ \st -> setRcvQueueStatus st rq Confirmed
|
||||
-- TODO update sender key in the store
|
||||
secureQueue c rq senderKey
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Secured
|
||||
withStore $ \st -> setRcvQueueStatus st rq Secured
|
||||
sendAck c rq
|
||||
s ->
|
||||
-- TODO maybe send notification to the user
|
||||
@@ -235,13 +235,13 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
HELLO _verifyKey _ -> do
|
||||
logServer "<--" c srv rId "MSG <HELLO>"
|
||||
-- TODO send status update to the user?
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Active
|
||||
withStore $ \st -> setRcvQueueStatus st rq Active
|
||||
sendAck c rq
|
||||
REPLY qInfo -> do
|
||||
logServer "<--" c srv rId "MSG <REPLY>"
|
||||
-- TODO move senderKey inside SendQueue
|
||||
(sq, senderKey) <- newSendQueue qInfo
|
||||
withStore $ \st -> addSndQueue st connAlias sq
|
||||
(sq, senderKey) <- newSendQueue qInfo connAlias
|
||||
withStore $ \st -> upgradeRcvConnToDuplex st connAlias sq
|
||||
connectToSendQueue c sq senderKey
|
||||
notify connAlias CON
|
||||
sendAck c rq
|
||||
@@ -271,16 +271,16 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
connectToSendQueue :: AgentMonad m => AgentClient -> SendQueue -> SenderKey -> m ()
|
||||
connectToSendQueue c sq senderKey = do
|
||||
sendConfirmation c sq senderKey
|
||||
withStore $ \st -> updateSndQueueStatus st sq Confirmed
|
||||
withStore $ \st -> setSndQueueStatus st sq Confirmed
|
||||
sendHello c sq
|
||||
withStore $ \st -> updateSndQueueStatus st sq Active
|
||||
withStore $ \st -> setSndQueueStatus st sq Active
|
||||
|
||||
decryptMessage :: MonadUnliftIO m => PrivateKey -> ByteString -> m ByteString
|
||||
decryptMessage _decryptKey = return
|
||||
|
||||
newSendQueue ::
|
||||
(MonadUnliftIO m, MonadReader Env m) => SMPQueueInfo -> m (SendQueue, SenderKey)
|
||||
newSendQueue (SMPQueueInfo smpServer senderId encryptKey) = do
|
||||
(MonadUnliftIO m, MonadReader Env m) => SMPQueueInfo -> ConnAlias -> m (SendQueue, SenderKey)
|
||||
newSendQueue (SMPQueueInfo smpServer senderId encryptKey) connAlias = do
|
||||
g <- asks idsDrg
|
||||
senderKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
verifyKey <- atomically $ randomBytes 16 g -- TODO replace with cryptographic key pair
|
||||
@@ -290,11 +290,10 @@ newSendQueue (SMPQueueInfo smpServer senderId encryptKey) = do
|
||||
SendQueue
|
||||
{ server = smpServer,
|
||||
sndId = senderId,
|
||||
connAlias,
|
||||
sndPrivateKey,
|
||||
encryptKey,
|
||||
signKey,
|
||||
-- verifyKey,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
return (sndQueue, senderKey)
|
||||
|
||||
@@ -161,13 +161,13 @@ newReceiveQueue c srv connAlias = do
|
||||
ReceiveQueue
|
||||
{ server = srv,
|
||||
rcvId,
|
||||
connAlias,
|
||||
rcvPrivateKey,
|
||||
sndId = Just sId,
|
||||
sndKey = Nothing,
|
||||
decryptKey,
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
addSubscription c rq connAlias
|
||||
return (rq, SMPQueueInfo srv sId encryptKey)
|
||||
|
||||
@@ -10,7 +10,6 @@ import Crypto.Random
|
||||
import Network.Socket
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Util (SQLiteStore)
|
||||
import Simplex.Messaging.Client
|
||||
import UnliftIO.STM
|
||||
|
||||
|
||||
@@ -1,45 +1,52 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store where
|
||||
module Simplex.Messaging.Agent.Store
|
||||
( ReceiveQueue (..),
|
||||
SendQueue (..),
|
||||
Connection (..),
|
||||
SConnType (..),
|
||||
SomeConn (..),
|
||||
MessageDelivery (..),
|
||||
DeliveryStatus (..),
|
||||
MonadAgentStore (..),
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind
|
||||
import Data.Kind (Type)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Data.Type.Equality
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Store.Types (ConnType (..))
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Types
|
||||
import Simplex.Messaging.Types (PrivateKey, PublicKey)
|
||||
|
||||
data ReceiveQueue = ReceiveQueue
|
||||
{ server :: SMPServer,
|
||||
rcvId :: SMP.RecipientId,
|
||||
connAlias :: ConnAlias,
|
||||
rcvPrivateKey :: PrivateKey,
|
||||
sndId :: Maybe SMP.SenderId,
|
||||
sndKey :: Maybe PublicKey,
|
||||
decryptKey :: PrivateKey,
|
||||
verifyKey :: Maybe PublicKey,
|
||||
status :: QueueStatus,
|
||||
ackMode :: AckMode -- whether acknowledgement will be sent (via SendQueue if present)
|
||||
status :: QueueStatus
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
data SendQueue = SendQueue
|
||||
{ server :: SMPServer,
|
||||
sndId :: SMP.SenderId,
|
||||
connAlias :: ConnAlias,
|
||||
sndPrivateKey :: PrivateKey,
|
||||
encryptKey :: PublicKey,
|
||||
signKey :: PrivateKey,
|
||||
-- verifyKey :: Maybe PublicKey,
|
||||
status :: QueueStatus,
|
||||
ackMode :: AckMode -- whether acknowledgement is expected (via ReceiveQueue if present)
|
||||
status :: QueueStatus
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -91,21 +98,17 @@ data DeliveryStatus
|
||||
| MDConfirmed -- SMP: OK received / ACK sent
|
||||
| MDAcknowledged AckStatus -- SAMP: RCVD sent to agent client / ACK received from agent client and sent to the server
|
||||
|
||||
type SMPServerId = Int64
|
||||
|
||||
-- TODO rework types - decouple Transmission types from Store? Convert on the agent instead?
|
||||
class Monad m => MonadAgentStore s m where
|
||||
addServer :: s -> SMPServer -> m SMPServerId
|
||||
createRcvConn :: s -> ConnAlias -> ReceiveQueue -> m ()
|
||||
createSndConn :: s -> ConnAlias -> SendQueue -> m ()
|
||||
createRcvConn :: s -> ReceiveQueue -> m ()
|
||||
createSndConn :: s -> SendQueue -> m ()
|
||||
getConn :: s -> ConnAlias -> m SomeConn
|
||||
getReceiveQueue :: s -> SMPServer -> SMP.RecipientId -> m (ConnAlias, ReceiveQueue)
|
||||
getRcvQueue :: s -> SMPServer -> SMP.RecipientId -> m ReceiveQueue
|
||||
deleteConn :: s -> ConnAlias -> m ()
|
||||
addSndQueue :: s -> ConnAlias -> SendQueue -> m ()
|
||||
addRcvQueue :: s -> ConnAlias -> ReceiveQueue -> m ()
|
||||
upgradeRcvConnToDuplex :: s -> ConnAlias -> SendQueue -> m ()
|
||||
upgradeSndConnToDuplex :: s -> ConnAlias -> ReceiveQueue -> m ()
|
||||
removeSndAuth :: s -> ConnAlias -> m ()
|
||||
updateRcvQueueStatus :: s -> ReceiveQueue -> QueueStatus -> m ()
|
||||
updateSndQueueStatus :: s -> SendQueue -> QueueStatus -> m ()
|
||||
setRcvQueueStatus :: s -> ReceiveQueue -> QueueStatus -> m ()
|
||||
setSndQueueStatus :: s -> SendQueue -> QueueStatus -> m ()
|
||||
createMsg :: s -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> m ()
|
||||
getLastMsg :: s -> ConnAlias -> QueueDirection -> m MessageDelivery
|
||||
getMsg :: s -> ConnAlias -> QueueDirection -> AgentMsgId -> m MessageDelivery
|
||||
|
||||
@@ -1,157 +1,106 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite where
|
||||
module Simplex.Messaging.Agent.Store.SQLite
|
||||
( SQLiteStore (..),
|
||||
newSQLiteStore,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Data.Maybe
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Schema
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Schema (createSchema)
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Util
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import UnliftIO.STM
|
||||
|
||||
data SQLiteStore = SQLiteStore
|
||||
{ dbFilename :: String,
|
||||
dbConn :: DB.Connection
|
||||
}
|
||||
|
||||
newSQLiteStore :: MonadUnliftIO m => String -> m SQLiteStore
|
||||
newSQLiteStore dbFilename = do
|
||||
conn <- liftIO $ DB.open dbFilename
|
||||
liftIO $ createSchema conn
|
||||
serversLock <- newTMVarIO ()
|
||||
rcvQueuesLock <- newTMVarIO ()
|
||||
sndQueuesLock <- newTMVarIO ()
|
||||
connectionsLock <- newTMVarIO ()
|
||||
messagesLock <- newTMVarIO ()
|
||||
return
|
||||
SQLiteStore
|
||||
{ dbFilename,
|
||||
conn,
|
||||
serversLock,
|
||||
rcvQueuesLock,
|
||||
sndQueuesLock,
|
||||
connectionsLock,
|
||||
messagesLock
|
||||
}
|
||||
dbConn <- liftIO $ DB.open dbFilename
|
||||
liftIO $ createSchema dbConn
|
||||
return SQLiteStore {dbFilename, dbConn}
|
||||
|
||||
instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteStore m where
|
||||
addServer store smpServer = upsertServer store smpServer
|
||||
createRcvConn :: SQLiteStore -> ReceiveQueue -> m ()
|
||||
createRcvConn SQLiteStore {dbConn} rcvQueue =
|
||||
liftIO $
|
||||
createRcvQueueAndConn dbConn rcvQueue
|
||||
|
||||
createRcvConn :: SQLiteStore -> ConnAlias -> ReceiveQueue -> m ()
|
||||
createRcvConn st connAlias rcvQueue = do
|
||||
-- TODO test for duplicate connAlias
|
||||
srvId <- upsertServer st (server (rcvQueue :: ReceiveQueue))
|
||||
rcvQId <- insertRcvQueue st srvId rcvQueue
|
||||
insertRcvConnection st connAlias rcvQId
|
||||
createSndConn :: SQLiteStore -> SendQueue -> m ()
|
||||
createSndConn SQLiteStore {dbConn} sndQueue =
|
||||
liftIO $
|
||||
createSndQueueAndConn dbConn sndQueue
|
||||
|
||||
createSndConn :: SQLiteStore -> ConnAlias -> SendQueue -> m ()
|
||||
createSndConn st connAlias sndQueue = do
|
||||
-- TODO test for duplicate connAlias
|
||||
srvId <- upsertServer st (server (sndQueue :: SendQueue))
|
||||
sndQ <- insertSndQueue st srvId sndQueue
|
||||
insertSndConnection st connAlias sndQ
|
||||
|
||||
-- TODO refactor ito a single query with join, and parse as `Only connAlias :. rcvQueue :. sndQueue`
|
||||
getConn :: SQLiteStore -> ConnAlias -> m SomeConn
|
||||
getConn st connAlias =
|
||||
getConnection st connAlias >>= \case
|
||||
(Just rcvQId, Just sndQId) -> do
|
||||
rcvQ <- getRcvQueue st rcvQId
|
||||
sndQ <- getSndQueue st sndQId
|
||||
return $ SomeConn SCDuplex (DuplexConnection connAlias rcvQ sndQ)
|
||||
(Just rcvQId, _) -> do
|
||||
rcvQ <- getRcvQueue st rcvQId
|
||||
return $ SomeConn SCReceive (ReceiveConnection connAlias rcvQ)
|
||||
(_, Just sndQId) -> do
|
||||
sndQ <- getSndQueue st sndQId
|
||||
return $ SomeConn SCSend (SendConnection connAlias sndQ)
|
||||
getConn SQLiteStore {dbConn} connAlias = do
|
||||
queues <-
|
||||
liftIO $
|
||||
retrieveConnQueues dbConn connAlias
|
||||
case queues of
|
||||
(Just rcvQ, Just sndQ) -> return $ SomeConn SCDuplex (DuplexConnection connAlias rcvQ sndQ)
|
||||
(Just rcvQ, Nothing) -> return $ SomeConn SCReceive (ReceiveConnection connAlias rcvQ)
|
||||
(Nothing, Just sndQ) -> return $ SomeConn SCSend (SendConnection connAlias sndQ)
|
||||
_ -> throwError SEBadConn
|
||||
|
||||
getReceiveQueue :: SQLiteStore -> SMPServer -> SMP.RecipientId -> m (ConnAlias, ReceiveQueue)
|
||||
getReceiveQueue st SMPServer {host, port} recipientId = do
|
||||
rcvQueue <- getRcvQueueByRecipientId st recipientId host port
|
||||
connAlias <- getConnAliasByRcvQueue st recipientId
|
||||
return (connAlias, rcvQueue)
|
||||
getRcvQueue :: SQLiteStore -> SMPServer -> SMP.RecipientId -> m ReceiveQueue
|
||||
getRcvQueue SQLiteStore {dbConn} SMPServer {host, port} rcvId = do
|
||||
rcvQueue <-
|
||||
liftIO $
|
||||
retrieveRcvQueue dbConn host port rcvId
|
||||
case rcvQueue of
|
||||
Just rcvQ -> return rcvQ
|
||||
_ -> throwError SENotFound
|
||||
|
||||
-- TODO make transactional
|
||||
addSndQueue :: SQLiteStore -> ConnAlias -> SendQueue -> m ()
|
||||
addSndQueue st connAlias sndQueue =
|
||||
getConn st connAlias
|
||||
>>= \case
|
||||
SomeConn SCDuplex _ -> throwError (SEBadConnType CDuplex)
|
||||
SomeConn SCSend _ -> throwError (SEBadConnType CSend)
|
||||
SomeConn SCReceive _ -> do
|
||||
srvId <- upsertServer st (server (sndQueue :: SendQueue))
|
||||
sndQ <- insertSndQueue st srvId sndQueue
|
||||
updateRcvConnectionWithSndQueue st connAlias sndQ
|
||||
|
||||
-- TODO make transactional
|
||||
addRcvQueue :: SQLiteStore -> ConnAlias -> ReceiveQueue -> m ()
|
||||
addRcvQueue st connAlias rcvQueue =
|
||||
getConn st connAlias
|
||||
>>= \case
|
||||
SomeConn SCDuplex _ -> throwError (SEBadConnType CDuplex)
|
||||
SomeConn SCReceive _ -> throwError (SEBadConnType CReceive)
|
||||
SomeConn SCSend _ -> do
|
||||
srvId <- upsertServer st (server (rcvQueue :: ReceiveQueue))
|
||||
rcvQ <- insertRcvQueue st srvId rcvQueue
|
||||
updateSndConnectionWithRcvQueue st connAlias rcvQ
|
||||
|
||||
-- TODO think about design of one-to-one relationships between connections ans send/receive queues
|
||||
-- - Make wide `connections` table? -> Leads to inability to constrain queue fields on SQL level
|
||||
-- - Make bi-directional foreign keys deferred on queue side?
|
||||
-- * Involves populating foreign keys on queues' tables and reworking store
|
||||
-- * Enables cascade deletes
|
||||
-- ? See https://sqlite.org/foreignkeys.html#fk_deferred
|
||||
-- - Keep as is and just wrap in transaction?
|
||||
deleteConn :: SQLiteStore -> ConnAlias -> m ()
|
||||
deleteConn st connAlias = do
|
||||
(rcvQId, sndQId) <- getConnection st connAlias
|
||||
forM_ rcvQId $ deleteRcvQueue st
|
||||
forM_ sndQId $ deleteSndQueue st
|
||||
deleteConnection st connAlias
|
||||
when (isNothing rcvQId && isNothing sndQId) $ throwError SEBadConn
|
||||
deleteConn SQLiteStore {dbConn} connAlias =
|
||||
liftIO $
|
||||
deleteConnCascade dbConn connAlias
|
||||
|
||||
upgradeRcvConnToDuplex :: SQLiteStore -> ConnAlias -> SendQueue -> m ()
|
||||
upgradeRcvConnToDuplex SQLiteStore {dbConn} connAlias sndQueue =
|
||||
liftIO
|
||||
(updateRcvConnWithSndQueue dbConn connAlias sndQueue)
|
||||
>>= liftEither
|
||||
|
||||
upgradeSndConnToDuplex :: SQLiteStore -> ConnAlias -> ReceiveQueue -> m ()
|
||||
upgradeSndConnToDuplex SQLiteStore {dbConn} connAlias rcvQueue =
|
||||
liftIO
|
||||
(updateSndConnWithRcvQueue dbConn connAlias rcvQueue)
|
||||
>>= liftEither
|
||||
|
||||
removeSndAuth :: SQLiteStore -> ConnAlias -> m ()
|
||||
removeSndAuth _st _connAlias = throwError SENotImplemented
|
||||
|
||||
-- TODO throw error if queue doesn't exist
|
||||
updateRcvQueueStatus :: SQLiteStore -> ReceiveQueue -> QueueStatus -> m ()
|
||||
updateRcvQueueStatus st ReceiveQueue {rcvId, server = SMPServer {host, port}} status =
|
||||
updateReceiveQueueStatus st rcvId host port status
|
||||
setRcvQueueStatus :: SQLiteStore -> ReceiveQueue -> QueueStatus -> m ()
|
||||
setRcvQueueStatus SQLiteStore {dbConn} rcvQueue status =
|
||||
liftIO $
|
||||
updateRcvQueueStatus dbConn rcvQueue status
|
||||
|
||||
-- TODO throw error if queue doesn't exist
|
||||
updateSndQueueStatus :: SQLiteStore -> SendQueue -> QueueStatus -> m ()
|
||||
updateSndQueueStatus st SendQueue {sndId, server = SMPServer {host, port}} status =
|
||||
updateSendQueueStatus st sndId host port status
|
||||
setSndQueueStatus :: SQLiteStore -> SendQueue -> QueueStatus -> m ()
|
||||
setSndQueueStatus SQLiteStore {dbConn} sndQueue status =
|
||||
liftIO $
|
||||
updateSndQueueStatus dbConn sndQueue status
|
||||
|
||||
-- TODO decrease duplication of queue direction checks?
|
||||
createMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> m ()
|
||||
createMsg st connAlias qDirection agentMsgId msg = do
|
||||
case qDirection of
|
||||
RCV -> do
|
||||
(rcvQId, _) <- getConnection st connAlias
|
||||
case rcvQId of
|
||||
Just _ -> insertMsg st connAlias qDirection agentMsgId $ serializeAgentMessage msg
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
SND -> do
|
||||
(_, sndQId) <- getConnection st connAlias
|
||||
case sndQId of
|
||||
Just _ -> insertMsg st connAlias qDirection agentMsgId $ serializeAgentMessage msg
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
createMsg SQLiteStore {dbConn} connAlias qDirection agentMsgId aMsg =
|
||||
liftIO
|
||||
(insertMsg dbConn connAlias agentMsgId aMsg)
|
||||
>>= liftEither
|
||||
where
|
||||
insertMsg = case qDirection of
|
||||
RCV -> insertRcvMsg
|
||||
SND -> insertSndMsg
|
||||
|
||||
getLastMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> m MessageDelivery
|
||||
getLastMsg _st _connAlias _dir = throwError SENotImplemented
|
||||
@@ -159,7 +108,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
getMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> m MessageDelivery
|
||||
getMsg _st _connAlias _dir _msgId = throwError SENotImplemented
|
||||
|
||||
-- TODO missing status parameter?
|
||||
-- ? missing status parameter?
|
||||
updateMsgStatus :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> m ()
|
||||
updateMsgStatus _st _connAlias _dir _msgId = throwError SENotImplemented
|
||||
|
||||
|
||||
@@ -1,84 +1,103 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Schema where
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Schema (createSchema) where
|
||||
|
||||
import Database.SQLite.Simple
|
||||
import Database.SQLite.Simple (Connection, Query, execute_)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
enableFKs :: Query
|
||||
enableFKs = "PRAGMA foreign_keys = ON;"
|
||||
|
||||
-- port is either a port number or a service name, see Network.Socket.Info.ServiceName
|
||||
servers :: Query
|
||||
servers =
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS servers
|
||||
( server_id INTEGER PRIMARY KEY,
|
||||
host TEXT NOT NULL,
|
||||
port TEXT,
|
||||
key_hash BLOB,
|
||||
UNIQUE (host, port)
|
||||
)
|
||||
CREATE TABLE IF NOT EXISTS servers(
|
||||
host TEXT NOT NULL,
|
||||
port TEXT NOT NULL,
|
||||
key_hash BLOB,
|
||||
PRIMARY KEY (host, port)
|
||||
) WITHOUT ROWID;
|
||||
|]
|
||||
|
||||
receiveQueues :: Query
|
||||
receiveQueues =
|
||||
rcvQueues :: Query
|
||||
rcvQueues =
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS receive_queues
|
||||
( receive_queue_id INTEGER PRIMARY KEY,
|
||||
server_id INTEGER REFERENCES servers(server_id) NOT NULL,
|
||||
rcv_id BLOB NOT NULL,
|
||||
rcv_private_key BLOB NOT NULL,
|
||||
snd_id BLOB,
|
||||
snd_key BLOB,
|
||||
decrypt_key BLOB NOT NULL,
|
||||
verify_key BLOB,
|
||||
status TEXT NOT NULL,
|
||||
ack_mode INTEGER NOT NULL,
|
||||
UNIQUE (server_id, rcv_id),
|
||||
UNIQUE (server_id, snd_id)
|
||||
)
|
||||
CREATE TABLE IF NOT EXISTS rcv_queues(
|
||||
host TEXT NOT NULL,
|
||||
port TEXT NOT NULL,
|
||||
rcv_id BLOB NOT NULL,
|
||||
conn_alias TEXT NOT NULL,
|
||||
rcv_private_key BLOB NOT NULL,
|
||||
snd_id BLOB,
|
||||
snd_key BLOB,
|
||||
decrypt_key BLOB NOT NULL,
|
||||
verify_key BLOB,
|
||||
status TEXT NOT NULL,
|
||||
PRIMARY KEY(host, port, rcv_id),
|
||||
FOREIGN KEY(host, port) REFERENCES servers(host, port),
|
||||
FOREIGN KEY(conn_alias)
|
||||
REFERENCES connections(conn_alias)
|
||||
ON DELETE CASCADE
|
||||
DEFERRABLE INITIALLY DEFERRED,
|
||||
UNIQUE (host, port, snd_id)
|
||||
) WITHOUT ROWID;
|
||||
|]
|
||||
|
||||
sendQueues :: Query
|
||||
sendQueues =
|
||||
sndQueues :: Query
|
||||
sndQueues =
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS send_queues
|
||||
( send_queue_id INTEGER PRIMARY KEY,
|
||||
server_id INTEGER REFERENCES servers(server_id) NOT NULL,
|
||||
snd_id BLOB NOT NULL,
|
||||
snd_private_key BLOB NOT NULL,
|
||||
encrypt_key BLOB NOT NULL,
|
||||
sign_key BLOB NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
ack_mode INTEGER NOT NULL,
|
||||
UNIQUE (server_id, snd_id)
|
||||
)
|
||||
CREATE TABLE IF NOT EXISTS snd_queues(
|
||||
host TEXT NOT NULL,
|
||||
port TEXT NOT NULL,
|
||||
snd_id BLOB NOT NULL,
|
||||
conn_alias TEXT NOT NULL,
|
||||
snd_private_key BLOB NOT NULL,
|
||||
encrypt_key BLOB NOT NULL,
|
||||
sign_key BLOB NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
PRIMARY KEY(host, port, snd_id),
|
||||
FOREIGN KEY(host, port) REFERENCES servers(host, port),
|
||||
FOREIGN KEY(conn_alias)
|
||||
REFERENCES connections(conn_alias)
|
||||
ON DELETE CASCADE
|
||||
DEFERRABLE INITIALLY DEFERRED
|
||||
) WITHOUT ROWID;
|
||||
|]
|
||||
|
||||
connections :: Query
|
||||
connections =
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS connections
|
||||
( connection_id INTEGER PRIMARY KEY,
|
||||
conn_alias TEXT UNIQUE,
|
||||
receive_queue_id INTEGER REFERENCES recipient_queues(receive_queue_id) UNIQUE,
|
||||
send_queue_id INTEGER REFERENCES sender_queues(send_queue_id) UNIQUE
|
||||
)
|
||||
CREATE TABLE IF NOT EXISTS connections(
|
||||
conn_alias TEXT NOT NULL,
|
||||
rcv_host TEXT,
|
||||
rcv_port TEXT,
|
||||
rcv_id BLOB,
|
||||
snd_host TEXT,
|
||||
snd_port TEXT,
|
||||
snd_id BLOB,
|
||||
PRIMARY KEY(conn_alias),
|
||||
FOREIGN KEY(rcv_host, rcv_port, rcv_id) REFERENCES rcv_queues(host, port, rcv_id),
|
||||
FOREIGN KEY(snd_host, snd_port, snd_id) REFERENCES snd_queues(host, port, snd_id)
|
||||
) WITHOUT ROWID;
|
||||
|]
|
||||
|
||||
messages :: Query
|
||||
messages =
|
||||
[sql|
|
||||
CREATE TABLE IF NOT EXISTS messages
|
||||
( message_id INTEGER PRIMARY KEY,
|
||||
conn_alias TEXT REFERENCES connections(conn_alias),
|
||||
agent_msg_id INTEGER NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
message BLOB NOT NULL,
|
||||
direction TEXT NOT NULL,
|
||||
msg_status TEXT NOT NULL
|
||||
)
|
||||
CREATE TABLE IF NOT EXISTS messages(
|
||||
agent_msg_id INTEGER NOT NULL,
|
||||
conn_alias TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
message BLOB NOT NULL,
|
||||
direction TEXT NOT NULL,
|
||||
msg_status TEXT NOT NULL,
|
||||
PRIMARY KEY(agent_msg_id, conn_alias),
|
||||
FOREIGN KEY(conn_alias) REFERENCES connections(conn_alias)
|
||||
) WITHOUT ROWID;
|
||||
|]
|
||||
|
||||
createSchema :: Connection -> IO ()
|
||||
createSchema conn =
|
||||
mapM_ (execute_ conn) [servers, receiveQueues, sendQueues, connections, messages]
|
||||
mapM_ (execute_ conn) [enableFKs, servers, rcvQueues, sndQueues, connections, messages]
|
||||
|
||||
@@ -1,69 +1,61 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Util where
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Util
|
||||
( createRcvQueueAndConn,
|
||||
createSndQueueAndConn,
|
||||
retrieveConnQueues,
|
||||
retrieveRcvQueue,
|
||||
deleteConnCascade,
|
||||
updateRcvConnWithSndQueue,
|
||||
updateSndConnWithRcvQueue,
|
||||
updateRcvQueueStatus,
|
||||
updateSndQueueStatus,
|
||||
insertRcvMsg,
|
||||
insertSndMsg,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Data.Int (Int64)
|
||||
import Control.Monad.Except (MonadIO (liftIO))
|
||||
import Data.Maybe (fromMaybe)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time
|
||||
import Database.SQLite.Simple hiding (Connection)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Data.Time (getCurrentTime)
|
||||
import Database.SQLite.Simple as DB
|
||||
import Database.SQLite.Simple.FromField
|
||||
import Database.SQLite.Simple.Internal (Field (..))
|
||||
import Database.SQLite.Simple.Ok
|
||||
import Database.SQLite.Simple.Ok (Ok (Ok))
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import Database.SQLite.Simple.ToField
|
||||
import Network.Socket
|
||||
import Database.SQLite.Simple.ToField (ToField (..))
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Util
|
||||
import Text.Read
|
||||
import Simplex.Messaging.Protocol as SMP (RecipientId)
|
||||
import Text.Read (readMaybe)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
data SQLiteStore = SQLiteStore
|
||||
{ dbFilename :: String,
|
||||
conn :: DB.Connection,
|
||||
serversLock :: TMVar (),
|
||||
rcvQueuesLock :: TMVar (),
|
||||
sndQueuesLock :: TMVar (),
|
||||
connectionsLock :: TMVar (),
|
||||
messagesLock :: TMVar ()
|
||||
}
|
||||
-- ? replace with ToField? - it's easy to forget to use this
|
||||
serializePort_ :: Maybe ServiceName -> ServiceName
|
||||
serializePort_ = fromMaybe "_"
|
||||
|
||||
type QueueRowId = Int64
|
||||
deserializePort_ :: ServiceName -> Maybe ServiceName
|
||||
deserializePort_ "_" = Nothing
|
||||
deserializePort_ port = Just port
|
||||
|
||||
type ConnectionRowId = Int64
|
||||
instance ToField QueueStatus where toField = toField . show
|
||||
|
||||
addRcvQueueQuery :: Query
|
||||
addRcvQueueQuery =
|
||||
[sql|
|
||||
INSERT INTO receive_queues
|
||||
( server_id, rcv_id, rcv_private_key, snd_id, snd_key, decrypt_key, verify_key, status, ack_mode)
|
||||
VALUES
|
||||
(:server_id,:rcv_id,:rcv_private_key,:snd_id,:snd_key,:decrypt_key,:verify_key,:status,:ack_mode);
|
||||
|]
|
||||
instance FromField QueueStatus where fromField = fromFieldToReadable_
|
||||
|
||||
fromFieldToReadable :: forall a. (Read a, E.Typeable a) => Field -> Ok a
|
||||
fromFieldToReadable = \case
|
||||
instance ToField QueueDirection where toField = toField . show
|
||||
|
||||
fromFieldToReadable_ :: forall a. (Read a, E.Typeable a) => Field -> Ok a
|
||||
fromFieldToReadable_ = \case
|
||||
f@(Field (SQLText t) _) ->
|
||||
let str = T.unpack t
|
||||
in case readMaybe str of
|
||||
@@ -71,312 +63,369 @@ fromFieldToReadable = \case
|
||||
_ -> returnError ConversionFailed f ("invalid string: " <> str)
|
||||
f -> returnError ConversionFailed f "expecting SQLText column type"
|
||||
|
||||
withLock :: MonadUnliftIO m => SQLiteStore -> (SQLiteStore -> TMVar ()) -> (DB.Connection -> m a) -> m a
|
||||
withLock st tableLock f = do
|
||||
let lock = tableLock st
|
||||
E.bracket_
|
||||
(atomically $ takeTMVar lock)
|
||||
(atomically $ putTMVar lock ())
|
||||
(f $ conn st)
|
||||
{- ORMOLU_DISABLE -}
|
||||
-- SQLite.Simple only has these up to 10 fields, which is insufficient for some of our queries
|
||||
instance (FromField a, FromField b, FromField c, FromField d, FromField e,
|
||||
FromField f, FromField g, FromField h, FromField i, FromField j,
|
||||
FromField k) =>
|
||||
FromRow (a,b,c,d,e,f,g,h,i,j,k) where
|
||||
fromRow = (,,,,,,,,,,) <$> field <*> field <*> field <*> field <*> field
|
||||
<*> field <*> field <*> field <*> field <*> field
|
||||
<*> field
|
||||
{- ORMOLU_ENABLE -}
|
||||
|
||||
insertWithLock :: (MonadUnliftIO m, ToRow q) => SQLiteStore -> (SQLiteStore -> TMVar ()) -> DB.Query -> q -> m Int64
|
||||
insertWithLock st tableLock queryStr q =
|
||||
withLock st tableLock $ \c -> liftIO $ do
|
||||
DB.execute c queryStr q
|
||||
DB.lastInsertRowId c
|
||||
createRcvQueueAndConn :: DB.Connection -> ReceiveQueue -> IO ()
|
||||
createRcvQueueAndConn dbConn rcvQueue =
|
||||
DB.withTransaction dbConn $ do
|
||||
upsertServer_ dbConn (server (rcvQueue :: ReceiveQueue))
|
||||
insertRcvQueue_ dbConn rcvQueue
|
||||
insertRcvConnection_ dbConn rcvQueue
|
||||
|
||||
executeWithLock :: (MonadUnliftIO m, ToRow q) => SQLiteStore -> (SQLiteStore -> TMVar ()) -> DB.Query -> q -> m ()
|
||||
executeWithLock st tableLock queryStr q =
|
||||
withLock st tableLock $ \c ->
|
||||
liftIO $ DB.execute c queryStr q
|
||||
createSndQueueAndConn :: DB.Connection -> SendQueue -> IO ()
|
||||
createSndQueueAndConn dbConn sndQueue =
|
||||
DB.withTransaction dbConn $ do
|
||||
upsertServer_ dbConn (server (sndQueue :: SendQueue))
|
||||
insertSndQueue_ dbConn sndQueue
|
||||
insertSndConnection_ dbConn sndQueue
|
||||
|
||||
instance ToRow SMPServer where
|
||||
toRow SMPServer {host, port, keyHash} = toRow (host, port, keyHash)
|
||||
upsertServer_ :: DB.Connection -> SMPServer -> IO ()
|
||||
upsertServer_ dbConn SMPServer {host, port, keyHash} = do
|
||||
let port_ = serializePort_ port
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
upsertServerQuery_
|
||||
[":host" := host, ":port" := port_, ":key_hash" := keyHash]
|
||||
|
||||
instance FromRow SMPServer where
|
||||
fromRow = SMPServer <$> field <*> field <*> field
|
||||
upsertServerQuery_ :: Query
|
||||
upsertServerQuery_ =
|
||||
[sql|
|
||||
INSERT INTO servers (host, port, key_hash) VALUES (:host,:port,:key_hash)
|
||||
ON CONFLICT (host, port) DO UPDATE SET
|
||||
host=excluded.host,
|
||||
port=excluded.port,
|
||||
key_hash=excluded.key_hash;
|
||||
|]
|
||||
|
||||
upsertServer :: (MonadUnliftIO m, MonadError StoreError m) => SQLiteStore -> SMPServer -> m SMPServerId
|
||||
upsertServer SQLiteStore {conn} srv@SMPServer {host, port} = do
|
||||
r <- liftIO $ do
|
||||
DB.execute
|
||||
conn
|
||||
[sql|
|
||||
INSERT INTO servers (host, port, key_hash) VALUES (?, ?, ?)
|
||||
ON CONFLICT (host, port) DO UPDATE SET
|
||||
host=excluded.host,
|
||||
port=excluded.port,
|
||||
key_hash=excluded.key_hash;
|
||||
|]
|
||||
srv
|
||||
insertRcvQueue_ :: DB.Connection -> ReceiveQueue -> IO ()
|
||||
insertRcvQueue_ dbConn ReceiveQueue {..} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
insertRcvQueueQuery_
|
||||
[ ":host" := host server,
|
||||
":port" := port_,
|
||||
":rcv_id" := rcvId,
|
||||
":conn_alias" := connAlias,
|
||||
":rcv_private_key" := rcvPrivateKey,
|
||||
":snd_id" := sndId,
|
||||
":snd_key" := sndKey,
|
||||
":decrypt_key" := decryptKey,
|
||||
":verify_key" := verifyKey,
|
||||
":status" := status
|
||||
]
|
||||
|
||||
insertRcvQueueQuery_ :: Query
|
||||
insertRcvQueueQuery_ =
|
||||
[sql|
|
||||
INSERT INTO rcv_queues
|
||||
( host, port, rcv_id, conn_alias, rcv_private_key, snd_id, snd_key, decrypt_key, verify_key, status)
|
||||
VALUES
|
||||
(:host,:port,:rcv_id,:conn_alias,:rcv_private_key,:snd_id,:snd_key,:decrypt_key,:verify_key,:status);
|
||||
|]
|
||||
|
||||
insertRcvConnection_ :: DB.Connection -> ReceiveQueue -> IO ()
|
||||
insertRcvConnection_ dbConn ReceiveQueue {server, rcvId, connAlias} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
insertRcvConnectionQuery_
|
||||
[":conn_alias" := connAlias, ":rcv_host" := host server, ":rcv_port" := port_, ":rcv_id" := rcvId]
|
||||
|
||||
insertRcvConnectionQuery_ :: Query
|
||||
insertRcvConnectionQuery_ =
|
||||
[sql|
|
||||
INSERT INTO connections
|
||||
( conn_alias, rcv_host, rcv_port, rcv_id, snd_host, snd_port, snd_id)
|
||||
VALUES
|
||||
(:conn_alias,:rcv_host,:rcv_port,:rcv_id, NULL, NULL, NULL);
|
||||
|]
|
||||
|
||||
insertSndQueue_ :: DB.Connection -> SendQueue -> IO ()
|
||||
insertSndQueue_ dbConn SendQueue {..} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
insertSndQueueQuery_
|
||||
[ ":host" := host server,
|
||||
":port" := port_,
|
||||
":snd_id" := sndId,
|
||||
":conn_alias" := connAlias,
|
||||
":snd_private_key" := sndPrivateKey,
|
||||
":encrypt_key" := encryptKey,
|
||||
":sign_key" := signKey,
|
||||
":status" := status
|
||||
]
|
||||
|
||||
insertSndQueueQuery_ :: Query
|
||||
insertSndQueueQuery_ =
|
||||
[sql|
|
||||
INSERT INTO snd_queues
|
||||
( host, port, snd_id, conn_alias, snd_private_key, encrypt_key, sign_key, status)
|
||||
VALUES
|
||||
(:host,:port,:snd_id,:conn_alias,:snd_private_key,:encrypt_key,:sign_key,:status);
|
||||
|]
|
||||
|
||||
insertSndConnection_ :: DB.Connection -> SendQueue -> IO ()
|
||||
insertSndConnection_ dbConn SendQueue {server, sndId, connAlias} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
insertSndConnectionQuery_
|
||||
[":conn_alias" := connAlias, ":snd_host" := host server, ":snd_port" := port_, ":snd_id" := sndId]
|
||||
|
||||
insertSndConnectionQuery_ :: Query
|
||||
insertSndConnectionQuery_ =
|
||||
[sql|
|
||||
INSERT INTO connections
|
||||
( conn_alias, rcv_host, rcv_port, rcv_id, snd_host, snd_port, snd_id)
|
||||
VALUES
|
||||
(:conn_alias, NULL, NULL, NULL,:snd_host,:snd_port,:snd_id);
|
||||
|]
|
||||
|
||||
retrieveConnQueues :: DB.Connection -> ConnAlias -> IO (Maybe ReceiveQueue, Maybe SendQueue)
|
||||
retrieveConnQueues dbConn connAlias =
|
||||
DB.withTransaction -- Avoid inconsistent state between queue reads
|
||||
dbConn
|
||||
$ retrieveConnQueues_ dbConn connAlias
|
||||
|
||||
-- Separate transactionless version of retrieveConnQueues to be reused in other functions that already wrap
|
||||
-- multiple statements in transaction - otherwise they'd be attempting to start a transaction within a transaction
|
||||
retrieveConnQueues_ :: DB.Connection -> ConnAlias -> IO (Maybe ReceiveQueue, Maybe SendQueue)
|
||||
retrieveConnQueues_ dbConn connAlias = do
|
||||
rcvQ <- retrieveRcvQueueByConnAlias_ dbConn connAlias
|
||||
sndQ <- retrieveSndQueueByConnAlias_ dbConn connAlias
|
||||
return (rcvQ, sndQ)
|
||||
|
||||
retrieveRcvQueueByConnAlias_ :: DB.Connection -> ConnAlias -> IO (Maybe ReceiveQueue)
|
||||
retrieveRcvQueueByConnAlias_ dbConn connAlias = do
|
||||
r <-
|
||||
DB.queryNamed
|
||||
conn
|
||||
"SELECT server_id FROM servers WHERE host = :host AND port = :port"
|
||||
[":host" := host, ":port" := port]
|
||||
dbConn
|
||||
retrieveRcvQueueByConnAliasQuery_
|
||||
[":conn_alias" := connAlias]
|
||||
case r of
|
||||
[Only serverId] -> return serverId
|
||||
_ -> throwError SEInternal
|
||||
[(keyHash, host, port, rcvId, cAlias, rcvPrivateKey, sndId, sndKey, decryptKey, verifyKey, status)] -> do
|
||||
let srv = SMPServer host (deserializePort_ port) keyHash
|
||||
return . Just $ ReceiveQueue srv rcvId cAlias rcvPrivateKey sndId sndKey decryptKey verifyKey status
|
||||
_ -> return Nothing
|
||||
|
||||
getServer :: (MonadUnliftIO m, MonadError StoreError m) => SQLiteStore -> SMPServerId -> m SMPServer
|
||||
getServer SQLiteStore {conn} serverId = do
|
||||
retrieveRcvQueueByConnAliasQuery_ :: Query
|
||||
retrieveRcvQueueByConnAliasQuery_ =
|
||||
[sql|
|
||||
SELECT
|
||||
s.key_hash, q.host, q.port, q.rcv_id, q.conn_alias, q.rcv_private_key,
|
||||
q.snd_id, q.snd_key, q.decrypt_key, q.verify_key, q.status
|
||||
FROM rcv_queues q
|
||||
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
WHERE q.conn_alias = :conn_alias;
|
||||
|]
|
||||
|
||||
retrieveSndQueueByConnAlias_ :: DB.Connection -> ConnAlias -> IO (Maybe SendQueue)
|
||||
retrieveSndQueueByConnAlias_ dbConn connAlias = do
|
||||
r <-
|
||||
liftIO $
|
||||
DB.queryNamed
|
||||
conn
|
||||
"SELECT host, port, key_hash FROM servers WHERE server_id = :server_id"
|
||||
[":server_id" := serverId]
|
||||
DB.queryNamed
|
||||
dbConn
|
||||
retrieveSndQueueByConnAliasQuery_
|
||||
[":conn_alias" := connAlias]
|
||||
case r of
|
||||
[smpServer] -> return smpServer
|
||||
_ -> throwError SENotFound
|
||||
[(keyHash, host, port, sndId, cAlias, sndPrivateKey, encryptKey, signKey, status)] -> do
|
||||
let srv = SMPServer host (deserializePort_ port) keyHash
|
||||
return . Just $ SendQueue srv sndId cAlias sndPrivateKey encryptKey signKey status
|
||||
_ -> return Nothing
|
||||
|
||||
instance ToField AckMode where toField (AckMode mode) = toField $ show mode
|
||||
retrieveSndQueueByConnAliasQuery_ :: Query
|
||||
retrieveSndQueueByConnAliasQuery_ =
|
||||
[sql|
|
||||
SELECT
|
||||
s.key_hash, q.host, q.port, q.snd_id, q.conn_alias,
|
||||
q.snd_private_key, q.encrypt_key, q.sign_key, q.status
|
||||
FROM snd_queues q
|
||||
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
WHERE q.conn_alias = :conn_alias;
|
||||
|]
|
||||
|
||||
instance FromField AckMode where fromField = AckMode <$$> fromFieldToReadable
|
||||
|
||||
instance ToField QueueStatus where toField = toField . show
|
||||
|
||||
instance FromField QueueStatus where fromField = fromFieldToReadable
|
||||
|
||||
instance ToRow ReceiveQueue where
|
||||
toRow ReceiveQueue {rcvId, rcvPrivateKey, sndId, sndKey, decryptKey, verifyKey, status, ackMode} =
|
||||
toRow (rcvId, rcvPrivateKey, sndId, sndKey, decryptKey, verifyKey, status, ackMode)
|
||||
|
||||
instance FromRow ReceiveQueue where
|
||||
fromRow = ReceiveQueue undefined <$> field <*> field <*> field <*> field <*> field <*> field <*> field <*> field
|
||||
|
||||
-- TODO refactor into a single query with join
|
||||
getRcvQueue :: (MonadUnliftIO m, MonadError StoreError m) => SQLiteStore -> QueueRowId -> m ReceiveQueue
|
||||
getRcvQueue st@SQLiteStore {conn} queueRowId = do
|
||||
-- ? make server an argument and pass it for the queue instead of joining with 'servers'?
|
||||
-- ? the downside would be the queue having an outdated 'key_hash' if it has changed,
|
||||
-- ? but maybe it's unwanted behavior?
|
||||
retrieveRcvQueue :: DB.Connection -> HostName -> Maybe ServiceName -> SMP.RecipientId -> IO (Maybe ReceiveQueue)
|
||||
retrieveRcvQueue dbConn host port rcvId = do
|
||||
r <-
|
||||
liftIO $
|
||||
DB.queryNamed
|
||||
conn
|
||||
[sql|
|
||||
SELECT server_id, rcv_id, rcv_private_key, snd_id, snd_key, decrypt_key, verify_key, status, ack_mode
|
||||
FROM receive_queues
|
||||
WHERE receive_queue_id = :rowId;
|
||||
|]
|
||||
[":rowId" := queueRowId]
|
||||
DB.queryNamed
|
||||
dbConn
|
||||
retrieveRcvQueueQuery_
|
||||
[":host" := host, ":port" := serializePort_ port, ":rcv_id" := rcvId]
|
||||
case r of
|
||||
[Only serverId :. rcvQueue] ->
|
||||
(\srv -> (rcvQueue {server = srv} :: ReceiveQueue)) <$> getServer st serverId
|
||||
_ -> throwError SENotFound
|
||||
[(keyHash, hst, prt, rId, connAlias, rcvPrivateKey, sndId, sndKey, decryptKey, verifyKey, status)] -> do
|
||||
let srv = SMPServer hst (deserializePort_ prt) keyHash
|
||||
return . Just $ ReceiveQueue srv rId connAlias rcvPrivateKey sndId sndKey decryptKey verifyKey status
|
||||
_ -> return Nothing
|
||||
|
||||
getRcvQueueByRecipientId :: (MonadUnliftIO m, MonadError StoreError m) => SQLiteStore -> RecipientId -> HostName -> Maybe ServiceName -> m ReceiveQueue
|
||||
getRcvQueueByRecipientId st@SQLiteStore {conn} rcvId host port = do
|
||||
r <-
|
||||
liftIO $
|
||||
DB.queryNamed
|
||||
conn
|
||||
[sql|
|
||||
SELECT server_id, rcv_id, rcv_private_key, snd_id, snd_key, decrypt_key, verify_key, status, ack_mode
|
||||
FROM receive_queues
|
||||
WHERE rcv_id = :rcvId AND server_id IN (
|
||||
SELECT server_id
|
||||
FROM servers
|
||||
WHERE host = :host AND port = :port
|
||||
);
|
||||
|]
|
||||
[":rcvId" := rcvId, ":host" := host, ":port" := port]
|
||||
case r of
|
||||
[Only serverId :. rcvQueue] ->
|
||||
(\srv -> (rcvQueue {server = srv} :: ReceiveQueue)) <$> getServer st serverId
|
||||
_ -> throwError SENotFound
|
||||
retrieveRcvQueueQuery_ :: Query
|
||||
retrieveRcvQueueQuery_ =
|
||||
[sql|
|
||||
SELECT
|
||||
s.key_hash, q.host, q.port, q.rcv_id, q.conn_alias, q.rcv_private_key,
|
||||
q.snd_id, q.snd_key, q.decrypt_key, q.verify_key, q.status
|
||||
FROM rcv_queues q
|
||||
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
WHERE q.host = :host AND q.port = :port AND q.rcv_id = :rcv_id;
|
||||
|]
|
||||
|
||||
-- TODO refactor into a single query with join
|
||||
getSndQueue :: (MonadUnliftIO m, MonadError StoreError m) => SQLiteStore -> QueueRowId -> m SendQueue
|
||||
getSndQueue st@SQLiteStore {conn} queueRowId = do
|
||||
r <-
|
||||
liftIO $
|
||||
DB.queryNamed
|
||||
conn
|
||||
[sql|
|
||||
SELECT server_id, snd_id, snd_private_key, encrypt_key, sign_key, status, ack_mode
|
||||
FROM send_queues
|
||||
WHERE send_queue_id = :rowId;
|
||||
|]
|
||||
[":rowId" := queueRowId]
|
||||
case r of
|
||||
[Only serverId :. sndQueue] ->
|
||||
(\srv -> (sndQueue {server = srv} :: SendQueue)) <$> getServer st serverId
|
||||
_ -> throwError SENotFound
|
||||
deleteConnCascade :: DB.Connection -> ConnAlias -> IO ()
|
||||
deleteConnCascade dbConn connAlias =
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
"DELETE FROM connections WHERE conn_alias = :conn_alias;"
|
||||
[":conn_alias" := connAlias]
|
||||
|
||||
insertRcvQueue :: MonadUnliftIO m => SQLiteStore -> SMPServerId -> ReceiveQueue -> m QueueRowId
|
||||
insertRcvQueue store serverId rcvQueue =
|
||||
insertWithLock
|
||||
store
|
||||
rcvQueuesLock
|
||||
[sql|
|
||||
INSERT INTO receive_queues
|
||||
( server_id, rcv_id, rcv_private_key, snd_id, snd_key, decrypt_key, verify_key, status, ack_mode)
|
||||
VALUES (?,?,?,?,?,?,?,?,?);
|
||||
|]
|
||||
(Only serverId :. rcvQueue)
|
||||
-- ? rewrite with ExceptT?
|
||||
updateRcvConnWithSndQueue :: DB.Connection -> ConnAlias -> SendQueue -> IO (Either StoreError ())
|
||||
updateRcvConnWithSndQueue dbConn connAlias sndQueue =
|
||||
DB.withTransaction dbConn $ do
|
||||
queues <- retrieveConnQueues_ dbConn connAlias
|
||||
case queues of
|
||||
(Just _rcvQ, Nothing) -> do
|
||||
upsertServer_ dbConn (server (sndQueue :: SendQueue))
|
||||
insertSndQueue_ dbConn sndQueue
|
||||
updateConnWithSndQueue_ dbConn connAlias sndQueue
|
||||
return $ Right ()
|
||||
(Nothing, Just _sndQ) -> return $ Left (SEBadConnType CSend)
|
||||
(Just _rcvQ, Just _sndQ) -> return $ Left (SEBadConnType CDuplex)
|
||||
_ -> return $ Left SEBadConn
|
||||
|
||||
insertRcvConnection :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueRowId -> m ()
|
||||
insertRcvConnection store connAlias rcvQueueId =
|
||||
void $
|
||||
insertWithLock
|
||||
store
|
||||
connectionsLock
|
||||
"INSERT INTO connections (conn_alias, receive_queue_id, send_queue_id) VALUES (?,?,NULL);"
|
||||
(Only connAlias :. Only rcvQueueId)
|
||||
updateConnWithSndQueue_ :: DB.Connection -> ConnAlias -> SendQueue -> IO ()
|
||||
updateConnWithSndQueue_ dbConn connAlias SendQueue {server, sndId} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
updateConnWithSndQueueQuery_
|
||||
[":snd_host" := host server, ":snd_port" := port_, ":snd_id" := sndId, ":conn_alias" := connAlias]
|
||||
|
||||
updateRcvConnectionWithSndQueue :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueRowId -> m ()
|
||||
updateRcvConnectionWithSndQueue store connAlias sndQueueId =
|
||||
executeWithLock
|
||||
store
|
||||
connectionsLock
|
||||
[sql|
|
||||
UPDATE connections
|
||||
SET send_queue_id = ?
|
||||
WHERE conn_alias = ?;
|
||||
|]
|
||||
(Only sndQueueId :. Only connAlias)
|
||||
updateConnWithSndQueueQuery_ :: Query
|
||||
updateConnWithSndQueueQuery_ =
|
||||
[sql|
|
||||
UPDATE connections
|
||||
SET snd_host = :snd_host, snd_port = :snd_port, snd_id = :snd_id
|
||||
WHERE conn_alias = :conn_alias;
|
||||
|]
|
||||
|
||||
instance ToRow SendQueue where
|
||||
toRow SendQueue {sndId, sndPrivateKey, encryptKey, signKey, status, ackMode} =
|
||||
toRow (sndId, sndPrivateKey, encryptKey, signKey, status, ackMode)
|
||||
-- ? rewrite with ExceptT?
|
||||
updateSndConnWithRcvQueue :: DB.Connection -> ConnAlias -> ReceiveQueue -> IO (Either StoreError ())
|
||||
updateSndConnWithRcvQueue dbConn connAlias rcvQueue =
|
||||
DB.withTransaction dbConn $ do
|
||||
queues <- retrieveConnQueues_ dbConn connAlias
|
||||
case queues of
|
||||
(Nothing, Just _sndQ) -> do
|
||||
upsertServer_ dbConn (server (rcvQueue :: ReceiveQueue))
|
||||
insertRcvQueue_ dbConn rcvQueue
|
||||
updateConnWithRcvQueue_ dbConn connAlias rcvQueue
|
||||
return $ Right ()
|
||||
(Just _rcvQ, Nothing) -> return $ Left (SEBadConnType CReceive)
|
||||
(Just _rcvQ, Just _sndQ) -> return $ Left (SEBadConnType CDuplex)
|
||||
_ -> return $ Left SEBadConn
|
||||
|
||||
instance FromRow SendQueue where
|
||||
fromRow = SendQueue undefined <$> field <*> field <*> field <*> field <*> field <*> field
|
||||
updateConnWithRcvQueue_ :: DB.Connection -> ConnAlias -> ReceiveQueue -> IO ()
|
||||
updateConnWithRcvQueue_ dbConn connAlias ReceiveQueue {server, rcvId} = do
|
||||
let port_ = serializePort_ $ port server
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
updateConnWithRcvQueueQuery_
|
||||
[":rcv_host" := host server, ":rcv_port" := port_, ":rcv_id" := rcvId, ":conn_alias" := connAlias]
|
||||
|
||||
insertSndQueue :: MonadUnliftIO m => SQLiteStore -> SMPServerId -> SendQueue -> m QueueRowId
|
||||
insertSndQueue store serverId sndQueue =
|
||||
insertWithLock
|
||||
store
|
||||
sndQueuesLock
|
||||
[sql|
|
||||
INSERT INTO send_queues
|
||||
( server_id, snd_id, snd_private_key, encrypt_key, sign_key, status, ack_mode)
|
||||
VALUES (?,?,?,?,?,?,?);
|
||||
|]
|
||||
(Only serverId :. sndQueue)
|
||||
updateConnWithRcvQueueQuery_ :: Query
|
||||
updateConnWithRcvQueueQuery_ =
|
||||
[sql|
|
||||
UPDATE connections
|
||||
SET rcv_host = :rcv_host, rcv_port = :rcv_port, rcv_id = :rcv_id
|
||||
WHERE conn_alias = :conn_alias;
|
||||
|]
|
||||
|
||||
insertSndConnection :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueRowId -> m ()
|
||||
insertSndConnection store connAlias sndQueueId =
|
||||
void $
|
||||
insertWithLock
|
||||
store
|
||||
connectionsLock
|
||||
"INSERT INTO connections (conn_alias, receive_queue_id, send_queue_id) VALUES (?,NULL,?);"
|
||||
(Only connAlias :. Only sndQueueId)
|
||||
-- ? throw error if queue doesn't exist?
|
||||
updateRcvQueueStatus :: DB.Connection -> ReceiveQueue -> QueueStatus -> IO ()
|
||||
updateRcvQueueStatus dbConn ReceiveQueue {rcvId, server = SMPServer {host, port}} status =
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
updateRcvQueueStatusQuery_
|
||||
[":status" := status, ":host" := host, ":port" := serializePort_ port, ":rcv_id" := rcvId]
|
||||
|
||||
updateSndConnectionWithRcvQueue :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueRowId -> m ()
|
||||
updateSndConnectionWithRcvQueue store connAlias rcvQueueId =
|
||||
executeWithLock
|
||||
store
|
||||
connectionsLock
|
||||
[sql|
|
||||
UPDATE connections
|
||||
SET receive_queue_id = ?
|
||||
WHERE conn_alias = ?;
|
||||
|]
|
||||
(Only rcvQueueId :. Only connAlias)
|
||||
updateRcvQueueStatusQuery_ :: Query
|
||||
updateRcvQueueStatusQuery_ =
|
||||
[sql|
|
||||
UPDATE rcv_queues
|
||||
SET status = :status
|
||||
WHERE host = :host AND port = :port AND rcv_id = :rcv_id;
|
||||
|]
|
||||
|
||||
getConnection :: (MonadError StoreError m, MonadUnliftIO m) => SQLiteStore -> ConnAlias -> m (Maybe QueueRowId, Maybe QueueRowId)
|
||||
getConnection SQLiteStore {conn} connAlias = do
|
||||
r <-
|
||||
liftIO $
|
||||
DB.queryNamed
|
||||
conn
|
||||
"SELECT receive_queue_id, send_queue_id FROM connections WHERE conn_alias = :conn_alias"
|
||||
[":conn_alias" := connAlias]
|
||||
case r of
|
||||
[queueIds] -> return queueIds
|
||||
_ -> throwError SEInternal
|
||||
-- ? throw error if queue doesn't exist?
|
||||
updateSndQueueStatus :: DB.Connection -> SendQueue -> QueueStatus -> IO ()
|
||||
updateSndQueueStatus dbConn SendQueue {sndId, server = SMPServer {host, port}} status =
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
updateSndQueueStatusQuery_
|
||||
[":status" := status, ":host" := host, ":port" := serializePort_ port, ":snd_id" := sndId]
|
||||
|
||||
instance FromRow ConnAlias where
|
||||
fromRow = field
|
||||
updateSndQueueStatusQuery_ :: Query
|
||||
updateSndQueueStatusQuery_ =
|
||||
[sql|
|
||||
UPDATE snd_queues
|
||||
SET status = :status
|
||||
WHERE host = :host AND port = :port AND snd_id = :snd_id;
|
||||
|]
|
||||
|
||||
getConnAliasByRcvQueue :: (MonadError StoreError m, MonadUnliftIO m) => SQLiteStore -> RecipientId -> m ConnAlias
|
||||
getConnAliasByRcvQueue SQLiteStore {conn} rcvId = do
|
||||
r <-
|
||||
liftIO $
|
||||
DB.queryNamed
|
||||
conn
|
||||
[sql|
|
||||
SELECT c.conn_alias
|
||||
FROM connections c
|
||||
JOIN receive_queues rq
|
||||
ON c.receive_queue_id = rq.receive_queue_id
|
||||
WHERE rq.rcv_id = :rcvId;
|
||||
|]
|
||||
[":rcvId" := rcvId]
|
||||
case r of
|
||||
[connAlias] -> return connAlias
|
||||
_ -> throwError SEInternal
|
||||
-- ? rewrite with ExceptT?
|
||||
insertRcvMsg :: DB.Connection -> ConnAlias -> AgentMsgId -> AMessage -> IO (Either StoreError ())
|
||||
insertRcvMsg dbConn connAlias agentMsgId aMsg =
|
||||
DB.withTransaction dbConn $ do
|
||||
queues <- retrieveConnQueues_ dbConn connAlias
|
||||
case queues of
|
||||
(Just _rcvQ, _) -> do
|
||||
insertMsg_ dbConn connAlias RCV agentMsgId aMsg
|
||||
return $ Right ()
|
||||
(Nothing, Just _sndQ) -> return $ Left SEBadQueueDirection
|
||||
_ -> return $ Left SEBadConn
|
||||
|
||||
deleteRcvQueue :: MonadUnliftIO m => SQLiteStore -> QueueRowId -> m ()
|
||||
deleteRcvQueue store rcvQueueId = do
|
||||
executeWithLock
|
||||
store
|
||||
rcvQueuesLock
|
||||
"DELETE FROM receive_queues WHERE receive_queue_id = ?"
|
||||
(Only rcvQueueId)
|
||||
|
||||
deleteSndQueue :: MonadUnliftIO m => SQLiteStore -> QueueRowId -> m ()
|
||||
deleteSndQueue store sndQueueId = do
|
||||
executeWithLock
|
||||
store
|
||||
sndQueuesLock
|
||||
"DELETE FROM send_queues WHERE send_queue_id = ?"
|
||||
(Only sndQueueId)
|
||||
|
||||
deleteConnection :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> m ()
|
||||
deleteConnection store connAlias = do
|
||||
executeWithLock
|
||||
store
|
||||
connectionsLock
|
||||
"DELETE FROM connections WHERE conn_alias = ?"
|
||||
(Only connAlias)
|
||||
|
||||
updateReceiveQueueStatus :: MonadUnliftIO m => SQLiteStore -> RecipientId -> HostName -> Maybe ServiceName -> QueueStatus -> m ()
|
||||
updateReceiveQueueStatus store rcvQueueId host port status =
|
||||
executeWithLock
|
||||
store
|
||||
rcvQueuesLock
|
||||
[sql|
|
||||
UPDATE receive_queues
|
||||
SET status = ?
|
||||
WHERE rcv_id = ?
|
||||
AND server_id IN (
|
||||
SELECT server_id
|
||||
FROM servers
|
||||
WHERE host = ? AND port = ?
|
||||
);
|
||||
|]
|
||||
(Only status :. Only rcvQueueId :. Only host :. Only port)
|
||||
|
||||
updateSendQueueStatus :: MonadUnliftIO m => SQLiteStore -> SMP.SenderId -> HostName -> Maybe ServiceName -> QueueStatus -> m ()
|
||||
updateSendQueueStatus store sndQueueId host port status =
|
||||
executeWithLock
|
||||
store
|
||||
sndQueuesLock
|
||||
[sql|
|
||||
UPDATE send_queues
|
||||
SET status = ?
|
||||
WHERE snd_id = ?
|
||||
AND server_id IN (
|
||||
SELECT server_id
|
||||
FROM servers
|
||||
WHERE host = ? AND port = ?
|
||||
);
|
||||
|]
|
||||
(Only status :. Only sndQueueId :. Only host :. Only port)
|
||||
|
||||
instance ToField QueueDirection where toField = toField . show
|
||||
-- ? rewrite with ExceptT?
|
||||
insertSndMsg :: DB.Connection -> ConnAlias -> AgentMsgId -> AMessage -> IO (Either StoreError ())
|
||||
insertSndMsg dbConn connAlias agentMsgId aMsg =
|
||||
DB.withTransaction dbConn $ do
|
||||
queues <- retrieveConnQueues_ dbConn connAlias
|
||||
case queues of
|
||||
(_, Just _sndQ) -> do
|
||||
insertMsg_ dbConn connAlias SND agentMsgId aMsg
|
||||
return $ Right ()
|
||||
(Just _rcvQ, Nothing) -> return $ Left SEBadQueueDirection
|
||||
_ -> return $ Left SEBadConn
|
||||
|
||||
-- TODO add parser and serializer for DeliveryStatus? Pass DeliveryStatus?
|
||||
insertMsg :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> Message -> m ()
|
||||
insertMsg store connAlias qDirection agentMsgId msg = do
|
||||
insertMsg_ :: DB.Connection -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> IO ()
|
||||
insertMsg_ dbConn connAlias qDirection agentMsgId aMsg = do
|
||||
let msg = serializeAgentMessage aMsg
|
||||
ts <- liftIO getCurrentTime
|
||||
void $
|
||||
insertWithLock
|
||||
store
|
||||
messagesLock
|
||||
[sql|
|
||||
INSERT INTO messages (conn_alias, agent_msg_id, timestamp, message, direction, msg_status)
|
||||
VALUES (?,?,?,?,?,"MDTransmitted");
|
||||
|]
|
||||
(Only connAlias :. Only agentMsgId :. Only ts :. Only qDirection :. Only msg)
|
||||
DB.executeNamed
|
||||
dbConn
|
||||
insertMsgQuery_
|
||||
[ ":agent_msg_id" := agentMsgId,
|
||||
":conn_alias" := connAlias,
|
||||
":timestamp" := ts,
|
||||
":message" := msg,
|
||||
":direction" := qDirection
|
||||
]
|
||||
|
||||
insertMsgQuery_ :: Query
|
||||
insertMsgQuery_ =
|
||||
[sql|
|
||||
INSERT INTO messages
|
||||
( agent_msg_id, conn_alias, timestamp, message, direction, msg_status)
|
||||
VALUES
|
||||
(:agent_msg_id,:conn_alias,:timestamp,:message,:direction,"MDTransmitted");
|
||||
|]
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.Types where
|
||||
module Simplex.Messaging.Agent.Store.Types
|
||||
( ConnType (..),
|
||||
StoreError (..),
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Exception
|
||||
import Control.Exception (Exception)
|
||||
|
||||
data ConnType = CSend | CReceive | CDuplex deriving (Eq, Show)
|
||||
|
||||
|
||||
@@ -1,20 +1,21 @@
|
||||
{-# LANGUAGE BlockArguments #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module AgentTests.SQLiteTests where
|
||||
module AgentTests.SQLiteTests (storeTests) where
|
||||
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Except (ExceptT, runExceptT)
|
||||
import Data.Word (Word32)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Util (SQLiteStore, conn, dbFilename)
|
||||
import Simplex.Messaging.Agent.Store.Types
|
||||
import Simplex.Messaging.Agent.Transmission
|
||||
import System.Random
|
||||
import System.Random (Random (randomIO))
|
||||
import Test.Hspec
|
||||
import UnliftIO.Directory
|
||||
import UnliftIO.Directory (removeFile)
|
||||
|
||||
testDB :: String
|
||||
testDB = "smp-agent.test.db"
|
||||
@@ -31,7 +32,7 @@ withStore = before createStore . after removeStore
|
||||
|
||||
removeStore :: SQLiteStore -> IO ()
|
||||
removeStore store = do
|
||||
DB.close $ conn store
|
||||
DB.close $ dbConn store
|
||||
removeFile $ dbFilename store
|
||||
|
||||
returnsResult :: (Eq a, Eq e, Show a, Show e) => ExceptT e IO a -> a -> Expectation
|
||||
@@ -40,23 +41,26 @@ action `returnsResult` r = runExceptT action `shouldReturn` Right r
|
||||
throwsError :: (Eq a, Eq e, Show a, Show e) => ExceptT e IO a -> e -> Expectation
|
||||
action `throwsError` e = runExceptT action `shouldReturn` Left e
|
||||
|
||||
-- TODO add null port tests
|
||||
storeTests :: Spec
|
||||
storeTests = withStore do
|
||||
describe "foreign keys enabled" testForeignKeysEnabled
|
||||
describe "store methods" do
|
||||
describe "createRcvConn" testCreateRcvConn
|
||||
describe "createSndConn" testCreateSndConn
|
||||
describe "addSndQueue" testAddSndQueue
|
||||
describe "addRcvQueue" testAddRcvQueue
|
||||
describe "getRcvQueue" testGetRcvQueue
|
||||
describe "deleteConn" do
|
||||
describe "Receive connection" testDeleteConnReceive
|
||||
describe "Send connection" testDeleteConnSend
|
||||
describe "Duplex connection" testDeleteConnDuplex
|
||||
describe "Update queue status" do
|
||||
describe "Receive queue" testupdateRcvQueueStatus
|
||||
describe "Send queue" testupdateSndQueueStatus
|
||||
describe "Duplex connection" testUpdateQueueStatusConnDuplex
|
||||
xdescribe "Nonexistent send queue" testUpdateNonexistentSendQueueStatus
|
||||
xdescribe "Nonexistent receive queue" testUpdateNonexistentReceiveQueueStatus
|
||||
describe "upgradeRcvConnToDuplex" testUpgradeRcvConnToDuplex
|
||||
describe "upgradeSndConnToDuplex" testUpgradeSndConnToDuplex
|
||||
describe "Set queue status" do
|
||||
describe "setRcvQueueStatus" testSetRcvQueueStatus
|
||||
describe "setSndQueueStatus" testSetSndQueueStatus
|
||||
describe "Duplex connection" testSetQueueStatusConnDuplex
|
||||
xdescribe "Nonexistent send queue" testSetNonexistentSendQueueStatus
|
||||
xdescribe "Nonexistent receive queue" testSetNonexistentReceiveQueueStatus
|
||||
describe "createMsg" do
|
||||
describe "A_MSG in RCV direction" testCreateMsgRcv
|
||||
describe "A_MSG in SND direction" testCreateMsgSnd
|
||||
@@ -64,7 +68,19 @@ storeTests = withStore do
|
||||
describe "REPLY message" testCreateMsgReply
|
||||
describe "Bad queue direction - SND" testCreateMsgBadDirectionSnd
|
||||
describe "Bad queue direction - RCV" testCreateMsgBadDirectionRcv
|
||||
describe "getReceiveQueue" testGetReceiveQueue
|
||||
|
||||
testForeignKeysEnabled :: SpecWith SQLiteStore
|
||||
testForeignKeysEnabled = do
|
||||
it "should throw error if foreign keys are enabled" $ \store -> do
|
||||
let inconsistent_query =
|
||||
[sql|
|
||||
INSERT INTO connections
|
||||
(conn_alias, rcv_host, rcv_port, rcv_id, snd_host, snd_port, snd_id)
|
||||
VALUES
|
||||
("conn1", "smp.simplex.im", "5223", "1234", "smp.simplex.im", "5223", "2345");
|
||||
|]
|
||||
DB.execute_ (dbConn store) inconsistent_query
|
||||
`shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint)
|
||||
|
||||
testCreateRcvConn :: SpecWith SQLiteStore
|
||||
testCreateRcvConn = do
|
||||
@@ -73,15 +89,15 @@ testCreateRcvConn = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue)
|
||||
@@ -89,13 +105,13 @@ testCreateRcvConn = do
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "3456",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
addSndQueue store "conn1" sndQueue
|
||||
upgradeRcvConnToDuplex store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue sndQueue)
|
||||
@@ -107,13 +123,13 @@ testCreateSndConn = do
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
createSndConn store sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue)
|
||||
@@ -121,108 +137,40 @@ testCreateSndConn = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "2345",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "3456",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
addRcvQueue store "conn1" rcvQueue
|
||||
upgradeSndConnToDuplex store "conn1" rcvQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue sndQueue)
|
||||
|
||||
testAddSndQueue :: SpecWith SQLiteStore
|
||||
testAddSndQueue = do
|
||||
it "should throw error on attempts to add send queue to SendConnection or DuplexConnection" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
let anotherSndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "2345",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
addSndQueue store "conn1" anotherSndQueue
|
||||
`throwsError` SEBadConnType CSend
|
||||
testGetRcvQueue :: SpecWith SQLiteStore
|
||||
testGetRcvQueue = do
|
||||
it "should get receive queue and conn alias" $ \store -> do
|
||||
let smpServer = SMPServer "smp.simplex.im" (Just "5223") (Just "1234")
|
||||
let recipientId = "1234"
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "3456",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "4567",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
addRcvQueue store "conn1" rcvQueue
|
||||
`returnsResult` ()
|
||||
addSndQueue store "conn1" anotherSndQueue
|
||||
`throwsError` SEBadConnType CDuplex
|
||||
|
||||
testAddRcvQueue :: SpecWith SQLiteStore
|
||||
testAddRcvQueue = do
|
||||
it "should throw error on attempts to add receive queue to ReceiveConnection or DuplexConnection" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
{ server = smpServer,
|
||||
rcvId = recipientId,
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let anotherRcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "3456",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "4567",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
addRcvQueue store "conn1" anotherRcvQueue
|
||||
`throwsError` SEBadConnType CReceive
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "5678",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
addSndQueue store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
addRcvQueue store "conn1" anotherRcvQueue
|
||||
`throwsError` SEBadConnType CDuplex
|
||||
getRcvQueue store smpServer recipientId
|
||||
`returnsResult` rcvQueue
|
||||
|
||||
testDeleteConnReceive :: SpecWith SQLiteStore
|
||||
testDeleteConnReceive = do
|
||||
@@ -231,22 +179,23 @@ testDeleteConnReceive = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "2345",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "3456",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue)
|
||||
deleteConn store "conn1"
|
||||
`returnsResult` ()
|
||||
-- TODO check queues are deleted as well
|
||||
getConn store "conn1"
|
||||
`throwsError` SEInternal
|
||||
`throwsError` SEBadConn
|
||||
|
||||
testDeleteConnSend :: SpecWith SQLiteStore
|
||||
testDeleteConnSend = do
|
||||
@@ -255,20 +204,21 @@ testDeleteConnSend = do
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "2345",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
createSndConn store sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue)
|
||||
deleteConn store "conn1"
|
||||
`returnsResult` ()
|
||||
-- TODO check queues are deleted as well
|
||||
getConn store "conn1"
|
||||
`throwsError` SEInternal
|
||||
`throwsError` SEBadConn
|
||||
|
||||
testDeleteConnDuplex :: SpecWith SQLiteStore
|
||||
testDeleteConnDuplex = do
|
||||
@@ -277,153 +227,247 @@ testDeleteConnDuplex = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "4567",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
addSndQueue store "conn1" sndQueue
|
||||
upgradeRcvConnToDuplex store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue sndQueue)
|
||||
deleteConn store "conn1"
|
||||
`returnsResult` ()
|
||||
-- TODO check queues are deleted as well
|
||||
getConn store "conn1"
|
||||
`throwsError` SEInternal
|
||||
`throwsError` SEBadConn
|
||||
|
||||
testupdateRcvQueueStatus :: SpecWith SQLiteStore
|
||||
testupdateRcvQueueStatus = do
|
||||
testUpgradeRcvConnToDuplex :: SpecWith SQLiteStore
|
||||
testUpgradeRcvConnToDuplex = do
|
||||
it "should throw error on attempts to add send queue to SendConnection or DuplexConnection" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New
|
||||
}
|
||||
createSndConn store sndQueue
|
||||
`returnsResult` ()
|
||||
let anotherSndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "2345",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New
|
||||
}
|
||||
upgradeRcvConnToDuplex store "conn1" anotherSndQueue
|
||||
`throwsError` SEBadConnType CSend
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "3456",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "4567",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New
|
||||
}
|
||||
upgradeSndConnToDuplex store "conn1" rcvQueue
|
||||
`returnsResult` ()
|
||||
upgradeRcvConnToDuplex store "conn1" anotherSndQueue
|
||||
`throwsError` SEBadConnType CDuplex
|
||||
|
||||
testUpgradeSndConnToDuplex :: SpecWith SQLiteStore
|
||||
testUpgradeSndConnToDuplex = do
|
||||
it "should throw error on attempts to add receive queue to ReceiveConnection or DuplexConnection" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New
|
||||
}
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let anotherRcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "3456",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "4567",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New
|
||||
}
|
||||
upgradeSndConnToDuplex store "conn1" anotherRcvQueue
|
||||
`throwsError` SEBadConnType CReceive
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "5678",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New
|
||||
}
|
||||
upgradeRcvConnToDuplex store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
upgradeSndConnToDuplex store "conn1" anotherRcvQueue
|
||||
`throwsError` SEBadConnType CDuplex
|
||||
|
||||
testSetRcvQueueStatus :: SpecWith SQLiteStore
|
||||
testSetRcvQueueStatus = do
|
||||
it "should update status of receive queue" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue)
|
||||
updateRcvQueueStatus store rcvQueue Confirmed
|
||||
setRcvQueueStatus store rcvQueue Confirmed
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue {status = Confirmed})
|
||||
|
||||
testupdateSndQueueStatus :: SpecWith SQLiteStore
|
||||
testupdateSndQueueStatus = do
|
||||
testSetSndQueueStatus :: SpecWith SQLiteStore
|
||||
testSetSndQueueStatus = do
|
||||
it "should update status of send queue" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
createSndConn store sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue)
|
||||
updateSndQueueStatus store sndQueue Confirmed
|
||||
setSndQueueStatus store sndQueue Confirmed
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue {status = Confirmed})
|
||||
|
||||
testUpdateQueueStatusConnDuplex :: SpecWith SQLiteStore
|
||||
testUpdateQueueStatusConnDuplex = do
|
||||
testSetQueueStatusConnDuplex :: SpecWith SQLiteStore
|
||||
testSetQueueStatusConnDuplex = do
|
||||
it "should update statuses of receive and send queues in duplex connection" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "3456",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
addSndQueue store "conn1" sndQueue
|
||||
upgradeRcvConnToDuplex store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue sndQueue)
|
||||
updateRcvQueueStatus store rcvQueue Secured
|
||||
setRcvQueueStatus store rcvQueue Secured
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue {status = Secured} sndQueue)
|
||||
updateSndQueueStatus store sndQueue Confirmed
|
||||
setSndQueueStatus store sndQueue Confirmed
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue {status = Secured} sndQueue {status = Confirmed})
|
||||
`returnsResult` SomeConn
|
||||
SCDuplex
|
||||
( DuplexConnection "conn1" rcvQueue {status = Secured} sndQueue {status = Confirmed}
|
||||
)
|
||||
|
||||
testUpdateNonexistentSendQueueStatus :: SpecWith SQLiteStore
|
||||
testUpdateNonexistentSendQueueStatus = do
|
||||
testSetNonexistentSendQueueStatus :: SpecWith SQLiteStore
|
||||
testSetNonexistentSendQueueStatus = do
|
||||
it "should throw error on attempt to update status of nonexistent send queue" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
updateSndQueueStatus store sndQueue Confirmed
|
||||
setSndQueueStatus store sndQueue Confirmed
|
||||
`throwsError` SEInternal
|
||||
|
||||
testUpdateNonexistentReceiveQueueStatus :: SpecWith SQLiteStore
|
||||
testUpdateNonexistentReceiveQueueStatus = do
|
||||
testSetNonexistentReceiveQueueStatus :: SpecWith SQLiteStore
|
||||
testSetNonexistentReceiveQueueStatus = do
|
||||
it "should throw error on attempt to update status of nonexistent receive queue" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
updateRcvQueueStatus store rcvQueue Confirmed
|
||||
setRcvQueueStatus store rcvQueue Confirmed
|
||||
`throwsError` SEInternal
|
||||
|
||||
testCreateMsgRcv :: SpecWith SQLiteStore
|
||||
@@ -433,15 +477,15 @@ testCreateMsgRcv = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let msg = A_MSG "hello"
|
||||
let msgId = 1
|
||||
@@ -456,13 +500,13 @@ testCreateMsgSnd = do
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
createSndConn store sndQueue
|
||||
`returnsResult` ()
|
||||
let msg = A_MSG "hi"
|
||||
let msgId = 1
|
||||
@@ -477,15 +521,15 @@ testCreateMsgHello = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let verificationKey = "abcd"
|
||||
let am = AckMode On
|
||||
@@ -502,15 +546,15 @@ testCreateMsgReply = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let smpServer = SMPServer "smp.simplex.im" (Just "5223") (Just "1234")
|
||||
let senderId = "sender1"
|
||||
@@ -528,15 +572,15 @@ testCreateMsgBadDirectionSnd = do
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
rcvId = "1234",
|
||||
connAlias = "conn1",
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
createRcvConn store rcvQueue
|
||||
`returnsResult` ()
|
||||
let msg = A_MSG "hello"
|
||||
let msgId = 1
|
||||
@@ -550,38 +594,15 @@ testCreateMsgBadDirectionRcv = do
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
connAlias = "conn1",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
status = New
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
createSndConn store sndQueue
|
||||
`returnsResult` ()
|
||||
let msg = A_MSG "hello"
|
||||
let msgId = 1
|
||||
createMsg store "conn1" RCV msgId msg
|
||||
`throwsError` SEBadQueueDirection
|
||||
|
||||
testGetReceiveQueue :: SpecWith SQLiteStore
|
||||
testGetReceiveQueue = do
|
||||
it "should get receive queue and conn alias" $ \store -> do
|
||||
let smpServer = SMPServer "smp.simplex.im" (Just "5223") (Just "1234")
|
||||
let cAlias = "conn1"
|
||||
let recipientId = "1234"
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = smpServer,
|
||||
rcvId = recipientId,
|
||||
rcvPrivateKey = "abcd",
|
||||
sndId = Just "2345",
|
||||
sndKey = Nothing,
|
||||
decryptKey = "dcba",
|
||||
verifyKey = Nothing,
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
createRcvConn store cAlias rcvQueue
|
||||
`returnsResult` ()
|
||||
getReceiveQueue store smpServer recipientId
|
||||
`returnsResult` (cAlias, rcvQueue)
|
||||
|
||||
Reference in New Issue
Block a user