mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-24 10:15:21 +00:00
trying to use semaphores
This commit is contained in:
@@ -64,6 +64,7 @@ dependencies:
|
||||
- time-manager == 0.0.*
|
||||
- tls >= 1.6.0 && < 1.7
|
||||
- transformers == 0.5.*
|
||||
- unix == 2.7.*
|
||||
- unliftio == 0.2.*
|
||||
- unliftio-core == 0.2.*
|
||||
- websockets == 0.12.*
|
||||
|
||||
@@ -132,6 +132,7 @@ library
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
, unix ==2.7.*
|
||||
, unliftio ==0.2.*
|
||||
, unliftio-core ==0.2.*
|
||||
, websockets ==0.12.*
|
||||
@@ -193,6 +194,7 @@ executable ntf-server
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
, unix ==2.7.*
|
||||
, unliftio ==0.2.*
|
||||
, unliftio-core ==0.2.*
|
||||
, websockets ==0.12.*
|
||||
@@ -254,6 +256,7 @@ executable smp-agent
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
, unix ==2.7.*
|
||||
, unliftio ==0.2.*
|
||||
, unliftio-core ==0.2.*
|
||||
, websockets ==0.12.*
|
||||
@@ -315,6 +318,7 @@ executable smp-server
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
, unix ==2.7.*
|
||||
, unliftio ==0.2.*
|
||||
, unliftio-core ==0.2.*
|
||||
, websockets ==0.12.*
|
||||
@@ -396,6 +400,7 @@ test-suite smp-server-test
|
||||
, timeit ==2.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
, unix ==2.7.*
|
||||
, unliftio ==0.2.*
|
||||
, unliftio-core ==0.2.*
|
||||
, websockets ==0.12.*
|
||||
|
||||
@@ -89,6 +89,7 @@ import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (bshow, catchAll_, ifM, liftEitherError, liftError, tryError, unlessM, whenM)
|
||||
import Simplex.Messaging.Version
|
||||
import System.Posix.Semaphore (semPost, semThreadWait)
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async, pooledForConcurrentlyN)
|
||||
import qualified UnliftIO.Exception as E
|
||||
@@ -646,13 +647,14 @@ notifyAgentPhaseChanged AgentClient {subQ, agentEnv = Env {agentPhase, agentOper
|
||||
|
||||
withStore :: AgentMonad m => AgentClient -> (forall m'. AgentStoreMonad m' => SQLiteStore -> m' a) -> m a
|
||||
withStore c action = do
|
||||
st <- asks store
|
||||
atomically $ beginAgentOperation c AODatabase
|
||||
r <- runExceptT (action st `E.catch` handleInternal)
|
||||
atomically $ endAgentOperation c AODatabase
|
||||
case r of
|
||||
Right res -> pure res
|
||||
Left e -> throwError $ storeError e
|
||||
st@SQLiteStore {dbSemaphore} <- asks store
|
||||
E.bracket_ (liftIO $ mapM_ semThreadWait dbSemaphore) (liftIO $ mapM_ semPost dbSemaphore) $ do
|
||||
atomically $ beginAgentOperation c AODatabase
|
||||
r <- runExceptT (action st `E.catch` handleInternal)
|
||||
atomically $ endAgentOperation c AODatabase
|
||||
case r of
|
||||
Right res -> pure res
|
||||
Left e -> throwError $ storeError e
|
||||
where
|
||||
-- TODO when parsing exception happens in store, the agent hangs;
|
||||
-- changing SQLError to SomeException does not help
|
||||
|
||||
@@ -63,6 +63,7 @@ data AgentConfig = AgentConfig
|
||||
connIdBytes :: Int,
|
||||
tbqSize :: Natural,
|
||||
dbFile :: FilePath,
|
||||
dbSemName :: Maybe String,
|
||||
yesToMigrations :: Bool,
|
||||
smpCfg :: ProtocolClientConfig,
|
||||
ntfCfg :: ProtocolClientConfig,
|
||||
@@ -96,6 +97,7 @@ defaultAgentConfig =
|
||||
connIdBytes = 12,
|
||||
tbqSize = 64,
|
||||
dbFile = "smp-agent.db",
|
||||
dbSemName = Nothing,
|
||||
yesToMigrations = False,
|
||||
smpCfg = defaultClientConfig {defaultTransport = ("5223", transport @TLS)},
|
||||
ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)},
|
||||
@@ -134,9 +136,9 @@ disallowedOperations = \case
|
||||
APSuspended -> [AONetwork, AODatabase]
|
||||
|
||||
newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
|
||||
newSMPAgentEnv config@AgentConfig {dbFile, yesToMigrations} = do
|
||||
newSMPAgentEnv config@AgentConfig {dbFile, dbSemName, yesToMigrations} = do
|
||||
idsDrg <- newTVarIO =<< drgNew
|
||||
store <- liftIO $ createSQLiteStore dbFile Migrations.app yesToMigrations
|
||||
store <- liftIO $ createSQLiteStore dbFile dbSemName Migrations.app yesToMigrations
|
||||
clientCounter <- newTVarIO 0
|
||||
randomServer <- newTVarIO =<< liftIO newStdGen
|
||||
agentPhase <- newTVarIO (APActive, True)
|
||||
|
||||
@@ -71,6 +71,8 @@ import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath (takeDirectory)
|
||||
import System.IO (hFlush, stdout)
|
||||
import System.Posix.Files (stdFileMode)
|
||||
import System.Posix.Semaphore (OpenSemFlags (..), Semaphore, semOpen)
|
||||
import qualified UnliftIO.Exception as E
|
||||
|
||||
-- * SQLite Store implementation
|
||||
@@ -78,14 +80,15 @@ import qualified UnliftIO.Exception as E
|
||||
data SQLiteStore = SQLiteStore
|
||||
{ dbFilePath :: FilePath,
|
||||
dbConnection :: TMVar DB.Connection,
|
||||
dbSemaphore :: Maybe Semaphore,
|
||||
dbNew :: Bool
|
||||
}
|
||||
|
||||
createSQLiteStore :: FilePath -> [Migration] -> Bool -> IO SQLiteStore
|
||||
createSQLiteStore dbFilePath migrations yesToMigrations = do
|
||||
createSQLiteStore :: FilePath -> Maybe String -> [Migration] -> Bool -> IO SQLiteStore
|
||||
createSQLiteStore dbFilePath dbSemName_ migrations yesToMigrations = do
|
||||
let dbDir = takeDirectory dbFilePath
|
||||
createDirectoryIfMissing False dbDir
|
||||
st <- connectSQLiteStore dbFilePath
|
||||
st <- connectSQLiteStore dbFilePath dbSemName_
|
||||
checkThreadsafe st
|
||||
migrateSchema st migrations yesToMigrations
|
||||
pure st
|
||||
@@ -121,11 +124,13 @@ confirmOrExit s = do
|
||||
ok <- getLine
|
||||
when (map toLower ok /= "y") exitFailure
|
||||
|
||||
connectSQLiteStore :: FilePath -> IO SQLiteStore
|
||||
connectSQLiteStore dbFilePath = do
|
||||
connectSQLiteStore :: FilePath -> Maybe String -> IO SQLiteStore
|
||||
connectSQLiteStore dbFilePath dbSemName_ = do
|
||||
dbNew <- not <$> doesFileExist dbFilePath
|
||||
dbConnection <- newTMVarIO =<< connectDB dbFilePath
|
||||
pure SQLiteStore {dbFilePath, dbConnection, dbNew}
|
||||
-- let filemode = unionFileModes ownerReadMode ownerWriteMode
|
||||
dbSemaphore <- forM dbSemName_ $ \name -> semOpen name OpenSemFlags {semCreate = True, semExclusive = False} stdFileMode 1
|
||||
pure SQLiteStore {dbFilePath, dbConnection, dbSemaphore, dbNew}
|
||||
|
||||
connectDB :: FilePath -> IO DB.Connection
|
||||
connectDB path = do
|
||||
|
||||
+2
-1
@@ -38,7 +38,8 @@ extra-deps:
|
||||
- cryptostore-0.2.1.0@sha256:9896e2984f36a1c8790f057fd5ce3da4cbcaf8aa73eb2d9277916886978c5b19,3881
|
||||
- network-3.1.2.7@sha256:e3d78b13db9512aeb106e44a334ab42b7aa48d26c097299084084cb8be5c5568,4888
|
||||
- simple-logger-0.1.0@sha256:be8ede4bd251a9cac776533bae7fb643369ebd826eb948a9a18df1a8dd252ff8,1079
|
||||
- tls-1.6.0@sha256:7ae39373fd2de27fb80e90f76d22aeeb9a074a0ddd120cbd02c9c52f516a9e55,6987 # below dependancies are to update Aeson to 2.0.3
|
||||
- tls-1.6.0@sha256:7ae39373fd2de27fb80e90f76d22aeeb9a074a0ddd120cbd02c9c52f516a9e55,6987
|
||||
# below dependancies are to update Aeson to 2.0.3
|
||||
- OneTuple-0.3.1@sha256:a848c096c9d29e82ffdd30a9998aa2931cbccb3a1bc137539d80f6174d31603e,2262
|
||||
- attoparsec-0.14.4@sha256:79584bdada8b730cb5138fca8c35c76fbef75fc1d1e01e6b1d815a5ee9843191,5810
|
||||
- hashable-1.4.0.2@sha256:0cddd0229d1aac305ea0404409c0bbfab81f075817bd74b8b2929eff58333e55,5005
|
||||
|
||||
@@ -29,6 +29,8 @@ import Simplex.Messaging.Protocol (ErrorType (..), MsgBody)
|
||||
import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..))
|
||||
import Simplex.Messaging.Util (bshow)
|
||||
import System.Directory (removeFile)
|
||||
import System.Posix.Files (stdFileMode)
|
||||
import System.Posix.Semaphore (OpenSemFlags (..), semOpen, semPost, semThreadWait)
|
||||
import System.Timeout
|
||||
import Test.Hspec
|
||||
|
||||
@@ -42,6 +44,7 @@ agentTests (ATransport t) = do
|
||||
describe "SQLite schema dump" schemaDumpTest
|
||||
describe "SMP agent protocol syntax" $ syntaxTests t
|
||||
describe "Establishing duplex connection" $ do
|
||||
it "test semaphore" testSemaphore
|
||||
it "should connect via one server and one agent" $
|
||||
smpAgentTest2_1_1 $ testDuplexConnection t
|
||||
it "should connect via one server and one agent (random IDs)" $
|
||||
@@ -121,6 +124,12 @@ h #:# err = tryGet `shouldReturn` ()
|
||||
pattern Msg :: MsgBody -> ACommand 'Agent
|
||||
pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody
|
||||
|
||||
testSemaphore :: IO ()
|
||||
testSemaphore = do
|
||||
sem <- semOpen "/test" OpenSemFlags {semCreate = True, semExclusive = False} stdFileMode 1
|
||||
semThreadWait sem
|
||||
semPost sem
|
||||
|
||||
testDuplexConnection :: Transport c => TProxy c -> c -> c -> IO ()
|
||||
testDuplexConnection _ alice bob = do
|
||||
("1", "bob", Right (INV cReq)) <- alice #: ("1", "bob", "NEW INV")
|
||||
|
||||
@@ -44,7 +44,7 @@ withStore2 = before connect2 . after (removeStore . fst)
|
||||
connect2 :: IO (SQLiteStore, SQLiteStore)
|
||||
connect2 = do
|
||||
s1 <- createStore
|
||||
s2 <- connectSQLiteStore (dbFilePath s1)
|
||||
s2 <- connectSQLiteStore (dbFilePath s1) Nothing
|
||||
pure (s1, s2)
|
||||
|
||||
createStore :: IO SQLiteStore
|
||||
@@ -52,7 +52,7 @@ createStore = do
|
||||
-- Randomize DB file name to avoid SQLite IO errors supposedly caused by asynchronous
|
||||
-- IO operations on multiple similarly named files; error seems to be environment specific
|
||||
r <- randomIO :: IO Word32
|
||||
createSQLiteStore (testDB <> show r) Migrations.app True
|
||||
createSQLiteStore (testDB <> show r) Nothing Migrations.app True
|
||||
|
||||
removeStore :: SQLiteStore -> IO ()
|
||||
removeStore store = do
|
||||
|
||||
@@ -20,7 +20,7 @@ schemaDumpTest =
|
||||
|
||||
testVerifySchemaDump :: IO ()
|
||||
testVerifySchemaDump = do
|
||||
void $ createSQLiteStore testDB Migrations.app False
|
||||
void $ createSQLiteStore testDB Nothing Migrations.app False
|
||||
void $ readCreateProcess (shell $ "touch " <> schema) ""
|
||||
savedSchema <- readFile schema
|
||||
savedSchema `seq` pure ()
|
||||
|
||||
@@ -171,6 +171,7 @@ agentCfg =
|
||||
{ tcpPort = agentTestPort,
|
||||
tbqSize = 1,
|
||||
dbFile = testDB,
|
||||
dbSemName = Nothing, -- Just "/simplex-agent",
|
||||
smpCfg =
|
||||
defaultClientConfig
|
||||
{ qSize = 1,
|
||||
|
||||
Reference in New Issue
Block a user