Merge branch 'master' into users

This commit is contained in:
Evgeny Poberezkin
2023-01-18 21:18:42 +00:00
7 changed files with 128 additions and 64 deletions
+1 -1
View File
@@ -1,5 +1,5 @@
name: simplexmq
version: 4.3.0
version: 4.3.1
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
+2 -1
View File
@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 4.3.0
version: 4.3.1
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
@@ -57,6 +57,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
Simplex.Messaging.Agent.TAsyncs
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
+60 -53
View File
@@ -96,7 +96,7 @@ import Data.Bifunctor (bimap, first, second)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (partitionEithers)
import Data.Either (lefts, partitionEithers)
import Data.Functor (($>))
import Data.List (foldl', partition)
import Data.List.NonEmpty (NonEmpty (..), (<|))
@@ -117,6 +117,7 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
import Simplex.Messaging.Agent.TAsyncs
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues)
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
@@ -157,7 +158,7 @@ import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Timeout (timeout)
import UnliftIO (async, mapConcurrently)
import UnliftIO (mapConcurrently)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
@@ -201,8 +202,8 @@ data AgentClient = AgentClient
connLocks :: TMap ConnId Lock,
-- locks to prevent concurrent reconnections to SMP servers
reconnectLocks :: TMap SMPTransportSession Lock,
reconnections :: TVar [Async ()],
asyncClients :: TVar [Async ()],
reconnections :: TAsyncs,
asyncClients :: TAsyncs,
agentStats :: TMap AgentStatsKey (TVar Int),
clientId :: Int,
agentEnv :: Env
@@ -271,8 +272,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
getMsgLocks <- TM.empty
connLocks <- TM.empty
reconnectLocks <- TM.empty
reconnections <- newTVar []
asyncClients <- newTVar []
reconnections <- newTAsyncs
asyncClients <- newTAsyncs
agentStats <- TM.empty
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv}
@@ -297,7 +298,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped"
atomically (getClientVar tSess smpClients)
>>= either
(newProtocolClient c tSess smpClients connectClient reconnectClient)
(newProtocolClient c tSess smpClients connectClient reconnectSMPClient)
(waitForProtocolClient c tSess)
where
connectClient :: m SMPClient
@@ -314,9 +315,11 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
removeClientAndSubs :: IO ([RcvQueue], [ConnId])
removeClientAndSubs = atomically $ do
TM.delete tSess smpClients
(qs, conns) <- RQ.getDelSessQueues tSess $ activeSubs c
qs <- RQ.getDelSessQueues tSess $ activeSubs c
mapM_ (`RQ.addQueue` pendingSubs c) qs
pure (qs, S.toList conns)
let cs = S.fromList $ map qConnId qs
cs' <- RQ.getConns $ activeSubs c
pure (qs, S.toList $ cs `S.difference` cs')
serverDown :: ([RcvQueue], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
@@ -325,36 +328,36 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
unless (null conns) $ notifySub "" $ DOWN srv conns
unless (null qs) $ do
atomically $ mapM_ (releaseGetLock c) qs
unliftIO u reconnectServer
unliftIO u $ reconnectServer c tSess
reconnectServer :: m ()
reconnectServer = do
a <- async tryReconnectClient
atomically $ modifyTVar' (reconnections c) (a :)
notifySub :: ConnId -> ACommand 'Agent -> IO ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd)
tryReconnectClient :: m ()
tryReconnectClient = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
reconnectClient `catchError` const loop
reconnectClient :: m ()
reconnectClient =
withLockMap_ (reconnectLocks c) tSess "reconnect" $
atomically (RQ.getSessQueues tSess $ pendingSubs c) >>= mapM_ resubscribe . L.nonEmpty
where
resubscribe :: NonEmpty RcvQueue -> m ()
resubscribe qs = do
cs <- atomically . RQ.getConns $ activeSubs c
rs <- subscribeQueues c $ L.toList qs
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
liftIO $ do
let conns = S.toList $ S.fromList okConns `S.difference` cs
unless (null conns) $ notifySub "" $ UP srv conns
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
mapM_ (throwError . snd) $ listToMaybe tempErrs
reconnectServer :: AgentMonad m => AgentClient -> SMPTransportSession -> m ()
reconnectServer c tSess = newAsyncAction tryReconnectSMPClient $ reconnections c
where
tryReconnectSMPClient aId = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
reconnectSMPClient c tSess `catchError` const loop
atomically . removeAsyncAction aId $ reconnections c
reconnectSMPClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m ()
reconnectSMPClient c tSess@(_, srv, _) =
withLockMap_ (reconnectLocks c) tSess "reconnect" $
atomically (RQ.getSessQueues tSess $ pendingSubs c) >>= mapM_ resubscribe . L.nonEmpty
where
resubscribe :: NonEmpty RcvQueue -> m ()
resubscribe qs = do
cs <- atomically . RQ.getConns $ activeSubs c
rs <- subscribeQueues c $ L.toList qs
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
liftIO $ do
let conns = S.toList $ S.fromList okConns `S.difference` cs
unless (null conns) $ notifySub "" $ UP srv conns
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
mapM_ (throwError . snd) $ listToMaybe tempErrs
notifySub :: ConnId -> ACommand 'Agent -> IO ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd)
@@ -363,7 +366,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped"
atomically (getClientVar tSess ntfClients)
>>= either
(newProtocolClient c tSess ntfClients connectClient $ pure ())
(newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ())
(waitForProtocolClient c tSess)
where
connectClient :: m NtfClient
@@ -403,7 +406,7 @@ newProtocolClient ::
TransportSession msg ->
TMap (TransportSession msg) (ClientVar msg) ->
m (ProtocolClient msg) ->
m () ->
(AgentClient -> TransportSession msg -> m ()) ->
ClientVar msg ->
m (ProtocolClient msg)
newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync
@@ -426,13 +429,12 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne
TM.delete tSess clients
throwError e
tryConnectAsync :: m ()
tryConnectAsync = do
a <- async connectAsync
atomically $ modifyTVar' (asyncClients c) (a :)
connectAsync :: m ()
connectAsync = do
tryConnectAsync = newAsyncAction connectAsync $ asyncClients c
connectAsync :: Int -> m ()
connectAsync aId = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop -> void $ tryConnectClient (const reconnectClient) loop
withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop
atomically . removeAsyncAction aId $ asyncClients c
hostEvent :: forall msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient msg -> ACommand 'Agent
hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client
@@ -448,8 +450,8 @@ closeAgentClient c = liftIO $ do
atomically $ writeTVar (active c) False
closeProtocolServerClients c smpClients
closeProtocolServerClients c ntfClients
cancelActions $ reconnections c
cancelActions $ asyncClients c
cancelActions . actions $ reconnections c
cancelActions . actions $ asyncClients c
cancelActions $ smpQueueMsgDeliveries c
cancelActions $ asyncCmdProcesses c
atomically . RQ.clear $ activeSubs c
@@ -647,9 +649,13 @@ subscribeQueue c rq@RcvQueue {connId, server, rcvPrivateKey, rcvId} = do
atomically $ do
modifyTVar' (subscrConns c) $ S.insert connId
RQ.addQueue rq $ pendingSubs c
withSMPClient c rq "SUB" $ \smp ->
r <- withSMPClient c rq "SUB" $ \smp ->
liftIO (runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq)
>>= either throwError pure
case r of
Left e -> do
tSess <- mkSMPTransportSession c rq
reconnectServer c tSess >> throwError (protocolClientError SMP (B.unpack $ strEncode server) e)
_ -> pure ()
processSubResult :: AgentClient -> RcvQueue -> Either ProtocolClientError () -> IO (Either ProtocolClientError ())
processSubResult c rq r = do
@@ -697,15 +703,16 @@ subscribeQueues c qs = do
in M.alter (Just . maybe [rq] (rq <|)) tSess m
subscribeQueues_ :: AgentMonad m => AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> m (NonEmpty (RcvQueue, Either AgentErrorType ()))
subscribeQueues_ c tSess@(userId, srv, _) qs =
subscribeQueues_ c tSess@(userId, srv, _) qs = do
tryError (getSMPServerClient c tSess) >>= \case
Left e -> pure $ L.map (,Left e) qs
Right smp -> liftIO $ do
Right smp -> do
logServer "-->" c srv (bshow (length qs) <> " queues") "SUB"
let n = (length qs - 1) `div` 90 + 1
incClientStatN c userId smp n "SUBS" "OK"
rs <- L.zip qs <$> subscribeSMPQueues smp (L.map queueCreds qs)
mapM_ (uncurry $ processSubResult c) rs
liftIO $ incClientStatN c userId smp n "SUBS" "OK"
rs <- liftIO $ L.zip qs <$> subscribeSMPQueues smp (L.map queueCreds qs)
liftIO $ mapM_ (uncurry $ processSubResult c) rs
when (any temporaryClientError . lefts . map snd $ L.toList rs) $ reconnectServer c tSess
pure $ L.map (second . first $ protocolClientError SMP $ clientServer smp) rs
where
queueCreds RcvQueue {rcvPrivateKey, rcvId} = (rcvPrivateKey, rcvId)
+25
View File
@@ -0,0 +1,25 @@
module Simplex.Messaging.Agent.TAsyncs where
import Control.Concurrent.STM (stateTVar)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import UnliftIO.Async (Async, async)
import UnliftIO.STM
data TAsyncs = TAsyncs
{ actionId :: TVar Int,
actions :: TMap Int (Async ())
}
newTAsyncs :: STM TAsyncs
newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty
newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m ()
newAsyncAction action as = do
aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1)
a <- async $ action aId
atomically $ TM.insert aId a $ actions as
removeAsyncAction :: Int -> TAsyncs -> STM ()
removeAsyncAction aId = TM.delete aId . actions
+4 -6
View File
@@ -1,5 +1,3 @@
{-# LANGUAGE NamedFieldPuns #-}
module Simplex.Messaging.Agent.TRcvQueues
( TRcvQueues,
empty,
@@ -52,11 +50,11 @@ getSessQueues tSess (TRcvQueues qs) = M.foldl' addQ [] <$> readTVar qs
where
addQ qs' rq = if rq `isSession` tSess then rq : qs' else qs'
getDelSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues -> STM ([RcvQueue], Set ConnId)
getDelSessQueues tSess (TRcvQueues qs) = stateTVar qs $ M.foldl' addQ (([], S.empty), M.empty)
getDelSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues -> STM [RcvQueue]
getDelSessQueues tSess (TRcvQueues qs) = stateTVar qs $ M.foldl' addQ ([], M.empty)
where
addQ (removed@(remQs, remConns), qs') rq@RcvQueue {connId}
| rq `isSession` tSess = ((rq : remQs, S.insert connId remConns), qs')
addQ (removed, qs') rq
| rq `isSession` tSess = (rq : removed, qs')
| otherwise = (removed, M.insert (qKey rq) rq qs')
isSession :: RcvQueue -> (UserId, SMPServer, Maybe ConnId) -> Bool
+3 -1
View File
@@ -47,6 +47,7 @@ data ServerStatsData = ServerStatsData
_qCount :: Int,
_msgCount :: Int
}
deriving (Show)
newServerStats :: UTCTime -> STM ServerStats
newServerStats ts = do
@@ -88,7 +89,7 @@ setServerStats s d = do
writeTVar (qDeleted s) $! _qDeleted d
writeTVar (msgSent s) $! _msgSent d
writeTVar (msgRecv s) $! _msgRecv d
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
setPeriodStats (activeQueues s) (_activeQueues d)
writeTVar (msgSentNtf s) $! _msgSentNtf d
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
@@ -152,6 +153,7 @@ data PeriodStatsData a = PeriodStatsData
_week :: Set a,
_month :: Set a
}
deriving (Show)
newPeriodStatsData :: PeriodStatsData a
newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty}
+33 -2
View File
@@ -11,6 +11,7 @@
module ServerTests where
import AgentTests.NotificationTests (removeFileIfExists)
import Control.Concurrent (ThreadId, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
@@ -20,6 +21,7 @@ import Data.Bifunctor (first)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.Set as S
import SMPClient
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
@@ -28,6 +30,7 @@ import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..))
import Simplex.Messaging.Transport
import System.Directory (removeFile)
import System.TimeIt (timeItT)
@@ -605,6 +608,10 @@ logSize f =
testRestoreMessages :: ATransport -> Spec
testRestoreMessages at@(ATransport t) =
it "should store messages on exit and restore on start" $ do
removeFileIfExists testStoreLogFile
removeFileIfExists testStoreMsgsFile
removeFileIfExists testServerStatsBackupFile
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519
recipientId <- newTVarIO ""
recipientKey <- newTVarIO Nothing
@@ -632,11 +639,15 @@ testRestoreMessages at@(ATransport t) =
Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6")
pure ()
rId <- readTVarIO recipientId
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 16
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats1 [rId] 5 1
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
let dec = decryptMsgV3 dh
@@ -650,9 +661,11 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 16
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
let dec = decryptMsgV3 dh
@@ -667,9 +680,13 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 16
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5
removeFile testStoreLogFile
removeFile testStoreMsgsFile
removeFile testServerStatsBackupFile
where
runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation
runTest _ test' server = do
@@ -679,6 +696,20 @@ testRestoreMessages at@(ATransport t) =
runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation
runClient _ test' = testSMPClient test' `shouldReturn` ()
checkStats :: ServerStatsData -> [RecipientId] -> Int -> Int -> Expectation
checkStats s qs sent received = do
_qCreated s `shouldBe` length qs
_qSecured s `shouldBe` length qs
_qDeleted s `shouldBe` 0
_msgSent s `shouldBe` sent
_msgRecv s `shouldBe` received
_msgSentNtf s `shouldBe` 0
_msgRecvNtf s `shouldBe` 0
let PeriodStatsData {_day, _week, _month} = _activeQueues s
S.toList _day `shouldBe` qs
S.toList _week `shouldBe` qs
S.toList _month `shouldBe` qs
testRestoreMessagesV2 :: ATransport -> Spec
testRestoreMessagesV2 at@(ATransport t) =
it "should store messages on exit and restore on start" $ do