smp server: optimize concurrency and memory usage, refactor (#1544)

* smp server: optimize concurrency and memory usage, refactor

* hide clients IntMap

* reduce STM contention

* comment

* version

* correct stats for subscriptions

* version

* comment

* remove subscribed clients from map

* version

* optimze, refactor

* version

* debug test

* enable all tests

* remove test logs

* retry failed tests with debug logging

* increase test timeout

* sync between tests
This commit is contained in:
Evgeny
2025-05-23 12:52:18 +01:00
committed by GitHub
parent d352d518c2
commit af9ca59e51
43 changed files with 495 additions and 293 deletions
+1
View File
@@ -519,6 +519,7 @@ test-suite simplexmq-test
, generic-random ==1.5.*
, hashable
, hspec ==2.11.*
, hspec-core ==2.11.*
, http-client
, http-types
, http2
@@ -53,7 +53,6 @@ data NtfServerConfig = NtfServerConfig
subIdBytes :: Int,
regCodeBytes :: Int,
clientQSize :: Natural,
subQSize :: Natural,
pushQSize :: Natural,
smpAgentCfg :: SMPClientAgentConfig,
apnsConfig :: APNSPushClientConfig,
@@ -94,11 +93,11 @@ data NtfEnv = NtfEnv
}
newNtfServerEnv :: NtfServerConfig -> IO NtfEnv
newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do
newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do
when (compactLog startOptions) $ compactDbStoreLog $ dbStoreLogPath dbStoreConfig
random <- C.newRandom
store <- newNtfDbStore dbStoreConfig
subscriber <- newNtfSubscriber subQSize smpAgentCfg random
subscriber <- newNtfSubscriber smpAgentCfg random
pushServer <- newNtfPushServer pushQSize apnsConfig
tlsServerCreds <- loadServerCredential ntfCredentials
Fingerprint fp <- loadFingerprint ntfCredentials
@@ -121,8 +120,8 @@ data NtfSubscriber = NtfSubscriber
type SMPSubscriberVar = SessionVar SMPSubscriber
newNtfSubscriber :: Natural -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber
newNtfSubscriber qSize smpAgentCfg random = do
newNtfSubscriber :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO NtfSubscriber
newNtfSubscriber smpAgentCfg random = do
smpSubscribers <- TM.emptyIO
subscriberSeq <- newTVarIO 0
smpAgent <- newSMPClientAgent smpAgentCfg random
@@ -233,7 +233,6 @@ ntfServerCLI cfgPath logPath =
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 64,
subQSize = 2048,
pushQSize = 32768,
smpAgentCfg =
defaultSMPClientAgentConfig
+127 -149
View File
@@ -87,7 +87,7 @@ import Network.Socket (ServiceName, Socket, socketToHandle)
import qualified Network.TLS as TLS
import Numeric.Natural (Natural)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, forwardSMPTransmission, smpProxyError, temporaryClientError)
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, forwardSMPTransmission, nonBlockingWriteTBQueue, smpProxyError, temporaryClientError)
import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient, getConnectedSMPServerClient)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
@@ -162,8 +162,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
stopServer s
liftIO $ exitSuccess
raceAny_
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
( serverThread "server subscribers" s subscribers subscriptions cancelSub
: serverThread "server ntfSubscribers" s ntfSubscribers ntfSubscriptions (\_ -> pure ())
: deliverNtfsThread s
: sendPendingEvtsThread s
: receiveFromProxyAgent pa
@@ -229,66 +229,63 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
serverThread ::
forall s.
Server ->
String ->
(Server -> TQueue (QueueId, ClientId, Subscribed)) ->
(Server -> TMap QueueId (TVar AClient)) ->
(Server -> TVar (IM.IntMap AClient)) ->
(Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) ->
Server ->
(Server -> ServerSubscribers) ->
(forall st. Client st -> TMap QueueId s) ->
(s -> IO ()) ->
M ()
serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do
serverThread label srv srvSubscribers clientSubs unsub = do
labelMyThread label
cls <- asks clients
liftIO . forever $
(atomically (readTQueue $ subQ s) >>= atomically . updateSubscribers cls)
liftIO . forever $ do
-- Reading clients outside of `updateSubscribers` transaction to avoid transaction re-evaluation on each new connected client.
-- In case client disconnects during the transaction (its `connected` property is read),
-- the transaction will still be re-evaluated, and the client won't be stored as subscribed.
sub@(_, clntId, _) <- atomically $ readTQueue subQ
c_ <- getServerClient clntId srv
atomically (updateSubscribers c_ sub)
$>>= endPreviousSubscriptions
>>= mapM_ unsub
where
updateSubscribers :: TVar (IM.IntMap (Maybe AClient)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), AClient))
updateSubscribers cls (qId, clntId, subscribed) =
-- Client lookup by ID is in the same STM transaction.
-- In case client disconnects during the transaction,
-- it will be re-evaluated, and the client won't be stored as subscribed.
(readTVar cls >>= updateSub . IM.lookup clntId)
$>>= clientToBeNotified
ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents} = srvSubscribers srv
updateSubscribers :: Maybe AClient -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, BrokerMsg), AClient))
updateSubscribers c_ (qId, clntId, subscribed) = updateSub $>>= clientToBeNotified
where
ss = subs s
updateSub = \case
Just (Just clnt)
| subscribed -> do
modifyTVar' (subClnts s) $ IM.insert clntId clnt -- add client to server's subscribed cients
TM.lookup qId ss >>= -- insert subscribed and current client
maybe
(newTVar clnt >>= \cv -> TM.insert qId cv ss $> Nothing)
(\cv -> Just <$> swapTVar cv clnt)
| otherwise -> do
removeWhenNoSubs clnt
TM.lookupDelete qId ss >>= mapM readTVar
-- This case catches Just Nothing - it cannot happen here.
-- Nothing is there only before client thread is started.
_ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client
clientToBeNotified ac@(AClient _ _ c')
| clntId == clientId c' = pure Nothing
| otherwise = (\yes -> if yes then Just ((qId, subscribed), ac) else Nothing) <$> readTVar (connected c')
endPreviousSubscriptions :: ((QueueId, Subscribed), AClient) -> IO (Maybe s)
endPreviousSubscriptions (qEvt@(qId, _), ac@(AClient _ _ c)) = do
atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c)
updateSub = case c_ of
Just c@(AClient _ _ Client {connected}) -> ifM (readTVar connected) (updateSubConnected c) updateSubDisconnected
Nothing -> updateSubDisconnected
updateSubConnected c
| subscribed = do
modifyTVar' subClients $ IS.insert clntId -- add client to server's subscribed cients
upsertSubscribedClient qId c queueSubscribers
| otherwise = do
removeWhenNoSubs c
lookupDeleteSubscribedClient qId queueSubscribers
-- do not insert client if it is already disconnected, but send END to any other client
updateSubDisconnected = lookupDeleteSubscribedClient qId queueSubscribers
clientToBeNotified ac@(AClient _ _ Client {clientId, connected})
| clntId == clientId = pure Nothing
| otherwise = (\yes -> if yes then Just ((qId, subEvt), ac) else Nothing) <$> readTVar connected
where
subEvt = if subscribed then END else DELD
endPreviousSubscriptions :: ((QueueId, BrokerMsg), AClient) -> IO (Maybe s)
endPreviousSubscriptions (evt@(qId, _), ac@(AClient _ _ c)) = do
atomically $ modifyTVar' pendingEvents $ IM.alter (Just . maybe [evt] (evt <|)) (clientId c)
atomically $ do
sub <- TM.lookupDelete qId (clientSubs c)
removeWhenNoSubs ac $> sub
-- remove client from server's subscribed cients
removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c)
removeWhenNoSubs (AClient _ _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' subClients $ IS.delete (clientId c)
deliverNtfsThread :: Server -> M ()
deliverNtfsThread Server {ntfSubClients} = do
deliverNtfsThread srv@Server {ntfSubscribers} = do
ntfInt <- asks $ ntfDeliveryInterval . config
NtfStore ns <- asks ntfStore
stats <- asks serverStats
liftIO $ forever $ do
threadDelay ntfInt
readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats)
cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers)
forM_ cIds $ \cId -> getServerClient cId srv >>= mapM_ (deliverNtfs ns stats)
where
deliverNtfs ns stats (AClient _ _ Client {clientId, ntfSubscriptions, sndQ, connected}) =
whenM (currentClient readTVarIO) $ do
@@ -308,7 +305,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
writeTBQueue sndQ ts
pure $ length ts_
currentClient :: Monad m => (forall a. TVar a -> m a) -> m Bool
currentClient rd = (&&) <$> rd connected <*> (IM.member clientId <$> rd ntfSubClients)
currentClient rd = (&&) <$> rd connected <*> (IS.member clientId <$> rd (subClients ntfSubscribers))
addNtfs :: [Transmission BrokerMsg] -> (NotifierId, TVar [MsgNtf]) -> STM [Transmission BrokerMsg]
addNtfs acc (nId, v) =
readTVar v >>= \case
@@ -324,37 +321,30 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
sendPendingEvtsThread :: Server -> M ()
sendPendingEvtsThread s = do
sendPendingEvtsThread srv@Server {subscribers, ntfSubscribers} = do
endInt <- asks $ pendingENDInterval . config
cls <- asks clients
forever $ do
stats <- asks serverStats
liftIO $ forever $ do
threadDelay endInt
sendPending cls $ pendingSubEvents s
sendPending cls $ pendingNtfSubEvents s
sendPending subscribers stats
sendPending ntfSubscribers stats
where
sendPending cls ref = do
ends <- atomically $ swapTVar ref IM.empty
unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) ->
mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls
queueEvts qEvts (AClient _ _ c@Client {connected, sndQ = q}) =
whenM (readTVarIO connected) $ do
sent <- atomically $ tryWriteTBQueue q ts
if sent
then updateEndStats
else -- if queue is full it can block
forkClient c ("sendPendingEvtsThread.queueEvts") $
atomically (writeTBQueue q ts) >> updateEndStats
sendPending ServerSubscribers {pendingEvents} stats = do
pending <- atomically $ swapTVar pendingEvents IM.empty
unless (null pending) $ forM_ (IM.assocs pending) $ \(cId, evts) ->
getServerClient cId srv >>= mapM_ (enqueueEvts evts)
where
ts = L.map (\(qId, subscribed) -> (CorrId "", qId, evt subscribed)) qEvts
evt True = END
evt False = DELD
-- this accounts for both END and DELD events
updateEndStats = do
stats <- asks serverStats
let len = L.length qEvts
when (len > 0) $ liftIO $ do
atomicModifyIORef'_ (qSubEnd stats) (+ len)
atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch
enqueueEvts evts (AClient _ _ Client {connected, sndQ}) =
whenM (readTVarIO connected) $
nonBlockingWriteTBQueue sndQ ts >> updateEndStats
where
ts = L.map (\(qId, evt) -> (CorrId "", qId, evt)) evts
-- this accounts for both END and DELD events
updateEndStats = do
let len = L.length evts
when (len > 0) $ do
atomicModifyIORef'_ (qSubEnd stats) (+ len)
atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch
receiveFromProxyAgent :: ProxyAgent -> M ()
receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} =
@@ -581,20 +571,23 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions}
getRealTimeMetrics :: Env -> IO RealTimeMetrics
getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do
getRealTimeMetrics Env {sockets, msgStore = AMS _ _ ms, server = srv@Server {subscribers, ntfSubscribers}} = do
socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets
#if MIN_VERSION_base(4,18,0)
threadsCount <- length <$> listThreads
#else
let threadsCount = 0
#endif
clientsCount <- IM.size <$> readTVarIO clients
smpSubsCount <- M.size <$> readTVarIO subscribers
smpSubClientsCount <- IM.size <$> readTVarIO subClients
ntfSubsCount <- M.size <$> readTVarIO notifiers
ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients
clientsCount <- IM.size <$> getServerClients srv
smpSubs <- getSubscribersMetrics subscribers
ntfSubs <- getSubscribersMetrics ntfSubscribers
loadedCounts <- loadedQueueCounts ms
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts}
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubs, ntfSubs, loadedCounts}
where
getSubscribersMetrics ServerSubscribers {queueSubscribers, subClients} = do
subsCount <- M.size <$> getSubscribedClients queueSubscribers
subClientsCount <- IS.size <$> readTVarIO subClients
pure RTSubscriberMetrics {subsCount, subClientsCount}
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
@@ -653,9 +646,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented"
CPResume -> withAdminRole $ hPutStrLn h "resume not implemented"
CPClients -> withAdminRole $ do
active <- unliftIO u (asks clients) >>= readTVarIO
cls <- getServerClients srv
hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
forM_ (IM.toList active) $ \(cid, cl) -> forM_ cl $ \(AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do
forM_ (IM.toList cls) $ \(cid, (AClient _ _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions})) -> do
connected' <- bshow <$> readTVarIO connected
rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt
sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt
@@ -767,8 +760,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
#else
hPutStrLn h "Threads: not available on GHC 8.10"
#endif
Env {clients, server = Server {subscribers, notifiers, subClients, ntfSubClients}} <- unliftIO u ask
activeClients <- readTVarIO clients
let Server {subscribers, ntfSubscribers} = srv
activeClients <- getServerClients srv
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
when (r == CPRAdmin) $ do
clQs <- clientTBQueueLengths' activeClients
@@ -782,30 +775,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
putActiveClientsInfo "SMP" subscribers False
putActiveClientsInfo "Ntf" notifiers True
putSubscribedClients "SMP" subClients False
putSubscribedClients "Ntf" ntfSubClients True
putSubscribersInfo "SMP" subscribers False
putSubscribersInfo "Ntf" ntfSubscribers True
where
putActiveClientsInfo :: String -> TMap QueueId (TVar AClient) -> Bool -> IO ()
putActiveClientsInfo protoName clients showIds = do
activeSubs <- readTVarIO clients
putSubscribersInfo :: String -> ServerSubscribers -> Bool -> IO ()
putSubscribersInfo protoName ServerSubscribers {queueSubscribers, subClients} showIds = do
activeSubs <- getSubscribedClients queueSubscribers
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
clnts <- countSubClients activeSubs
hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "")
clnts' <- readTVarIO subClients
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts') <> (if showIds then " " <> show clnts' else "")
where
countSubClients :: M.Map QueueId (TVar AClient) -> IO IS.IntSet
countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId' <$> readTVarIO c) IS.empty
putSubscribedClients :: String -> TVar (IM.IntMap AClient) -> Bool -> IO ()
putSubscribedClients protoName subClnts showIds = do
clnts <- readTVarIO subClnts
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "")
countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe AClient) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
countSubClients :: M.Map QueueId (TVar (Maybe AClient)) -> IO IS.IntSet
countSubClients = foldM (\ !s c -> maybe s ((`IS.insert` s) . clientId') <$> readTVarIO c) IS.empty
countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
where
addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Maybe AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
addSubs acc Nothing = pure acc
addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) (Just acl@(AClient _ _ cl)) = do
addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) acl@(AClient _ _ cl) = do
subs <- readTVarIO $ subSel cl
cnts' <- case countSubs_ of
Nothing -> pure cnts
@@ -816,8 +804,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
clCnt' = if cnt == 0 then clCnt else clCnt + 1
qs' <- if cnt == 0 then pure qs else addQueueLengths qs acl
pure (subCnt + cnt, cnts', clCnt', qs')
clientTBQueueLengths' :: Foldable t => t (Maybe AClient) -> IO (Natural, Natural, Natural)
clientTBQueueLengths' = foldM (\acc -> maybe (pure acc) (addQueueLengths acc)) (0, 0, 0)
clientTBQueueLengths' :: Foldable t => t AClient -> IO (Natural, Natural, Natural)
clientTBQueueLengths' = foldM addQueueLengths (0, 0, 0)
addQueueLengths (!rl, !sl, !ml) (AClient _ _ cl) = do
(rl', sl', ml') <- queueLengths cl
pure (rl + rl', sl + sl', ml + ml')
@@ -896,30 +884,28 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
nextClientId <- asks clientSeq
clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1)
atomically $ modifyTVar' active $ IM.insert clientId Nothing
AMS qt mt ms <- asks msgStore
c <- liftIO $ newClient qt mt clientId q thVersion sessionId ts
runClientThreads qt mt ms active c clientId `finally` clientDisconnected c
runClientThreads qt mt ms c `finally` clientDisconnected c
where
runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore qs ms) -> IS.Key -> M ()
runClientThreads qt mt ms active c clientId = do
atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient qt mt c)
runClientThreads :: MsgStoreClass (MsgStore qs ms) => SQSType qs -> SMSType ms -> MsgStore qs ms -> Client (MsgStore qs ms) -> M ()
runClientThreads qt mt ms c = do
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg
whenM (liftIO $ insertServerClient (AClient qt mt c) s) $ do
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg
disconnectThread_ :: Client s -> Server -> Maybe ExpirationConfig -> [M ()]
disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)]
disconnectThread_ _ _ _ = []
noSubscriptions Client {clientId} s = do
hasSubs <- IM.member clientId <$> readTVarIO (subClients s)
noSubscriptions Client {clientId} Server {subscribers, ntfSubscribers} = do
hasSubs <- IS.member clientId <$> readTVarIO (subClients subscribers)
if hasSubs
then pure False
else not . IM.member clientId <$> readTVarIO (ntfSubClients s)
else not . IS.member clientId <$> readTVarIO (subClients ntfSubscribers)
clientDisconnected :: Client s -> M ()
clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do
@@ -931,26 +917,17 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte
ntfSubs <- atomically $ swapTVar ntfSubscriptions M.empty
liftIO $ mapM_ cancelSub subs
whenM (asks serverActive >>= readTVarIO) $ do
Server {subscribers, notifiers, subClients, ntfSubClients} <- asks server
srv@Server {subscribers, ntfSubscribers} <- asks server
liftIO $ updateSubscribers subs subscribers
liftIO $ updateSubscribers ntfSubs notifiers
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
atomically $ modifyTVar' subClients $ IM.delete clientId
atomically $ modifyTVar' ntfSubClients $ IM.delete clientId
liftIO $ updateSubscribers ntfSubs ntfSubscribers
liftIO $ deleteServerClient clientId srv
tIds <- atomically $ swapTVar endThreads IM.empty
liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds
where
updateSubscribers :: M.Map QueueId a -> TMap QueueId (TVar AClient) -> IO ()
updateSubscribers subs srvSubs =
forM_ (M.keys subs) $ \qId ->
-- lookup of the subscribed client TVar can be in separate transaction,
-- as long as the client is read in the same transaction -
-- it prevents removing the next subscribed client.
TM.lookupIO qId srvSubs >>=
mapM_ (\c' -> atomically $ whenM (sameClientId c <$> readTVar c') $ TM.delete qId srvSubs)
sameClientId :: Client s -> AClient -> Bool
sameClientId Client {clientId} ac = clientId == clientId' ac
updateSubscribers :: M.Map QueueId a -> ServerSubscribers -> IO ()
updateSubscribers subs ServerSubscribers {queueSubscribers, subClients} = do
mapM_ (\qId -> deleteSubcribedClient qId c queueSubscribers) (M.keys subs)
atomically $ modifyTVar' subClients $ IS.delete clientId
cancelSub :: Sub -> IO ()
cancelSub s = case subThread s of
@@ -1151,7 +1128,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client
thParams'
Server {subscribedQ, ntfSubscribedQ, subscribers}
Server {subscribers, ntfSubscribers}
ms
clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
@@ -1372,7 +1349,7 @@ client
Left e -> pure $ ERR e
Right nId_ -> do
incStat . ntfCreated =<< asks serverStats
forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
forM_ nId_ $ \nId -> atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False)
pure $ NID notifierId rcvPublicDhKey
deleteQueueNotifier_ :: StoreQueue s -> M (Transmission BrokerMsg)
@@ -1383,7 +1360,7 @@ client
stats <- asks serverStats
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False)
incStat $ ntfDeleted stats
pure ok
Right Nothing -> pure ok
@@ -1394,7 +1371,7 @@ client
subscribeQueue :: StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)
subscribeQueue q qr =
atomically (TM.lookup rId subscriptions) >>= \case
liftIO (TM.lookupIO rId subscriptions) >>= \case
Nothing -> newSub >>= deliver True
Just s@Sub {subThread} -> do
stats <- asks serverStats
@@ -1410,7 +1387,7 @@ client
rId = recipientId q
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clientId, True)
writeTQueue (subQ subscribers) (rId, clientId, True)
sub <- newSubscription NoSub
TM.insert rId sub subscriptions
pure sub
@@ -1486,7 +1463,7 @@ client
pure ok
where
newSub = do
writeTQueue ntfSubscribedQ (entId, clientId, True)
writeTQueue (subQ ntfSubscribers) (entId, clientId, True)
TM.insert entId () ntfSubscriptions
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)
@@ -1522,7 +1499,7 @@ client
incStat $ msgRecv stats
if isGet
then incStat $ msgRecvGet stats
else pure () -- TODO skip notification delivery for delivered message
else pure () -- TODO skip notification delivery for delivered message
-- skipping delivery fails tests, it should be counted in msgNtfSkipped
-- forM_ (notifierId <$> notifier qr) $ \nId -> do
-- ns <- asks ntfStore
@@ -1595,18 +1572,19 @@ client
-- - nothing was delivered to this subscription (to avoid race conditions with the recipient).
tryDeliverMessage :: Message -> IO ()
tryDeliverMessage msg =
-- the subscription is checked outside of STM to avoid transaction cost
-- the subscribed client var is read outside of STM to avoid transaction cost
-- in case no client is subscribed.
whenM (TM.memberIO rId subscribers) $
atomically deliverToSub >>= mapM_ forkDeliver
getSubscribedClient rId (queueSubscribers subscribers)
$>>= atomically . deliverToSub
>>= mapM_ forkDeliver
where
rId = recipientId q
deliverToSub =
-- lookup has ot be in the same transaction,
deliverToSub rcv =
-- reading client TVar in the same transaction,
-- so that if subscription ends, it re-evalutates
-- and delivery is cancelled -
-- the new client will receive message in response to SUB.
(TM.lookup rId subscribers >>= mapM readTVar)
readTVar rcv
$>>= \rc@(AClient _ _ Client {subscriptions = subs, sndQ = sndQ'}) -> TM.lookup rId subs
$>>= \s@Sub {subThread, delivered} -> case subThread of
ProhibitSub -> pure Nothing
@@ -1635,9 +1613,9 @@ client
labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND"
-- lookup can be outside of STM transaction,
-- as long as the check that it is the same client is inside.
TM.lookupIO rId subscribers >>= mapM_ deliverIfSame
deliverIfSame rc' = time "deliver" . atomically $
whenM (sameClientId rc <$> readTVar rc') $
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
deliverIfSame rcv = time "deliver" . atomically $
whenM (sameClient rc rcv) $
tryTakeTMVar delivered >>= \case
Just _ -> pure () -- if a message was already delivered, should not deliver more
Nothing -> do
@@ -1750,7 +1728,7 @@ client
Right qr -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ do
writeTQueue subscribedQ (entId, clientId, False)
writeTQueue (subQ subscribers) (entId, clientId, False)
-- queue is usually deleted by the same client that is currently subscribed,
-- we delete subscription here, so the client with no subscriptions can be disconnected.
TM.delete entId subscriptions
@@ -1760,7 +1738,7 @@ client
stats <- asks serverStats
deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId)
when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted)
atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False)
atomically $ writeTQueue (subQ ntfSubscribers) (nId, clientId, False)
updateDeletedStats qr
pure ok
Left e -> pure $ err e
@@ -1985,7 +1963,7 @@ restoreServerNtfs =
renameFile f $ f <> ".bak"
let NtfStore ns' = ns
storedQueues <- M.size <$> readTVarIO ns'
logNote $ "notifications restored, " <> tshow lineCount <> " lines processed"
logNote $ "notifications restored, " <> tshow lineCount <> " lines processed"
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues}
where
restoreNtf :: NtfStore -> Int64 -> (Int, Int, Int) -> LB.ByteString -> ExceptT String IO (Int, Int, Int)
+188 -22
View File
@@ -18,7 +18,59 @@
#endif
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.Messaging.Server.Env.STM where
module Simplex.Messaging.Server.Env.STM
( ServerConfig (..),
ServerStoreCfg (..),
AServerStoreCfg (..),
StorePaths (..),
StartOptions (..),
Env (..),
Server (..),
ServerSubscribers (..),
SubscribedClients,
ProxyAgent (..),
Client (..),
AClient (..),
ClientId,
Subscribed,
Sub (..),
ServerSub (..),
SubscriptionThread (..),
MsgStore,
AMsgStore (..),
AStoreType (..),
newEnv,
mkJournalStoreConfig,
newClient,
getServerClients,
getServerClient,
insertServerClient,
deleteServerClient,
getSubscribedClients,
getSubscribedClient,
upsertSubscribedClient,
lookupDeleteSubscribedClient,
deleteSubcribedClient,
sameClientId,
sameClient,
clientId',
newSubscription,
newProhibitedSub,
defaultMsgQueueQuota,
defMsgExpirationDays,
defNtfExpirationHours,
defaultMessageExpiration,
defaultNtfExpiration,
defaultInactiveClientExpiration,
defaultProxyClientConcurrency,
defaultMaxJournalMsgCount,
defaultMaxJournalStateLines,
defaultIdleQueueInterval,
journalMsgStoreDepth,
readWriteQueueStore,
noPostgresExit,
)
where
import Control.Concurrent (ThreadId)
import Control.Logger.Simple
@@ -29,9 +81,12 @@ import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Kind (Constraint)
import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import Data.Maybe (isJust)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime, nominalDay)
@@ -66,6 +121,7 @@ import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util (ifM, whenM, ($>>=))
import System.Directory (doesFileExist)
import System.Exit (exitFailure)
import System.IO (IOMode (..))
@@ -203,7 +259,6 @@ data Env = Env
serverStats :: ServerStats,
sockets :: TVar [(ServiceName, SocketState)],
clientSeq :: TVar ClientId,
clients :: TVar (IntMap (Maybe AClient)),
proxyAgent :: ProxyAgent -- senders served on this proxy
}
@@ -236,17 +291,72 @@ data AMsgStore =
type Subscribed = Bool
data Server = Server
{ subscribedQ :: TQueue (RecipientId, ClientId, Subscribed),
subscribers :: TMap RecipientId (TVar AClient),
ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed),
notifiers :: TMap NotifierId (TVar AClient),
subClients :: TVar (IntMap AClient), -- clients with SMP subscriptions
ntfSubClients :: TVar (IntMap AClient), -- clients with Ntf subscriptions
pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))),
pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))),
{ clients :: ServerClients,
subscribers :: ServerSubscribers,
ntfSubscribers :: ServerSubscribers,
savingLock :: Lock
}
-- not exported, to prevent concurrent IntMap lookups inside STM transactions.
newtype ServerClients = ServerClients {serverClients :: TVar (IntMap AClient)}
data ServerSubscribers = ServerSubscribers
{ subQ :: TQueue (QueueId, ClientId, Subscribed),
queueSubscribers :: SubscribedClients,
subClients :: TVar IntSet,
pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg)))
}
-- not exported, to prevent accidental concurrent Map lookups inside STM transactions.
-- Map stores TVars with pointers to the clients rather than client ID to allow reading the same TVar
-- inside transactions to ensure that transaction is re-evaluated in case subscriber changes.
-- Storing Maybe allows to have continuity of subscription when the same user client disconnects and re-connects -
-- any STM transaction that reads subscribed client will re-evaluate in this case.
-- The subscriptions that were made at any point are not removed -
-- this is a better trade-off with intermittently connected mobile clients.
data SubscribedClients = SubscribedClients (TMap EntityId (TVar (Maybe AClient)))
getSubscribedClients :: SubscribedClients -> IO (Map EntityId (TVar (Maybe AClient)))
getSubscribedClients (SubscribedClients cs) = readTVarIO cs
getSubscribedClient :: EntityId -> SubscribedClients -> IO (Maybe (TVar (Maybe AClient)))
getSubscribedClient entId (SubscribedClients cs) = TM.lookupIO entId cs
{-# INLINE getSubscribedClient #-}
-- insert subscribed and current client, return previously subscribed client if it is different
upsertSubscribedClient :: EntityId -> AClient -> SubscribedClients -> STM (Maybe AClient)
upsertSubscribedClient entId ac@(AClient _ _ c) (SubscribedClients cs) =
TM.lookup entId cs >>= \case
Nothing -> Nothing <$ TM.insertM entId (newTVar (Just ac)) cs
Just cv ->
readTVar cv >>= \case
Just c' | sameClientId c c' -> pure Nothing
c_ -> c_ <$ writeTVar cv (Just ac)
-- lookup and delete currently subscribed client
lookupDeleteSubscribedClient :: EntityId -> SubscribedClients -> STM (Maybe AClient)
lookupDeleteSubscribedClient entId (SubscribedClients cs) =
TM.lookupDelete entId cs $>>= (`swapTVar` Nothing)
deleteSubcribedClient :: EntityId -> Client s -> SubscribedClients -> IO ()
deleteSubcribedClient entId c (SubscribedClients cs) =
-- lookup of the subscribed client TVar can be in separate transaction,
-- as long as the client is read in the same transaction -
-- it prevents removing the next subscribed client and also avoids STM contention for the Map.
TM.lookupIO entId cs >>= mapM_ (\cv -> atomically $ whenM (sameClient c cv) $ delete cv)
where
delete cv = do
writeTVar cv Nothing
TM.delete entId cs
sameClientId :: Client s -> AClient -> Bool
sameClientId Client {clientId} ac = clientId == clientId' ac
{-# INLINE sameClientId #-}
sameClient :: Client s -> TVar (Maybe AClient) -> STM Bool
sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
{-# INLINE sameClient #-}
newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent
}
@@ -288,16 +398,40 @@ data Sub = Sub
newServer :: IO Server
newServer = do
subscribedQ <- newTQueueIO
subscribers <- TM.emptyIO
ntfSubscribedQ <- newTQueueIO
notifiers <- TM.emptyIO
subClients <- newTVarIO IM.empty
ntfSubClients <- newTVarIO IM.empty
pendingSubEvents <- newTVarIO IM.empty
pendingNtfSubEvents <- newTVarIO IM.empty
clients <- ServerClients <$> newTVarIO mempty
subscribers <- newServerSubscribers
ntfSubscribers <- newServerSubscribers
savingLock <- createLockIO
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock}
return Server {clients, subscribers, ntfSubscribers, savingLock}
getServerClients :: Server -> IO (IntMap AClient)
getServerClients = readTVarIO . serverClients . clients
{-# INLINE getServerClients #-}
getServerClient :: ClientId -> Server -> IO (Maybe AClient)
getServerClient cId s = IM.lookup cId <$> getServerClients s
{-# INLINE getServerClient #-}
insertServerClient :: AClient -> Server -> IO Bool
insertServerClient ac@(AClient _ _ Client {clientId, connected}) Server {clients} =
atomically $
ifM
(readTVar connected)
(True <$ modifyTVar' (serverClients clients) (IM.insert clientId ac))
(pure False)
{-# INLINE insertServerClient #-}
deleteServerClient :: ClientId -> Server -> IO ()
deleteServerClient cId Server {clients} = atomically $ modifyTVar' (serverClients clients) $ IM.delete cId
{-# INLINE deleteServerClient #-}
newServerSubscribers :: IO ServerSubscribers
newServerSubscribers = do
subQ <- newTQueueIO
queueSubscribers <- SubscribedClients <$> TM.emptyIO
subClients <- newTVarIO IS.empty
pendingEvents <- newTVarIO IM.empty
pure ServerSubscribers {subQ, queueSubscribers, subClients, pendingEvents}
newClient :: SQSType qs -> SMSType ms -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore qs ms))
newClient _ _ clientId qSize thVersion sessionId createdAt = do
@@ -312,7 +446,24 @@ newClient _ _ clientId qSize thVersion sessionId createdAt = do
connected <- newTVarIO True
rcvActiveAt <- newTVarIO createdAt
sndActiveAt <- newTVarIO createdAt
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
return
Client
{ clientId,
subscriptions,
ntfSubscriptions,
rcvQ,
sndQ,
msgQ,
procThreads,
endThreads,
endThreadSeq,
thVersion,
sessionId,
connected,
createdAt,
rcvActiveAt,
sndActiveAt
}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription st = do
@@ -362,9 +513,24 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
serverStats <- newServerStats =<< getCurrentTime
sockets <- newTVarIO []
clientSeq <- newTVarIO 0
clients <- newTVarIO mempty
proxyAgent <- newSMPProxyAgent smpAgentCfg random
pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
pure
Env
{ serverActive,
config,
serverInfo,
server,
serverIdentity,
msgStore,
ntfStore,
random,
tlsServerCreds,
httpServerCreds,
serverStats,
sockets,
clientSeq,
proxyAgent
}
where
loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
loadStoreLog mkQ f st = do
+16 -15
View File
@@ -33,13 +33,16 @@ data RealTimeMetrics = RealTimeMetrics
{ socketStats :: [(ServiceName, SocketStats)],
threadsCount :: Int,
clientsCount :: Int,
smpSubsCount :: Int,
smpSubClientsCount :: Int,
ntfSubsCount :: Int,
ntfSubClientsCount :: Int,
smpSubs :: RTSubscriberMetrics,
ntfSubs :: RTSubscriberMetrics,
loadedCounts :: LoadedQueueCounts
}
data RTSubscriberMetrics = RTSubscriberMetrics
{ subsCount :: Int,
subClientsCount :: Int
}
{-# FOURMOLU_DISABLE\n#-}
prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text
prometheusMetrics sm rtm ts =
@@ -50,10 +53,8 @@ prometheusMetrics sm rtm ts =
{ socketStats,
threadsCount,
clientsCount,
smpSubsCount,
smpSubClientsCount,
ntfSubsCount,
ntfSubClientsCount,
smpSubs,
ntfSubs,
loadedCounts
} = rtm
ServerStatsData
@@ -367,21 +368,21 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_clients_total gauge\n\
\simplex_smp_clients_total " <> mshow clientsCount <> "\n\
\\n\
\# HELP simplex_smp_subscribtion_total Total subscriptions\n\
\# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\
\# TYPE simplex_smp_subscribtion_total gauge\n\
\simplex_smp_subscribtion_total " <> mshow smpSubsCount <> "\n# smpSubs\n\
\simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\
\\n\
\# HELP simplex_smp_subscribtion_clients_total Subscribed clients, first counting method\n\
\# HELP simplex_smp_subscribtion_clients_total Subscribed clients\n\
\# TYPE simplex_smp_subscribtion_clients_total gauge\n\
\simplex_smp_subscribtion_clients_total " <> mshow smpSubClientsCount <> "\n# smpSubClients\n\
\simplex_smp_subscribtion_clients_total " <> mshow (subClientsCount smpSubs) <> "\n# smp.subClientsCount\n\
\\n\
\# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\
\# TYPE simplex_smp_subscription_ntf_total gauge\n\
\simplex_smp_subscription_ntf_total " <> mshow ntfSubsCount <> "\n# ntfSubs\n\
\simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\
\\n\
\# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\
\# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers\n\
\# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\
\simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n\
\simplex_smp_subscription_ntf_clients_total " <> mshow (subClientsCount ntfSubs) <> "\n# ntf.subClientsCount\n\
\\n\
\# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\
\# TYPE simplex_smp_loaded_queues_queue_count gauge\n\
+3 -6
View File
@@ -1,3 +1,5 @@
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.TMap
( TMap,
emptyIO,
@@ -11,7 +13,6 @@ module Simplex.Messaging.TMap
insert,
insertM,
delete,
lookupInsert,
lookupDelete,
adjust,
update,
@@ -71,12 +72,8 @@ delete :: Ord k => k -> TMap k a -> STM ()
delete k m = modifyTVar' m $ M.delete k
{-# INLINE delete #-}
lookupInsert :: Ord k => k -> a -> TMap k a -> STM (Maybe a)
lookupInsert k v m = stateTVar m $ \mv -> (M.lookup k mv, M.insert k v mv)
{-# INLINE lookupInsert #-}
lookupDelete :: Ord k => k -> TMap k a -> STM (Maybe a)
lookupDelete k m = stateTVar m $ \mv -> (M.lookup k mv, M.delete k mv)
lookupDelete k m = stateTVar m $ M.alterF (,Nothing) k
{-# INLINE lookupDelete #-}
adjust :: Ord k => (a -> a) -> k -> TMap k a -> STM ()
+2 -2
View File
@@ -16,7 +16,7 @@ import AgentTests.ServerChoice (serverChoiceTests)
import AgentTests.ShortLinkTests (shortLinkTests)
import Simplex.Messaging.Server.Env.STM (AStoreType (..))
import Simplex.Messaging.Transport (ATransport (..))
import Test.Hspec
import Test.Hspec hiding (fit, it)
#if defined(dbPostgres)
import Fixtures
@@ -47,7 +47,7 @@ agentTests ps = do
#endif
describe "Functional API" $ functionalAPITests ps
describe "Chosen servers" serverChoiceTests
#if defined(dbServerPostgres)
#if defined(dbServerPostgres)
around_ (postgressBracket ntfTestServerDBConnectInfo) $
describe "Notification tests" $ notificationTests ps
#endif
+5 -4
View File
@@ -28,7 +28,8 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EntityId (..), ProtocolServer (..), QueueMode (..), currentSMPClientVersion, supportedSMPClientVRange, pattern VersionSMPC)
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import Simplex.Messaging.Version
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
srv :: SMPServer
srv = SMPServer "smp.simplex.im,jjbyvoemxysm7qxap7m5d5m35jzv5qq6gnlv7s4rsn7tdwwmuqciwpid.onion" "5223" (C.KeyHash "\215m\248\251")
@@ -288,7 +289,7 @@ connectionRequestTests =
smpEncodingTest queueV1NoPort
smpEncodingTest connectionRequest
-- smpEncodingTest connectionRequestNoQM -- this fails, because of queue mode patch
smpEncodingTest connectionRequestContact -- this passes because of queue mode patch in ConnReqUriData encoding
smpEncodingTest connectionRequestContact -- this passes because of queue mode patch in ConnReqUriData encoding
smpEncodingTest connectionRequest1
smpEncodingTest connectionRequest2queues
smpEncodingTest connectionRequestNew
@@ -334,12 +335,12 @@ connectionRequestTests =
restoreShortLink [srv] (contact srv2 (LinkKey "0123456789abcdef0123456789abcdef"))
`shouldBe` contact srv2 (LinkKey "0123456789abcdef0123456789abcdef")
Right (lnk :: ConnShortLink 'CMContact) <- pure $ strDecode "https://localhost/a#4AkRDmhf64tdRlN406g8lJRg5OCmhD6ynIhi6glOcCM?p=7001&c=LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI"
Right (lnk' :: ConnShortLink 'CMContact) <- pure $ strDecode "https://localhost/a#4AkRDmhf64tdRlN406g8lJRg5OCmhD6ynIhi6glOcCM"
Right (lnk' :: ConnShortLink 'CMContact) <- pure $ strDecode "https://localhost/a#4AkRDmhf64tdRlN406g8lJRg5OCmhD6ynIhi6glOcCM"
let presetSrv :: SMPServer = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7001"
shortenShortLink [presetSrv] lnk `shouldBe` lnk'
restoreShortLink [presetSrv] lnk' `shouldBe` lnk
Right (inv :: ConnShortLink 'CMInvitation) <- pure $ strDecode "https://localhost/i#tnUaHYp8saREmyEHR93SBpl8ySHBchOt/LJ1ZQUzxH9Udb0jw5wmJACv5o6oe8e7BsX_hUCUMTSY?p=7001&c=LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI"
Right (inv' :: ConnShortLink 'CMInvitation) <- pure $ strDecode "https://localhost/i#tnUaHYp8saREmyEHR93SBpl8ySHBchOt/LJ1ZQUzxH9Udb0jw5wmJACv5o6oe8e7BsX_hUCUMTSY"
Right (inv' :: ConnShortLink 'CMInvitation) <- pure $ strDecode "https://localhost/i#tnUaHYp8saREmyEHR93SBpl8ySHBchOt/LJ1ZQUzxH9Udb0jw5wmJACv5o6oe8e7BsX_hUCUMTSY"
shortenShortLink [presetSrv] inv `shouldBe` inv'
restoreShortLink [presetSrv] inv' `shouldBe` inv
where
+3 -3
View File
@@ -26,13 +26,14 @@ import qualified Data.Map.Strict as M
import Data.Type.Equality
import Simplex.Messaging.Crypto (Algorithm (..), AlgorithmI, CryptoError, DhAlgorithm)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.SNTRUP761.Bindings
import Simplex.Messaging.Crypto.Ratchet
import Simplex.Messaging.Crypto.SNTRUP761.Bindings
import Simplex.Messaging.Encoding
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Util ((<$$>))
import Simplex.Messaging.Version
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
doubleRatchetTests :: Spec
doubleRatchetTests = do
@@ -82,7 +83,6 @@ runMessageTests initRatchets_ agreeRatchetKEMs = do
withRatchets_ @X25519 initRatchets_ test
withRatchets_ @X448 initRatchets_ test
testAlgs :: (forall a. (AlgorithmI a, DhAlgorithm a) => C.SAlgorithm a -> IO ()) -> IO ()
testAlgs test = test C.SX25519 >> test C.SX448
+2 -2
View File
@@ -108,7 +108,7 @@ import Simplex.Messaging.Version (VersionRange (..))
import qualified Simplex.Messaging.Version as V
import Simplex.Messaging.Version.Internal (Version (..))
import System.Directory (copyFile, renameFile)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO
import Util
import XFTPClient (testXFTPServer)
@@ -3577,7 +3577,7 @@ exchangeGreetingsMsgId_ :: HasCallStack => PQEncryption -> Int64 -> AgentClient
exchangeGreetingsMsgId_ = exchangeGreetingsViaProxyMsgId_ False
exchangeGreetingsViaProxy :: HasCallStack => Bool -> AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO ()
exchangeGreetingsViaProxy viaProxy = exchangeGreetingsViaProxyMsgId_ viaProxy PQEncOn 2
exchangeGreetingsViaProxy viaProxy = exchangeGreetingsViaProxyMsgId_ viaProxy PQEncOn 2
exchangeGreetingsViaProxyMsgId_ :: HasCallStack => Bool -> PQEncryption -> Int64 -> AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO ()
exchangeGreetingsViaProxyMsgId_ viaProxy pqEnc msgId alice bobId bob aliceId = do
+2 -1
View File
@@ -11,7 +11,8 @@ import Simplex.Messaging.Agent.Store.Interface
import Simplex.Messaging.Agent.Store.Migrations (migrationsToRun)
import Simplex.Messaging.Agent.Store.Shared
import System.Random (randomIO)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
#if defined(dbPostgres)
import qualified Data.ByteString.Char8 as B
import Database.PostgreSQL.Simple (fromOnly)
+6 -12
View File
@@ -61,7 +61,7 @@ import Data.Time.Clock.System (systemToUTCTime)
import qualified Database.PostgreSQL.Simple as PSQL
import NtfClient
import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2)
import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'')
import SMPClient (cfgJ2QS, cfgMS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn)
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), withStore')
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers)
@@ -83,8 +83,9 @@ import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..))
import Simplex.Messaging.Transport (ATransport)
import System.Process (callCommand)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO
import Util
#if defined(dbPostgres)
import Database.PostgreSQL.Simple.SqlQQ (sql)
#else
@@ -156,10 +157,10 @@ notificationTests ps@(t, _) = do
it "should resume subscriptions after SMP server is restarted" $
withAPNSMockServer $ \apns ->
withNtfServer t $ testNotificationsSMPRestart ps apns
describe "Notifications after SMP server restart" $
describe "Notifications after SMP server restart (batched)" $
it "should resume batched subscriptions after SMP server is restarted" $
withAPNSMockServer $ \apns ->
withNtfServer t $ testNotificationsSMPRestartBatch 100 ps apns
withNtfServer t $ testNotificationsSMPRestartBatch 50 ps apns
describe "should switch notifications to the new queue" $
testServerMatrix2 ps $ \servers ->
withAPNSMockServer $ \apns ->
@@ -227,8 +228,6 @@ v .-> key = do
testNtfTokenRepeatRegistration :: APNSMockServer -> IO ()
testNtfTokenRepeatRegistration apns = do
-- setLogLevel LogError -- LogDebug
-- withGlobalLogging logCfg $ do
withAgent 1 agentCfg initAgentServers testDB $ \a -> runRight_ $ do
let tkn = DeviceToken PPApnsTest "abcd"
NTRegistered <- registerNtfToken a tkn NMPeriodic
@@ -248,8 +247,6 @@ testNtfTokenRepeatRegistration apns = do
testNtfTokenSecondRegistration :: APNSMockServer -> IO ()
testNtfTokenSecondRegistration apns =
-- setLogLevel LogError -- LogDebug
-- withGlobalLogging logCfg $ do
withAgentClients2 $ \a a' -> runRight_ $ do
let tkn = DeviceToken PPApnsTest "abcd"
NTRegistered <- registerNtfToken a tkn NMPeriodic
@@ -559,7 +556,6 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
verifyNtfToken alice tkn vNonce verification
NTActive <- checkNtfToken alice tkn
-- send message
liftIO $ threadDelay 250000
1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello"
get bob ##> ("", aliceId, SENT $ baseId + 1)
-- notification
@@ -571,11 +567,10 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
-- alice client already has subscription for the connection,
[Left (CMD PROHIBITED _)] <- getConnectionMessages alice [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs]
threadDelay 500000
threadDelay 1000000
suspendAgent alice 0
closeDBStore store
threadDelay 1000000 >> callCommand "sync" >> threadDelay 1000000
putStrLn "before opening the database from another agent"
-- aliceNtf client doesn't have subscription and is allowed to get notification message
withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> do
@@ -583,7 +578,6 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag
pure ()
threadDelay 1000000 >> callCommand "sync" >> threadDelay 1000000
putStrLn "after closing the database in another agent"
reopenDBStore store
foregroundAgent alice
threadDelay 500000
+3 -2
View File
@@ -52,11 +52,12 @@ import Simplex.Messaging.Crypto.File (CryptoFile (..))
import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), pattern PQSupportOn)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Protocol (EntityId (..), SubscriptionMode (..), QueueMode (..), pattern VersionSMPC)
import Simplex.Messaging.Protocol (EntityId (..), QueueMode (..), SubscriptionMode (..), pattern VersionSMPC)
import qualified Simplex.Messaging.Protocol as SMP
import System.Random
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO.Directory (removeFile)
import Util
testDB :: String
testDB = "tests/tmp/smp-agent.test.db"
+2 -1
View File
@@ -18,7 +18,8 @@ import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfirmati
import Simplex.Messaging.Util (ifM)
import System.Directory (doesFileExist, removeFile)
import System.Process (readCreateProcess, shell)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
testDB :: FilePath
testDB = "tests/tmp/test_agent_schema.db"
+2 -1
View File
@@ -14,8 +14,9 @@ import Simplex.Messaging.Agent.Client hiding (userServers)
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Client (defaultNetworkConfig)
import Simplex.Messaging.Protocol
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Test.QuickCheck
import Util
import XFTPClient (testXFTPServer)
serverChoiceTests :: Spec
+3 -2
View File
@@ -11,7 +11,8 @@ import Control.Monad.Except
import Simplex.Messaging.Agent.Protocol (AgentErrorType (..), ConnectionMode (..), LinkKey (..), SMPAgentError (..), linkUserData, supportedSMPAgentVRange)
import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.ShortLink as SL
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
shortLinkTests :: Spec
shortLinkTests = do
@@ -20,7 +21,7 @@ shortLinkTests = do
it "should fail to decrypt invitation data with bad hash" testInvShortLinkBadDataHash
describe "contact short link" $ do
it "should encrypt and decrypt data" testContactShortLink
it "should encrypt updated user data" testUpdateContactShortLink
it "should encrypt updated user data" testUpdateContactShortLink
it "should fail to decrypt contact data with bad hash" testContactShortLinkBadDataHash
it "should fail to decrypt contact data with bad signature" testContactShortLinkBadSignature
+2 -1
View File
@@ -35,12 +35,13 @@ import System.Environment (withArgs)
import System.FilePath ((</>))
import System.IO.Silently (capture_)
import System.Timeout (timeout)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Test.Main (withStdin)
import UnliftIO (catchAny)
import UnliftIO.Async (async, cancel)
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Exception (bracket)
import Util
#if defined(dbServerPostgres)
import qualified Database.PostgreSQL.Simple as PSQL
+2 -1
View File
@@ -24,7 +24,8 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Protocol
import Simplex.Messaging.Transport
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
batchingTests :: Spec
batchingTests = do
+2 -1
View File
@@ -13,7 +13,8 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), FTCryptoError (..))
import qualified Simplex.Messaging.Crypto.File as CF
import System.Directory (getFileSize)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
cryptoFileTests :: Spec
cryptoFileTests = do
+2 -1
View File
@@ -24,9 +24,10 @@ import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Crypto.SNTRUP761.Bindings
import Simplex.Messaging.Transport.Client
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Test.Hspec.QuickCheck (modifyMaxSuccess)
import Test.QuickCheck
import Util
cryptoTests :: Spec
cryptoTests = do
+2 -1
View File
@@ -16,9 +16,10 @@ import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Transport.Client (TransportHost (..))
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Test.Hspec.QuickCheck (modifyMaxSuccess)
import Test.QuickCheck
import Util
int64 :: Int64
int64 = 1234567890123456789
+6 -7
View File
@@ -10,8 +10,8 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
{-# OPTIONS_GHC -Wno-orphans #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module CoreTests.MsgStoreTests where
@@ -23,13 +23,14 @@ import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import qualified Data.ByteString.Base64.URL as B64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Base64.URL as B64
import Data.List (isPrefixOf, isSuffixOf)
import Data.Maybe (fromJust)
import Data.Time.Clock (addUTCTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2)
import Simplex.Messaging.Crypto (pattern MaxLenBS)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol (EntityId (..), LinkId, Message (..), QueueLinkData, RecipientId, SParty (..), noMsgFlags)
@@ -43,11 +44,11 @@ import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue)
import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2)
import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile)
import System.FilePath ((</>))
import System.IO (IOMode (..), withFile)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
msgStoreTests :: Spec
msgStoreTests = do
@@ -256,7 +257,6 @@ testQueueState ms = do
length . lines <$> readFile statePath `shouldReturn` 1
readQueueState ms statePath `shouldReturn` (Just state, False)
length <$> listDirectory dir `shouldReturn` 1 -- no backup
let state1 =
state
{ size = 1,
@@ -267,7 +267,6 @@ testQueueState ms = do
length . lines <$> readFile statePath `shouldReturn` 2
readQueueState ms statePath `shouldReturn` (Just state1, False)
length <$> listDirectory dir `shouldReturn` 1 -- no backup
let state2 =
state
{ size = 2,
@@ -343,7 +342,7 @@ testRemoveJournals ms = do
runRight $ do
q <- ExceptT $ addQueue ms rId qr
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {msgId = mId2}, False) <- write q "message 2"
Just (Message {msgId = mId2}, False) <- write q "message 2"
(Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1
(Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2
liftIO $ closeMsgQueue ms q
+6 -5
View File
@@ -8,14 +8,15 @@ import Control.Concurrent.STM
import Control.Monad (when)
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
import Simplex.Messaging.Agent.RetryInterval
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
retryIntervalTests :: Spec
retryIntervalTests = do
describe "Retry interval with 2 modes and lock" $ do
testRetryIntervalSameMode
testRetryIntervalSwitchMode
describe "Foreground retry interval" $ do
describe "Foreground retry interval" $ do
testRetryForeground
testRetryToBackground
testRetrySkipWhenForeground
@@ -103,7 +104,7 @@ testRetryForeground =
when (length ints < 8) $ loop
(reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 4, 4]
(reverse <$> readTVarIO reportedIntervals)
`shouldReturn` [ 10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000]
`shouldReturn` [10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000]
testRetryToBackground :: Spec
testRetryToBackground =
@@ -124,7 +125,7 @@ testRetryToBackground =
)
(reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 4, 4]
(reverse <$> readTVarIO reportedIntervals)
`shouldReturn` [ 10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000]
`shouldReturn` [10000, 10000, 15000, 22500, 33750, 40000, 40000, 40000]
testRetrySkipWhenForeground :: Spec
testRetrySkipWhenForeground =
@@ -149,7 +150,7 @@ testRetrySkipWhenForeground =
)
(reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 0, 1, 1, 1, 2, 3, 1]
(reverse <$> readTVarIO reportedIntervals)
`shouldReturn` [ 10000, 10000, 15000, 22500, 33750, 10000, 10000, 15000, 22500, 33750, 40000, 10000]
`shouldReturn` [10000, 10000, 15000, 22500, 33750, 10000, 10000, 15000, 22500, 33750, 40000, 10000]
addInterval :: TVar [Int] -> TVar UTCTime -> IO [Int]
addInterval intervals ts = do
+2 -1
View File
@@ -12,7 +12,8 @@ import Simplex.Messaging.Client
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ErrorType)
import Simplex.Messaging.Transport.Client
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
socksSettingsTests :: Spec
socksSettingsTests = do
+5 -4
View File
@@ -29,7 +29,8 @@ import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
testPublicAuthKey :: C.APublicAuthKey
testPublicAuthKey = C.APublicAuthKey C.SEd25519 (C.publicKey "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe")
@@ -81,19 +82,19 @@ storeLogTests =
saved = [CreateQueue rId' qr'],
compacted = [CreateQueue rId' qr'],
state = M.fromList [(rId', qr')]
},
},
SLTC
{ name = "create new queue, add link data",
saved = [CreateQueue rId' qr' {queueData = Nothing}, CreateLink rId' lnkId qd],
compacted = [CreateQueue rId' qr'],
state = M.fromList [(rId', qr')]
},
},
SLTC
{ name = "create new queue with link data, delete data",
saved = [CreateQueue rId' qr', DeleteLink rId'],
compacted = [CreateQueue rId' qr' {queueData = Nothing}],
state = M.fromList [(rId', qr' {queueData = Nothing})]
},
},
SLTC
{ name = "secure queue",
saved = [CreateQueue rId qr, SecureQueue rId testPublicAuthKey],
+4 -3
View File
@@ -17,9 +17,10 @@ import Simplex.Messaging.Agent.Protocol (ConnId, QueueStatus (..), UserId)
import Simplex.Messaging.Agent.Store (DBQueueId (..), RcvQueue, StoredRcvQueue (..))
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol (EntityId (..), RecipientId, SMPServer, QueueMode (..), pattern NoEntity, pattern VersionSMPC)
import Test.Hspec
import Simplex.Messaging.Protocol (EntityId (..), QueueMode (..), RecipientId, SMPServer, pattern NoEntity, pattern VersionSMPC)
import Test.Hspec hiding (fit, it)
import UnliftIO
import Util
tRcvQueuesTests :: Spec
tRcvQueuesTests = do
@@ -120,7 +121,7 @@ getSessQueuesTest = do
atomically (RQ.hasSessQueues tSess3 trq) `shouldReturn` False
let tSess4 = (0, "smp://1234-w==@alpha", Nothing)
RQ.getSessQueues tSess4 trq `shouldReturn` [dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2", dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"]
atomically (RQ.hasSessQueues tSess4 trq) `shouldReturn`True
atomically (RQ.hasSessQueues tSess4 trq) `shouldReturn` True
getDelSessQueuesTest :: IO ()
getDelSessQueuesTest = do
+2 -1
View File
@@ -8,8 +8,9 @@ import Control.Monad.Except
import Control.Monad.IO.Class
import Data.IORef
import Simplex.Messaging.Util
import Test.Hspec
import Test.Hspec hiding (fit, it)
import qualified UnliftIO.Exception as UE
import Util
utilTests :: Spec
utilTests = do
+2 -1
View File
@@ -11,9 +11,10 @@ import GHC.Generics (Generic)
import Generic.Random (genericArbitraryU)
import Simplex.Messaging.Version
import Simplex.Messaging.Version.Internal
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Test.Hspec.QuickCheck (modifyMaxSuccess)
import Test.QuickCheck
import Util
data V = V1 | V2 | V3 | V4 | V5 deriving (Eq, Enum, Ord, Generic, Show)
+2 -1
View File
@@ -16,7 +16,8 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Protocol (EntityId (..))
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import System.Directory (removeFile)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
fileDescriptionTests :: Spec
fileDescriptionTests = do
+6 -7
View File
@@ -55,7 +55,7 @@ import Simplex.Messaging.Transport.Client
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams)
import Simplex.Messaging.Transport.HTTP2.Server
import Simplex.Messaging.Transport.Server
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO.Async
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
@@ -83,7 +83,7 @@ ntfTestPrometheusMetricsFile :: FilePath
ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt"
ntfTestStoreDBOpts :: DBOpts
ntfTestStoreDBOpts =
ntfTestStoreDBOpts =
DBOpts
{ connstr = ntfTestServerDBConnstr,
schema = "ntf_server",
@@ -99,10 +99,10 @@ ntfTestServerDBConnstr = "postgresql://ntf_test_server_user@/ntf_test_server_db"
ntfTestServerDBConnectInfo :: ConnectInfo
ntfTestServerDBConnectInfo =
defaultConnectInfo {
connectUser = "ntf_test_server_user",
connectDatabase = "ntf_test_server_db"
}
defaultConnectInfo
{ connectUser = "ntf_test_server_user",
connectDatabase = "ntf_test_server_db"
}
ntfTestDBCfg :: PostgresStoreCfg
ntfTestDBCfg =
@@ -134,7 +134,6 @@ ntfServerCfg =
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 2,
subQSize = 2,
pushQSize = 2,
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0},
apnsConfig =
+3 -2
View File
@@ -43,8 +43,9 @@ import Simplex.Messaging.Notifications.Transport (THandleNTF)
import Simplex.Messaging.Parsers (parse, parseAll)
import Simplex.Messaging.Protocol hiding (notification)
import Simplex.Messaging.Transport
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO.STM
import Util
ntfServerTests :: ATransport -> Spec
ntfServerTests t = do
@@ -243,7 +244,7 @@ registerToken nh apns token = do
let Right verification = nd .-> "verification"
Right nonce = C.cbNonce <$> nd .-> "nonce"
Right pt = C.cbDecrypt dhSecret nonce verification
in NtfRegCode pt
in NtfRegCode pt
let code = decryptCode ntfData
pure (tknKey, dhSecret, tId, code)
+2 -1
View File
@@ -17,7 +17,8 @@ import Simplex.Messaging.Util (ifM, whenM)
import System.Directory (doesFileExist, removeFile)
import System.Environment (lookupEnv)
import System.Process (readCreateProcess, shell)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
testSchemaPath :: FilePath
testSchemaPath = "tests/tmp/test_schema.sql"
+2 -1
View File
@@ -19,9 +19,10 @@ import qualified Simplex.RemoteControl.Client as RC
import Simplex.RemoteControl.Discovery (mkLastLocalHost, preferAddress)
import Simplex.RemoteControl.Invitation (RCSignedInvitation, verifySignedInvitation)
import Simplex.RemoteControl.Types
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO
import UnliftIO.Concurrent
import Util
remoteControlTests :: Spec
remoteControlTests = do
+6 -6
View File
@@ -14,7 +14,6 @@
module SMPClient where
import Control.Logger.Simple (LogLevel (..))
import Control.Monad.Except (runExceptT)
import Data.ByteString.Char8 (ByteString)
import Data.List.NonEmpty (NonEmpty)
@@ -38,7 +37,8 @@ import Simplex.Messaging.Version
import Simplex.Messaging.Version.Internal
import System.Environment (lookupEnv)
import System.Info (os)
import Test.Hspec
import System.Process (callCommand)
import Test.Hspec hiding (fit, it)
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
import UnliftIO.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar)
@@ -82,7 +82,7 @@ testStoreLogFile2 :: FilePath
testStoreLogFile2 = "tests/tmp/smp-server-store.log.2"
testStoreDBOpts :: DBOpts
testStoreDBOpts =
testStoreDBOpts =
DBOpts
{ connstr = testServerDBConnstr,
schema = "smp_server",
@@ -176,7 +176,7 @@ journalCfg :: ServerConfig -> FilePath -> FilePath -> ServerConfig
journalCfg cfg' storeLogFile storeMsgsPath = cfg' {serverStoreCfg = ASSCfg SQSMemory SMSJournal SSCMemoryJournal {storeLogFile, storeMsgsPath}}
journalCfgDB :: ServerConfig -> DBOpts -> FilePath -> ServerConfig
journalCfgDB cfg' dbOpts storeMsgsPath' =
journalCfgDB cfg' dbOpts storeMsgsPath' =
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCYesUp, deletedTTL = 86400}
in cfg' {serverStoreCfg = ASSCfg SQSPostgres SMSJournal SSCDatabaseJournal {storeCfg, storeMsgsPath'}}
@@ -228,7 +228,7 @@ cfgMS msType =
}
defaultStartOptions :: StartOptions
defaultStartOptions = StartOptions {maintenance = False, compactLog = False, logLevel = LogError, skipWarnings = False, confirmMigrations = MCYesUp}
defaultStartOptions = StartOptions {maintenance = False, compactLog = False, logLevel = testLogLevel, skipWarnings = False, confirmMigrations = MCYesUp}
serverStoreConfig :: AStoreType -> AServerStoreCfg
serverStoreConfig = serverStoreConfig_ False
@@ -303,7 +303,7 @@ serverBracket process afterProcess f = do
started <- newEmptyTMVarIO
E.bracket
(forkIOWithUnmask (\unmask -> unmask (process started) `E.catchAny` handleStartError started))
(\t -> killThread t >> afterProcess >> waitFor started "stop")
(\t -> killThread t >> afterProcess >> waitFor started "stop" >> callCommand "sync")
(\t -> waitFor started "start" >> f t >>= \r -> r <$ threadDelay 100000)
where
-- it putTMVar is called twise to unlock both parts of the bracket in case of start failure
+1 -1
View File
@@ -45,7 +45,7 @@ import Simplex.Messaging.Util (bshow, tshow)
import Simplex.Messaging.Version (mkVersionRange)
import System.FilePath (splitExtensions)
import System.Random (randomRIO)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO
import Util
#if defined(dbPostgres)
+12 -10
View File
@@ -17,7 +17,7 @@ module ServerTests where
import Control.Concurrent (ThreadId, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, try, throwIO)
import Control.Exception (SomeException, throwIO, try)
import Control.Monad
import Control.Monad.IO.Class
import CoreTests.MsgStoreTests (testJournalStoreCfg)
@@ -39,7 +39,7 @@ import Simplex.Messaging.Server (exportMessages)
import Simplex.Messaging.Server.Env.STM (AServerStoreCfg (..), AStoreType (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..), QStoreCfg (..))
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SQSType (..), SMSType (..), newMsgStore)
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..), newMsgStore)
import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..))
import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..), closeStoreLog)
import Simplex.Messaging.Transport
@@ -50,8 +50,8 @@ import System.IO (IOMode (..), withFile)
import System.TimeIt (timeItT)
import System.Timeout
import Test.HUnit
import Test.Hspec
import Util (removeFileIfExists)
import Test.Hspec hiding (fit, it)
import Util
serverTests :: SpecWith (ATransport, AStoreType)
serverTests = do
@@ -687,7 +687,7 @@ testWithStoreLog =
runClient _ test' = testSMPClient test' `shouldReturn` ()
serverStoreLogCfg :: AStoreType -> (ServerConfig, Bool)
serverStoreLogCfg msType =
serverStoreLogCfg msType =
let serverStoreCfg = serverStoreConfig_ True msType
cfg' = (cfgMS msType) {serverStoreCfg, storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile}
compacting = case msType of
@@ -918,7 +918,9 @@ testTiming =
(C.AuthAlg C.SX25519, C.AuthAlg C.SX25519, 200) -- correct key type
]
timeRepeat n = fmap fst . timeItT . forM_ (replicate n ()) . const
similarTime t1 t2 = abs (t2 / t1 - 1) < 0.30 -- normally the difference between "no queue" and "wrong key" is less than 5%
similarTime t1 t2
| t1 <= t2 = abs (1 - t1 / t2) < 0.35 -- normally the difference between "no queue" and "wrong key" is less than 5%
| otherwise = similarTime t2 t1
testSameTiming :: forall c. Transport c => THandleSMP c 'TClient -> THandleSMP c 'TClient -> (C.AuthAlg, C.AuthAlg, Int) -> Expectation
testSameTiming rh sh (C.AuthAlg goodKeyAlg, C.AuthAlg badKeyAlg, n) = do
g <- C.newRandom
@@ -1091,8 +1093,8 @@ testBlockMessageQueue =
pure a
testInvQueueLinkData :: SpecWith (ATransport, AStoreType)
testInvQueueLinkData =
it "create and access queue short link data for 1-time invitation" $ \(ATransport t, msType) ->
testInvQueueLinkData =
it "create and access queue short link data for 1-time invitation" $ \(ATransport t, msType) ->
smpTest2 t msType $ \r s -> do
g <- C.newRandom
(rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
@@ -1144,8 +1146,8 @@ testInvQueueLinkData =
rId2 `shouldBe` rId
testContactQueueLinkData :: SpecWith (ATransport, AStoreType)
testContactQueueLinkData =
it "create and access queue short link data for contact address" $ \(ATransport t, msType) ->
testContactQueueLinkData =
it "create and access queue short link data for contact address" $ \(ATransport t, msType) ->
smpTest2 t msType $ \r s -> do
g <- C.newRandom
(rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
+3 -3
View File
@@ -30,7 +30,8 @@ import Simplex.Messaging.Transport (TLS, Transport (..))
-- import Simplex.Messaging.Transport.WebSockets (WS)
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
import System.Environment (setEnv)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
import XFTPAgent
import XFTPCLI
import XFTPServerTests (xftpServerTests)
@@ -59,8 +60,7 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
main :: IO ()
main = do
-- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs"
setLogLevel LogError -- LogInfo -- also change in SMPClient.hs in defaultStartOptions
setLogLevel testLogLevel
withGlobalLogging logCfg $ do
setEnv "APNS_KEY_ID" "H82WD9K9AQ"
setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8"
+45 -2
View File
@@ -1,12 +1,21 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Util where
import Control.Concurrent.Async
import Control.Exception as E
import Control.Logger.Simple
import Control.Monad (replicateM, when)
import Data.Either (partitionEithers)
import Data.List (tails)
import GHC.Conc (getNumCapabilities, getNumProcessors, setNumCapabilities)
import System.Directory (doesFileExist, removeFile)
import Test.Hspec
import UnliftIO
import System.Timeout (timeout)
import Test.Hspec hiding (fit, it)
import qualified Test.Hspec as Hspec
import Test.Hspec.Core.Spec (Example (..), Result (..), ResultStatus (..))
skip :: String -> SpecWith a -> SpecWith a
skip = before_ . pendingWith
@@ -32,3 +41,37 @@ removeFileIfExists :: FilePath -> IO ()
removeFileIfExists filePath = do
fileExists <- doesFileExist filePath
when fileExists $ removeFile filePath
newtype TestWrapper a = TestWrapper a
-- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs"
testLogLevel :: LogLevel
testLogLevel = LogError
instance Example a => Example (TestWrapper a) where
type Arg (TestWrapper a) = Arg a
evaluateExample (TestWrapper action) params hooks state = do
let tt = 120
runTest =
timeout (tt * 1000000) (evaluateExample action params hooks state) >>= \case
Just r -> pure r
Nothing -> throwIO $ userError $ "test timed out after " <> show tt <> " seconds"
retryTest = do
putStrLn "Retrying with more logs..."
setLogLevel LogNote
runTest `finally` setLogLevel testLogLevel -- change this to match log level in Test.hs
E.try runTest >>= \case
Right r -> case resultStatus r of
Failure loc_ reason -> do
putStrLn $ "Test failed: location " ++ show loc_ ++ ", reason: " ++ show reason
retryTest
_ -> pure r
Left (e :: E.SomeException) -> do
putStrLn $ "Test exception: " ++ show e
retryTest
it :: (HasCallStack, Example a) => String -> a -> SpecWith (Arg a)
it label action = Hspec.it label (TestWrapper action)
fit :: (HasCallStack, Example a) => String -> a -> SpecWith (Arg a)
fit = fmap focus . it
+2 -1
View File
@@ -42,9 +42,10 @@ import Simplex.Messaging.Transport (ALPN)
import Simplex.Messaging.Util (tshow)
import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory, removeFile)
import System.FilePath ((</>))
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO
import UnliftIO.Concurrent
import Util
import XFTPCLI
import XFTPClient
#if defined(dbPostgres)
+2 -1
View File
@@ -9,7 +9,8 @@ import System.Directory (createDirectoryIfMissing, getFileSize, listDirectory, r
import System.Environment (withArgs)
import System.FilePath ((</>))
import System.IO.Silently (capture_)
import Test.Hspec
import Test.Hspec hiding (fit, it)
import Util
import XFTPClient (testXFTPServerStr, testXFTPServerStr2, withXFTPServer, withXFTPServer2, xftpServerFiles, xftpServerFiles2)
xftpCLITests :: Spec
+1 -1
View File
@@ -19,7 +19,7 @@ import Simplex.FileTransfer.Transport (supportedFileServerVRange, supportedXFTPh
import Simplex.Messaging.Protocol (XFTPServer)
import Simplex.Messaging.Transport (ALPN)
import Simplex.Messaging.Transport.Server
import Test.Hspec
import Test.Hspec hiding (fit, it)
xftpTest :: HasCallStack => (HasCallStack => XFTPClient -> IO ()) -> Expectation
xftpTest test = runXFTPTest test `shouldReturn` ()
+2 -1
View File
@@ -32,8 +32,9 @@ import Simplex.Messaging.Protocol (BasicAuth, EntityId (..), pattern NoEntity)
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..))
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive, removeFile)
import System.FilePath ((</>))
import Test.Hspec
import Test.Hspec hiding (fit, it)
import UnliftIO.STM
import Util
import XFTPClient
xftpServerTests :: Spec