mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-18 02:35:16 +00:00
wip
This commit is contained in:
@@ -178,12 +178,16 @@ withStore action = do
|
||||
st <- asks store
|
||||
runExceptT (action st `E.catch` handleInternal) >>= \case
|
||||
Right c -> return c
|
||||
Left e -> throwError $ storeError e
|
||||
Left e -> do
|
||||
liftIO $ print e
|
||||
throwError $ storeError e
|
||||
where
|
||||
-- TODO when parsing exception happens in store, the agent hangs;
|
||||
-- changing SQLError to SomeException does not help
|
||||
handleInternal :: (MonadError StoreError m') => SQLError -> m' a
|
||||
handleInternal e = throwError . SEInternal $ bshow e
|
||||
handleInternal :: (MonadUnliftIO m', MonadError StoreError m') => SQLError -> m' a
|
||||
handleInternal e = do
|
||||
liftIO $ print e
|
||||
throwError . SEInternal $ bshow e
|
||||
storeError :: StoreError -> AgentErrorType
|
||||
storeError = \case
|
||||
SEConnNotFound -> CONN NOT_FOUND
|
||||
@@ -238,8 +242,11 @@ joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2
|
||||
g <- asks idsDrg
|
||||
let cData = ConnData {connId}
|
||||
connId' <- withStore $ \st -> do
|
||||
liftIO $ print "before: createSndConn st g cData sq"
|
||||
connId' <- createSndConn st g cData sq
|
||||
liftIO $ print "before: createRatchet st connId' rc"
|
||||
createRatchet st connId' rc
|
||||
liftIO $ print "after: createRatchet st connId' rc"
|
||||
pure connId'
|
||||
confirmQueue c connId' sq smpConf $ Just e2eSndParams
|
||||
void $ enqueueMessage c connId' sq HELLO
|
||||
@@ -621,9 +628,12 @@ processSMPTransmission c@AgentClient {subQ} (srv, rId, cmd) = do
|
||||
Nothing -> notify . ERR $ AGENT A_VERSION
|
||||
Just qInfo' -> do
|
||||
(sq, smpConf) <- newSndQueue qInfo' ownConnInfo
|
||||
liftIO $ print "before: upgradeRcvConnToDuplex st connId sq"
|
||||
withStore $ \st -> upgradeRcvConnToDuplex st connId sq
|
||||
confirmQueue c connId sq smpConf Nothing
|
||||
liftIO $ print "before: `removeConfirmations` connId"
|
||||
withStore (`removeConfirmations` connId)
|
||||
liftIO $ print "after: `removeConfirmations` connId"
|
||||
void $ enqueueMessage c connId sq HELLO
|
||||
_ -> prohibited
|
||||
|
||||
|
||||
@@ -73,6 +73,8 @@ import System.Exit (exitFailure)
|
||||
import System.FilePath (takeDirectory)
|
||||
import System.IO (hFlush, stdout)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import Simplex.Messaging.Crypto (KeyHash)
|
||||
|
||||
-- * Postgres Store implementation
|
||||
|
||||
@@ -169,6 +171,7 @@ createConn_ st gVar cData create = do
|
||||
case cData of
|
||||
ConnData {connId = ""} -> createWithRandomId gVar $ create db
|
||||
ConnData {connId} -> create db connId $> Right connId
|
||||
liftIO $ print "before: getConn_ db connId"
|
||||
conn <- liftIO $ withTransaction st $ \db -> getConn_ db connId
|
||||
liftIO $ print conn
|
||||
pure connId
|
||||
@@ -184,13 +187,9 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore PostgresS
|
||||
createSndConn :: PostgresStore -> TVar ChaChaDRG -> ConnData -> SndQueue -> m ConnId
|
||||
createSndConn st gVar cData q@SndQueue {server} =
|
||||
createConn_ st gVar cData $ \db connId -> do
|
||||
print "upsertServer_ db server"
|
||||
upsertServer_ db server
|
||||
print "execute db \"INSERT INTO connections (conn_id, conn_mode) VALUES (?, ?)\" (connId, SCMInvitation)"
|
||||
execute db "INSERT INTO connections (conn_id, conn_mode) VALUES (?, ?)" (connId, SCMInvitation)
|
||||
print "insertSndQueue_ db connId q"
|
||||
insertSndQueue_ db connId q -- ! fails here - comment this line to see data written to db
|
||||
print "after insertSndQueue_ db connId q"
|
||||
insertSndQueue_ db connId q
|
||||
|
||||
getConn :: PostgresStore -> ConnId -> m SomeConn
|
||||
getConn st connId =
|
||||
@@ -694,7 +693,7 @@ insertSndQueue_ dbConn connId SndQueue {..} = do
|
||||
VALUES
|
||||
(?,?,?,?,?,?,?);
|
||||
|]
|
||||
(host server, port server, sndId, connId, sndPrivateKey, e2eDhSecret, status)
|
||||
(host server, port server, DB.Binary sndId, connId, sndPrivateKey, e2eDhSecret, status)
|
||||
|
||||
-- * getConn helpers
|
||||
|
||||
@@ -703,8 +702,13 @@ getConn_ dbConn connId =
|
||||
getConnData_ dbConn connId >>= \case
|
||||
Nothing -> pure $ Left SEConnNotFound
|
||||
Just (connData, cMode) -> do
|
||||
liftIO $ print "before: getRcvQueueByConnId_ dbConn connId"
|
||||
rQ <- getRcvQueueByConnId_ dbConn connId
|
||||
liftIO $ print $ "rQ: " <> show rQ
|
||||
liftIO $ print "before: getSndQueueByConnId_ dbConn connId"
|
||||
sQ <- getSndQueueByConnId_ dbConn connId
|
||||
liftIO $ print $ "sQ: " <> show sQ
|
||||
liftIO $ print "after: getSndQueueByConnId_ dbConn connId"
|
||||
pure $ case (rQ, sQ, cMode) of
|
||||
(Just rcvQ, Just sndQ, CMInvitation) -> Right $ SomeConn SCDuplex (DuplexConnection connData rcvQ sndQ)
|
||||
(Just rcvQ, Nothing, CMInvitation) -> Right $ SomeConn SCRcv (RcvConnection connData rcvQ)
|
||||
@@ -740,9 +744,34 @@ getRcvQueueByConnId_ dbConn connId =
|
||||
rcvQueue _ = Nothing
|
||||
|
||||
getSndQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe SndQueue)
|
||||
getSndQueueByConnId_ dbConn connId =
|
||||
sndQueue
|
||||
<$> DB.query
|
||||
getSndQueueByConnId_ dbConn connId = do
|
||||
-- sndQueue
|
||||
-- <$> DB.query
|
||||
-- dbConn
|
||||
-- -- [sql|
|
||||
-- -- SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_private_key, q.e2e_dh_secret, q.status
|
||||
-- -- FROM snd_queues q
|
||||
-- -- INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
-- -- WHERE q.conn_id = ?;
|
||||
-- -- |]
|
||||
-- [sql|
|
||||
-- SELECT s.key_hash, q.host, q.port, q.snd_private_key, q.status
|
||||
-- FROM snd_queues q
|
||||
-- INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
-- WHERE q.conn_id = ?;
|
||||
-- |]
|
||||
-- (Only connId)
|
||||
print "inside: getSndQueueByConnId_"
|
||||
-- r1 <- (DB.query
|
||||
-- dbConn
|
||||
-- [sql|
|
||||
-- SELECT host, port, key_hash
|
||||
-- FROM servers
|
||||
-- WHERE host = ?
|
||||
-- |]
|
||||
-- (DB.Only ("localhost" :: HostName))) :: (IO [(HostName, ServiceName, KeyHash)])
|
||||
-- putStrLn $ show r1
|
||||
r <- DB.query
|
||||
dbConn
|
||||
[sql|
|
||||
SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_private_key, q.e2e_dh_secret, q.status
|
||||
@@ -750,12 +779,26 @@ getSndQueueByConnId_ dbConn connId =
|
||||
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
WHERE q.conn_id = ?;
|
||||
|]
|
||||
(Only connId)
|
||||
-- [sql|
|
||||
-- SELECT q.host, q.port, q.status
|
||||
-- FROM snd_queues q
|
||||
-- INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
-- WHERE q.conn_id = ?;
|
||||
-- |]
|
||||
(DB.Only connId)
|
||||
print $ "r: " <> show r
|
||||
let q = sndQueue r
|
||||
print $ "q: " <> show q
|
||||
pure q
|
||||
where
|
||||
sndQueue [(keyHash, host, port, sndId, sndPrivateKey, e2eDhSecret, status)] =
|
||||
sndQueue [(keyHash, host, port, DB.Binary sndId, sndPrivateKey, e2eDhSecret, status)] =
|
||||
let server = SMPServer host port keyHash
|
||||
in Just SndQueue {server, sndId, sndPrivateKey, e2eDhSecret, status}
|
||||
sndQueue _ = Nothing
|
||||
-- sndQueue [(host, port, status)] = do
|
||||
-- let server = SMPServer host port "abcd"
|
||||
-- in Just SndQueue {server, sndId="3456", sndPrivateKey=(C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe"), e2eDhSecret="MCowBQYDK2VuAyEAjiswwI3O_NlS8Fk3HJUW870EY2bAwmttMBsvRB9eV3o=", status}
|
||||
-- sndQueue _ = Nothing
|
||||
|
||||
-- * updateRcvIds helpers
|
||||
|
||||
|
||||
@@ -149,6 +149,7 @@ import Data.String
|
||||
import Data.Type.Equality
|
||||
import Data.Typeable (Typeable)
|
||||
import Data.X509
|
||||
import qualified Database.PostgreSQL.Simple as PDB
|
||||
import qualified Database.PostgreSQL.Simple.FromField as PF
|
||||
import qualified Database.PostgreSQL.Simple.ToField as PT
|
||||
import qualified Database.PostgreSQL.Simple.TypeInfo as PTI
|
||||
@@ -161,6 +162,7 @@ import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Parsers (blobFieldDecoder, parseAll, parseString)
|
||||
import Simplex.Messaging.Util ((<$?>))
|
||||
import qualified Database.PostgreSQL.Simple as PDB
|
||||
|
||||
-- | Cryptographic algorithms.
|
||||
data Algorithm = Ed25519 | Ed448 | X25519 | X448
|
||||
@@ -584,7 +586,7 @@ instance AlgorithmI a => PT.ToField (PrivateKey a) where toField = PT.toField .
|
||||
|
||||
instance AlgorithmI a => PT.ToField (PublicKey a) where toField = PT.toField . encodePubKey
|
||||
|
||||
instance PT.ToField (DhSecret a) where toField = PT.toField . dhBytes'
|
||||
instance PT.ToField (DhSecret a) where toField = PT.toField . PDB.Binary . dhBytes'
|
||||
|
||||
instance PF.FromField APrivateSignKey where fromField = fromByteStringField decodePrivKey
|
||||
|
||||
@@ -598,7 +600,8 @@ instance (Typeable a, AlgorithmI a) => PF.FromField (PrivateKey a) where fromFie
|
||||
|
||||
instance (Typeable a, AlgorithmI a) => PF.FromField (PublicKey a) where fromField = fromByteStringField decodePubKey
|
||||
|
||||
instance (Typeable a, AlgorithmI a) => PF.FromField (DhSecret a) where fromField = fromByteStringField strDecode
|
||||
-- instance (Typeable a, AlgorithmI a) => PF.FromField (DhSecret a) where fromField = fromByteStringField strDecode
|
||||
instance (Typeable a, AlgorithmI a) => PF.FromField (DhSecret a) where fromField x = fromByteStringField strDecode x
|
||||
|
||||
instance IsString (Maybe ASignature) where
|
||||
fromString = parseString $ decode >=> decodeSignature
|
||||
|
||||
+4
-1
@@ -13,6 +13,7 @@ import AgentTests.ConnectionRequestTests
|
||||
import AgentTests.DoubleRatchetTests (doubleRatchetTests)
|
||||
import AgentTests.FunctionalAPITests (functionalAPITests)
|
||||
import AgentTests.SQLiteTests (storeTests)
|
||||
import AgentTests.PostgresTests (postgresStoreTests)
|
||||
import Control.Concurrent
|
||||
import Control.Monad (forM_)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
@@ -36,6 +37,7 @@ agentTests (ATransport t) = do
|
||||
describe "Double ratchet tests" doubleRatchetTests
|
||||
describe "Functional API" $ functionalAPITests (ATransport t)
|
||||
describe "SQLite store" storeTests
|
||||
describe "Postgres store" postgresStoreTests
|
||||
describe "SMP agent protocol syntax" $ syntaxTests t
|
||||
describe "Establishing duplex connection" $ do
|
||||
it "should connect via one server and one agent" $
|
||||
@@ -422,9 +424,10 @@ syntaxTests t = do
|
||||
-- TODO: add tests with defined connection id
|
||||
it "with incorrect parameter" $ ("222", "", "NEW hi") >#> ("222", "", "ERR CMD SYNTAX")
|
||||
|
||||
-- focus this test to test postgres
|
||||
describe "JOIN" $ do
|
||||
describe "valid" $ do
|
||||
fit "using same server as in invitation" $
|
||||
it "using same server as in invitation" $
|
||||
( "311",
|
||||
"a",
|
||||
"JOIN https://simpex.chat/invitation#/?smp=smp%3A%2F%2F"
|
||||
|
||||
@@ -0,0 +1,136 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
module AgentTests.PostgresTests (postgresStoreTests) where
|
||||
|
||||
import Control.Concurrent.Async (concurrently_)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (replicateM_)
|
||||
import Control.Monad.Except (ExceptT, runExceptT)
|
||||
import Crypto.Random (drgNew)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Time
|
||||
import Data.Word (Word32)
|
||||
import Database.PostgreSQL.Simple (ConnectInfo (..), defaultConnectInfo)
|
||||
import qualified Database.PostgreSQL.Simple as DB
|
||||
import SMPClient (testKeyHash)
|
||||
import Simplex.Messaging.Agent.Client ()
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.Postgres
|
||||
import qualified Simplex.Messaging.Agent.Store.Postgres.Migrations as Migrations
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import System.Random
|
||||
import Test.Hspec
|
||||
import UnliftIO.Directory (removeFile)
|
||||
|
||||
withStore :: SpecWith PostgresStore -> Spec
|
||||
withStore = before createStore
|
||||
|
||||
createStore :: IO PostgresStore
|
||||
createStore = do
|
||||
let dbConnInfo = defaultConnectInfo {connectDatabase = "agent_poc_1"}
|
||||
createPostgresStore dbConnInfo 1 Migrations.app
|
||||
|
||||
returnsResult :: (Eq a, Eq e, Show a, Show e) => ExceptT e IO a -> a -> Expectation
|
||||
action `returnsResult` r = runExceptT action `shouldReturn` Right r
|
||||
|
||||
throwsError :: (Eq a, Eq e, Show a, Show e) => ExceptT e IO a -> e -> Expectation
|
||||
action `throwsError` e = runExceptT action `shouldReturn` Left e
|
||||
|
||||
-- TODO add null port tests
|
||||
postgresStoreTests :: Spec
|
||||
postgresStoreTests = do
|
||||
-- withStore2 $ do
|
||||
-- describe "stress test" testConcurrentWrites
|
||||
withStore $ do
|
||||
-- describe "store setup" $ do
|
||||
-- testCompiledThreadsafe
|
||||
-- testForeignKeysEnabled
|
||||
describe "store methods" $ do
|
||||
describe "Queue and Connection management" $ do
|
||||
-- describe "createRcvConn" $ do
|
||||
-- testCreateRcvConn
|
||||
-- testCreateRcvConnRandomId
|
||||
-- testCreateRcvConnDuplicate
|
||||
fdescribe "createSndConn" $ do
|
||||
testCreateSndConn
|
||||
|
||||
-- testCreateSndConnRandomID
|
||||
-- testCreateSndConnDuplicate
|
||||
-- describe "getRcvConn" testGetRcvConn
|
||||
-- describe "deleteConn" $ do
|
||||
-- testDeleteRcvConn
|
||||
-- testDeleteSndConn
|
||||
-- testDeleteDuplexConn
|
||||
-- describe "upgradeRcvConnToDuplex" $ do
|
||||
-- testUpgradeRcvConnToDuplex
|
||||
-- describe "upgradeSndConnToDuplex" $ do
|
||||
-- testUpgradeSndConnToDuplex
|
||||
-- describe "set Queue status" $ do
|
||||
-- describe "setRcvQueueStatus" $ do
|
||||
-- testSetRcvQueueStatus
|
||||
-- describe "setSndQueueStatus" $ do
|
||||
-- testSetSndQueueStatus
|
||||
-- testSetQueueStatusDuplex
|
||||
-- describe "Msg management" $ do
|
||||
-- describe "create Msg" $ do
|
||||
-- testCreateRcvMsg
|
||||
-- testCreateSndMsg
|
||||
-- testCreateRcvAndSndMsgs
|
||||
|
||||
cData1 :: ConnData
|
||||
cData1 = ConnData {connId = "conn1"}
|
||||
|
||||
testPrivateSignKey :: C.APrivateSignKey
|
||||
testPrivateSignKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe"
|
||||
|
||||
testPrivDhKey :: C.PrivateKeyX25519
|
||||
testPrivDhKey = "MC4CAQAwBQYDK2VuBCIEINCzbVFaCiYHoYncxNY8tSIfn0pXcIAhLBfFc0m+gOpk"
|
||||
|
||||
testDhSecret :: C.DhSecretX25519
|
||||
testDhSecret = "01234567890123456789012345678901"
|
||||
|
||||
rcvQueue1 :: RcvQueue
|
||||
rcvQueue1 =
|
||||
RcvQueue
|
||||
{ server = SMPServer "smp.simplex.im" "5223" testKeyHash,
|
||||
rcvId = "1234",
|
||||
rcvPrivateKey = testPrivateSignKey,
|
||||
rcvDhSecret = testDhSecret,
|
||||
e2ePrivKey = testPrivDhKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId = Just "2345",
|
||||
status = New
|
||||
}
|
||||
|
||||
sndQueue1 :: SndQueue
|
||||
sndQueue1 =
|
||||
SndQueue
|
||||
{ server = SMPServer "smp.simplex.im" "5223" testKeyHash,
|
||||
sndId = "3456",
|
||||
sndPrivateKey = testPrivateSignKey,
|
||||
e2eDhSecret = testDhSecret,
|
||||
status = New
|
||||
}
|
||||
|
||||
testCreateSndConn :: SpecWith PostgresStore
|
||||
testCreateSndConn =
|
||||
it "should create SndConnection and add RcvQueue" $ \store -> do
|
||||
g <- newTVarIO =<< drgNew
|
||||
createSndConn store g cData1 sndQueue1
|
||||
`returnsResult` "conn1"
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSnd (SndConnection cData1 sndQueue1)
|
||||
|
||||
-- upgradeSndConnToDuplex store "conn1" rcvQueue1
|
||||
-- `returnsResult` ()
|
||||
-- getConn store "conn1"
|
||||
-- `returnsResult` SomeConn SCDuplex (DuplexConnection cData1 rcvQueue1 sndQueue1)
|
||||
Reference in New Issue
Block a user