diff --git a/package.yaml b/package.yaml index 18a3d2a8f..d55fba282 100644 --- a/package.yaml +++ b/package.yaml @@ -64,6 +64,7 @@ dependencies: - time-manager == 0.0.* - tls >= 1.6.0 && < 1.7 - transformers == 0.5.* + - unix == 2.7.* - unliftio == 0.2.* - unliftio-core == 0.2.* - websockets == 0.12.* diff --git a/simplexmq.cabal b/simplexmq.cabal index dcf76cd0d..1733b4718 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -132,6 +132,7 @@ library , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* + , unix ==2.7.* , unliftio ==0.2.* , unliftio-core ==0.2.* , websockets ==0.12.* @@ -193,6 +194,7 @@ executable ntf-server , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* + , unix ==2.7.* , unliftio ==0.2.* , unliftio-core ==0.2.* , websockets ==0.12.* @@ -254,6 +256,7 @@ executable smp-agent , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* + , unix ==2.7.* , unliftio ==0.2.* , unliftio-core ==0.2.* , websockets ==0.12.* @@ -315,6 +318,7 @@ executable smp-server , time-manager ==0.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* + , unix ==2.7.* , unliftio ==0.2.* , unliftio-core ==0.2.* , websockets ==0.12.* @@ -396,6 +400,7 @@ test-suite smp-server-test , timeit ==2.0.* , tls >=1.6.0 && <1.7 , transformers ==0.5.* + , unix ==2.7.* , unliftio ==0.2.* , unliftio-core ==0.2.* , websockets ==0.12.* diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index ceae7fc56..78cd177f0 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -89,6 +89,7 @@ import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (bshow, catchAll_, ifM, liftEitherError, liftError, tryError, unlessM, whenM) import Simplex.Messaging.Version +import System.Posix.Semaphore (semPost, semThreadWait) import System.Timeout (timeout) import UnliftIO (async, pooledForConcurrentlyN) import qualified UnliftIO.Exception as E @@ -646,13 +647,14 @@ notifyAgentPhaseChanged AgentClient {subQ, agentEnv = Env {agentPhase, agentOper withStore :: AgentMonad m => AgentClient -> (forall m'. AgentStoreMonad m' => SQLiteStore -> m' a) -> m a withStore c action = do - st <- asks store - atomically $ beginAgentOperation c AODatabase - r <- runExceptT (action st `E.catch` handleInternal) - atomically $ endAgentOperation c AODatabase - case r of - Right res -> pure res - Left e -> throwError $ storeError e + st@SQLiteStore {dbSemaphore} <- asks store + E.bracket_ (liftIO $ mapM_ semThreadWait dbSemaphore) (liftIO $ mapM_ semPost dbSemaphore) $ do + atomically $ beginAgentOperation c AODatabase + r <- runExceptT (action st `E.catch` handleInternal) + atomically $ endAgentOperation c AODatabase + case r of + Right res -> pure res + Left e -> throwError $ storeError e where -- TODO when parsing exception happens in store, the agent hangs; -- changing SQLError to SomeException does not help diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index ffd5af8b7..5695a0e8c 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -63,6 +63,7 @@ data AgentConfig = AgentConfig connIdBytes :: Int, tbqSize :: Natural, dbFile :: FilePath, + dbSemName :: Maybe String, yesToMigrations :: Bool, smpCfg :: ProtocolClientConfig, ntfCfg :: ProtocolClientConfig, @@ -96,6 +97,7 @@ defaultAgentConfig = connIdBytes = 12, tbqSize = 64, dbFile = "smp-agent.db", + dbSemName = Nothing, yesToMigrations = False, smpCfg = defaultClientConfig {defaultTransport = ("5223", transport @TLS)}, ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)}, @@ -134,9 +136,9 @@ disallowedOperations = \case APSuspended -> [AONetwork, AODatabase] newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env -newSMPAgentEnv config@AgentConfig {dbFile, yesToMigrations} = do +newSMPAgentEnv config@AgentConfig {dbFile, dbSemName, yesToMigrations} = do idsDrg <- newTVarIO =<< drgNew - store <- liftIO $ createSQLiteStore dbFile Migrations.app yesToMigrations + store <- liftIO $ createSQLiteStore dbFile dbSemName Migrations.app yesToMigrations clientCounter <- newTVarIO 0 randomServer <- newTVarIO =<< liftIO newStdGen agentPhase <- newTVarIO (APActive, True) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 99e1f955e..a2f254bb4 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -71,6 +71,8 @@ import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) import System.Exit (exitFailure) import System.FilePath (takeDirectory) import System.IO (hFlush, stdout) +import System.Posix.Files (stdFileMode) +import System.Posix.Semaphore (OpenSemFlags (..), Semaphore, semOpen) import qualified UnliftIO.Exception as E -- * SQLite Store implementation @@ -78,14 +80,15 @@ import qualified UnliftIO.Exception as E data SQLiteStore = SQLiteStore { dbFilePath :: FilePath, dbConnection :: TMVar DB.Connection, + dbSemaphore :: Maybe Semaphore, dbNew :: Bool } -createSQLiteStore :: FilePath -> [Migration] -> Bool -> IO SQLiteStore -createSQLiteStore dbFilePath migrations yesToMigrations = do +createSQLiteStore :: FilePath -> Maybe String -> [Migration] -> Bool -> IO SQLiteStore +createSQLiteStore dbFilePath dbSemName_ migrations yesToMigrations = do let dbDir = takeDirectory dbFilePath createDirectoryIfMissing False dbDir - st <- connectSQLiteStore dbFilePath + st <- connectSQLiteStore dbFilePath dbSemName_ checkThreadsafe st migrateSchema st migrations yesToMigrations pure st @@ -121,11 +124,13 @@ confirmOrExit s = do ok <- getLine when (map toLower ok /= "y") exitFailure -connectSQLiteStore :: FilePath -> IO SQLiteStore -connectSQLiteStore dbFilePath = do +connectSQLiteStore :: FilePath -> Maybe String -> IO SQLiteStore +connectSQLiteStore dbFilePath dbSemName_ = do dbNew <- not <$> doesFileExist dbFilePath dbConnection <- newTMVarIO =<< connectDB dbFilePath - pure SQLiteStore {dbFilePath, dbConnection, dbNew} + -- let filemode = unionFileModes ownerReadMode ownerWriteMode + dbSemaphore <- forM dbSemName_ $ \name -> semOpen name OpenSemFlags {semCreate = True, semExclusive = False} stdFileMode 1 + pure SQLiteStore {dbFilePath, dbConnection, dbSemaphore, dbNew} connectDB :: FilePath -> IO DB.Connection connectDB path = do diff --git a/stack.yaml b/stack.yaml index f6a60ed74..35e4c30c8 100644 --- a/stack.yaml +++ b/stack.yaml @@ -38,7 +38,8 @@ extra-deps: - cryptostore-0.2.1.0@sha256:9896e2984f36a1c8790f057fd5ce3da4cbcaf8aa73eb2d9277916886978c5b19,3881 - network-3.1.2.7@sha256:e3d78b13db9512aeb106e44a334ab42b7aa48d26c097299084084cb8be5c5568,4888 - simple-logger-0.1.0@sha256:be8ede4bd251a9cac776533bae7fb643369ebd826eb948a9a18df1a8dd252ff8,1079 - - tls-1.6.0@sha256:7ae39373fd2de27fb80e90f76d22aeeb9a074a0ddd120cbd02c9c52f516a9e55,6987 # below dependancies are to update Aeson to 2.0.3 + - tls-1.6.0@sha256:7ae39373fd2de27fb80e90f76d22aeeb9a074a0ddd120cbd02c9c52f516a9e55,6987 + # below dependancies are to update Aeson to 2.0.3 - OneTuple-0.3.1@sha256:a848c096c9d29e82ffdd30a9998aa2931cbccb3a1bc137539d80f6174d31603e,2262 - attoparsec-0.14.4@sha256:79584bdada8b730cb5138fca8c35c76fbef75fc1d1e01e6b1d815a5ee9843191,5810 - hashable-1.4.0.2@sha256:0cddd0229d1aac305ea0404409c0bbfab81f075817bd74b8b2929eff58333e55,5005 diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index c921e91c2..fd357953d 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -29,6 +29,8 @@ import Simplex.Messaging.Protocol (ErrorType (..), MsgBody) import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..)) import Simplex.Messaging.Util (bshow) import System.Directory (removeFile) +import System.Posix.Files (stdFileMode) +import System.Posix.Semaphore (OpenSemFlags (..), semOpen, semPost, semThreadWait) import System.Timeout import Test.Hspec @@ -42,6 +44,7 @@ agentTests (ATransport t) = do describe "SQLite schema dump" schemaDumpTest describe "SMP agent protocol syntax" $ syntaxTests t describe "Establishing duplex connection" $ do + it "test semaphore" testSemaphore it "should connect via one server and one agent" $ smpAgentTest2_1_1 $ testDuplexConnection t it "should connect via one server and one agent (random IDs)" $ @@ -121,6 +124,12 @@ h #:# err = tryGet `shouldReturn` () pattern Msg :: MsgBody -> ACommand 'Agent pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody +testSemaphore :: IO () +testSemaphore = do + sem <- semOpen "/test" OpenSemFlags {semCreate = True, semExclusive = False} stdFileMode 1 + semThreadWait sem + semPost sem + testDuplexConnection :: Transport c => TProxy c -> c -> c -> IO () testDuplexConnection _ alice bob = do ("1", "bob", Right (INV cReq)) <- alice #: ("1", "bob", "NEW INV") diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 47a818503..e5e2b1f83 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -44,7 +44,7 @@ withStore2 = before connect2 . after (removeStore . fst) connect2 :: IO (SQLiteStore, SQLiteStore) connect2 = do s1 <- createStore - s2 <- connectSQLiteStore (dbFilePath s1) + s2 <- connectSQLiteStore (dbFilePath s1) Nothing pure (s1, s2) createStore :: IO SQLiteStore @@ -52,7 +52,7 @@ createStore = do -- Randomize DB file name to avoid SQLite IO errors supposedly caused by asynchronous -- IO operations on multiple similarly named files; error seems to be environment specific r <- randomIO :: IO Word32 - createSQLiteStore (testDB <> show r) Migrations.app True + createSQLiteStore (testDB <> show r) Nothing Migrations.app True removeStore :: SQLiteStore -> IO () removeStore store = do diff --git a/tests/AgentTests/SchemaDump.hs b/tests/AgentTests/SchemaDump.hs index 03baa28b2..ae24b6abb 100644 --- a/tests/AgentTests/SchemaDump.hs +++ b/tests/AgentTests/SchemaDump.hs @@ -20,7 +20,7 @@ schemaDumpTest = testVerifySchemaDump :: IO () testVerifySchemaDump = do - void $ createSQLiteStore testDB Migrations.app False + void $ createSQLiteStore testDB Nothing Migrations.app False void $ readCreateProcess (shell $ "touch " <> schema) "" savedSchema <- readFile schema savedSchema `seq` pure () diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index fd3d5b980..28954e5a2 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -171,6 +171,7 @@ agentCfg = { tcpPort = agentTestPort, tbqSize = 1, dbFile = testDB, + dbSemName = Nothing, -- Just "/simplex-agent", smpCfg = defaultClientConfig { qSize = 1,