Merge branch 'master' into notifications-server

This commit is contained in:
Evgeny Poberezkin
2022-03-28 19:03:40 +01:00
12 changed files with 256 additions and 213 deletions

View File

@@ -47,8 +47,6 @@ library
Simplex.Messaging.Crypto.Ratchet
Simplex.Messaging.Encoding
Simplex.Messaging.Encoding.String
Simplex.Messaging.Notifications.Client
Simplex.Messaging.Notifications.Client.Env
Simplex.Messaging.Notifications.Protocol
Simplex.Messaging.Notifications.Server
Simplex.Messaging.Notifications.Server.Env
@@ -63,6 +61,7 @@ library
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.StoreLog
Simplex.Messaging.TMap
Simplex.Messaging.Transport
Simplex.Messaging.Transport.Client
Simplex.Messaging.Transport.KeepAlive

View File

@@ -83,6 +83,7 @@ import Simplex.Messaging.Encoding
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (MsgBody)
import qualified Simplex.Messaging.Protocol as SMP
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (bshow, liftError, tryError, unlessM)
import Simplex.Messaging.Version
import System.Random (randomR)
@@ -361,18 +362,13 @@ resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
unlessM (queueDelivering qKey) $
async (runSmpQueueMsgDelivery c connId sq)
>>= atomically . modifyTVar (smpQueueMsgDeliveries c) . M.insert qKey
>>= \a -> atomically (TM.insert qKey a $ smpQueueMsgDeliveries c)
unlessM connQueued $
withStore (`getPendingMsgs` connId)
>>= queuePendingMsgs c connId sq
where
queueDelivering qKey = isJust . M.lookup qKey <$> readTVarIO (smpQueueMsgDeliveries c)
connQueued =
atomically $
isJust
<$> stateTVar
(connMsgsQueued c)
(\m -> (M.lookup connId m, M.insert connId True m))
queueDelivering qKey = atomically $ isJust <$> TM.lookup qKey (smpQueueMsgDeliveries c)
connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connMsgsQueued c)
queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m ()
queuePendingMsgs c connId sq msgIds = atomically $ do
@@ -382,11 +378,11 @@ queuePendingMsgs c connId sq msgIds = atomically $ do
getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId)
getPendingMsgQ c connId SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
maybe (newMsgQueue qKey) pure . M.lookup qKey =<< readTVar (smpQueueMsgQueues c)
maybe (newMsgQueue qKey) pure =<< TM.lookup qKey (smpQueueMsgQueues c)
where
newMsgQueue qKey = do
mq <- newTQueue
modifyTVar (smpQueueMsgQueues c) $ M.insert qKey mq
TM.insert qKey mq $ smpQueueMsgQueues c
pure mq
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m ()

View File

@@ -51,8 +51,6 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isNothing)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text.Encoding
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
@@ -63,10 +61,12 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Protocol (QueueId, QueueIdsKeys (..), SndPublicVerifyKey)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (bshow, liftEitherError, liftError, tryError)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (bshow, liftEitherError, liftError, tryError, whenM)
import Simplex.Messaging.Version
import System.Timeout (timeout)
import UnliftIO (async)
import UnliftIO (async, forConcurrently_)
import UnliftIO.Exception (Exception, IOException)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
@@ -78,13 +78,13 @@ data AgentClient = AgentClient
subQ :: TBQueue (ATransmission 'Agent),
msgQ :: TBQueue SMPServerTransmission,
smpServers :: TVar (NonEmpty SMPServer),
smpClients :: TVar (Map SMPServer SMPClientVar),
subscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)),
pendingSubscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)),
subscrConns :: TVar (Map ConnId SMPServer),
connMsgsQueued :: TVar (Map ConnId Bool),
smpQueueMsgQueues :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId)),
smpQueueMsgDeliveries :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (Async ())),
smpClients :: TMap SMPServer SMPClientVar,
subscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue),
pendingSubscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue),
subscrConns :: TMap ConnId SMPServer,
connMsgsQueued :: TMap ConnId Bool,
smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId),
smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()),
reconnections :: TVar [Async ()],
asyncClients :: TVar [Async ()],
clientId :: Int,
@@ -100,13 +100,13 @@ newAgentClient agentEnv = do
subQ <- newTBQueue qSize
msgQ <- newTBQueue qSize
smpServers <- newTVar $ initialSMPServers (config agentEnv)
smpClients <- newTVar M.empty
subscrSrvrs <- newTVar M.empty
pendingSubscrSrvrs <- newTVar M.empty
subscrConns <- newTVar M.empty
connMsgsQueued <- newTVar M.empty
smpQueueMsgQueues <- newTVar M.empty
smpQueueMsgDeliveries <- newTVar M.empty
smpClients <- TM.empty
subscrSrvrs <- TM.empty
pendingSubscrSrvrs <- TM.empty
subscrConns <- TM.empty
connMsgsQueued <- TM.empty
smpQueueMsgQueues <- TM.empty
smpQueueMsgDeliveries <- TM.empty
reconnections <- newTVar []
asyncClients <- newTVar []
clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1)
@@ -133,12 +133,12 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv =
atomically getClientVar >>= either newSMPClient waitForSMPClient
where
getClientVar :: STM (Either SMPClientVar SMPClientVar)
getClientVar = maybe (Left <$> newClientVar) (pure . Right) . M.lookup srv =<< readTVar smpClients
getClientVar = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup srv smpClients
newClientVar :: STM SMPClientVar
newClientVar = do
smpVar <- newEmptyTMVar
modifyTVar smpClients $ M.insert srv smpVar
TM.insert srv smpVar smpClients
pure smpVar
waitForSMPClient :: TMVar (Either AgentErrorType SMPClient) -> m SMPClient
@@ -165,12 +165,12 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv =
then retryAction
else atomically $ do
putTMVar smpVar (Left e)
modifyTVar smpClients $ M.delete srv
TM.delete srv smpClients
throwError e
tryConnectAsync :: m ()
tryConnectAsync = do
a <- async connectAsync
atomically $ modifyTVar (asyncClients c) (a :)
atomically $ modifyTVar' (asyncClients c) (a :)
connectAsync :: m ()
connectAsync = do
ri <- asks $ reconnectInterval . config
@@ -188,23 +188,24 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv =
clientDisconnected :: UnliftIO m -> IO ()
clientDisconnected u = do
removeClientSubs >>= (`forM_` serverDown u)
removeClientAndSubs >>= (`forM_` serverDown u)
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
removeClientSubs :: IO (Maybe (Map ConnId RcvQueue))
removeClientSubs = atomically $ do
modifyTVar smpClients $ M.delete srv
cs <- M.lookup srv <$> readTVar (subscrSrvrs c)
modifyTVar (subscrSrvrs c) $ M.delete srv
modifyTVar (subscrConns c) $ maybe id (deleteKeys . M.keysSet) cs
mapM_ (modifyTVar (pendingSubscrSrvrs c) . addPendingSubs) cs
return cs
removeClientAndSubs :: IO (Maybe (Map ConnId RcvQueue))
removeClientAndSubs = atomically $ do
TM.delete srv smpClients
cVar_ <- TM.lookupDelete srv $ subscrSrvrs c
forM cVar_ $ \cVar -> do
cs <- readTVar cVar
modifyTVar' (subscrConns c) (`M.withoutKeys` M.keysSet cs)
addPendingSubs cVar cs
pure cs
where
addPendingSubs :: Map ConnId RcvQueue -> Map SMPServer (Map ConnId RcvQueue) -> Map SMPServer (Map ConnId RcvQueue)
addPendingSubs cs = M.alter (Just . addSubs cs) srv
addSubs cs = maybe cs (M.union cs)
deleteKeys :: Ord k => Set k -> Map k a -> Map k a
deleteKeys ks m = S.foldr' M.delete m ks
addPendingSubs cVar cs = do
let ps = pendingSubscrSrvrs c
TM.lookup srv ps >>= \case
Just v -> TM.union cs v
_ -> TM.insert srv cVar ps
serverDown :: UnliftIO m -> Map ConnId RcvQueue -> IO ()
serverDown u cs = unless (M.null cs) $ do
@@ -214,7 +215,7 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv =
reconnectServer :: m ()
reconnectServer = do
a <- async tryReconnectClient
atomically $ modifyTVar (reconnections c) (a :)
atomically $ modifyTVar' (reconnections c) (a :)
tryReconnectClient :: m ()
tryReconnectClient = do
@@ -223,19 +224,26 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv =
reconnectClient `catchError` const loop
reconnectClient :: m ()
reconnectClient = do
reconnectClient =
withAgentLock c . withSMP c srv $ \smp -> do
subs <- readTVarIO $ subscrConns c
cs <- M.lookup srv <$> readTVarIO (pendingSubscrSrvrs c)
forM_ (maybe [] M.toList cs) $ \(connId, rq@RcvQueue {rcvPrivateKey, rcvId}) ->
when (isNothing $ M.lookup connId subs) $ do
subscribeSMPQueue smp rcvPrivateKey rcvId
`catchError` \case
e@SMPResponseTimeout -> throwError e
e@SMPNetworkError -> throwError e
e -> liftIO $ notifySub (ERR $ smpClientError e) connId
addSubscription c rq connId
liftIO $ notifySub UP connId
cs <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSubscrSrvrs c)
forConcurrently_ (maybe [] M.toList cs) $ \sub@(connId, _) ->
whenM (atomically $ isNothing <$> TM.lookup connId (subscrConns c)) $
subscribe_ smp sub `catchError` handleError connId
where
subscribe_ :: SMPClient -> (ConnId, RcvQueue) -> ExceptT SMPClientError IO ()
subscribe_ smp (connId, rq@RcvQueue {rcvPrivateKey, rcvId}) = do
subscribeSMPQueue smp rcvPrivateKey rcvId
addSubscription c rq connId
liftIO $ notifySub UP connId
handleError :: ConnId -> SMPClientError -> ExceptT SMPClientError IO ()
handleError connId = \case
e@SMPResponseTimeout -> throwError e
e@SMPNetworkError -> throwError e
e -> do
liftIO $ notifySub (ERR $ smpClientError e) connId
atomically $ removePendingSubscription c srv connId
notifySub :: ACommand 'Agent -> ConnId -> IO ()
notifySub cmd connId = atomically $ writeTBQueue (subQ c) ("", connId, cmd)
@@ -333,7 +341,7 @@ newRcvQueue_ a c srv = do
subscribeQueue :: AgentMonad m => AgentClient -> RcvQueue -> ConnId -> m ()
subscribeQueue c rq@RcvQueue {server, rcvPrivateKey, rcvId} connId = do
addPendingSubscription c rq connId
atomically $ addPendingSubscription c rq connId
withLogSMP c server rcvId "SUB" $ \smp -> do
liftIO (runExceptT $ subscribeSMPQueue smp rcvPrivateKey rcvId) >>= \case
Left e -> do
@@ -344,34 +352,30 @@ subscribeQueue c rq@RcvQueue {server, rcvPrivateKey, rcvId} connId = do
addSubscription :: MonadUnliftIO m => AgentClient -> RcvQueue -> ConnId -> m ()
addSubscription c rq@RcvQueue {server} connId = atomically $ do
modifyTVar (subscrConns c) $ M.insert connId server
TM.insert connId server $ subscrConns c
addSubs_ (subscrSrvrs c) rq connId
removePendingSubscription c server connId
addPendingSubscription :: MonadUnliftIO m => AgentClient -> RcvQueue -> ConnId -> m ()
addPendingSubscription c rq connId =
atomically $ addSubs_ (pendingSubscrSrvrs c) rq connId
addPendingSubscription :: AgentClient -> RcvQueue -> ConnId -> STM ()
addPendingSubscription = addSubs_ . pendingSubscrSrvrs
addSubs_ :: TVar (Map SMPServer (Map ConnId RcvQueue)) -> RcvQueue -> ConnId -> STM ()
addSubs_ ss rq@RcvQueue {server} connId = modifyTVar ss $ M.alter (Just . addSub) server
where
addSub = maybe (M.singleton connId rq) (M.insert connId rq)
addSubs_ :: TMap SMPServer (TMap ConnId RcvQueue) -> RcvQueue -> ConnId -> STM ()
addSubs_ ss rq@RcvQueue {server} connId =
TM.lookup server ss >>= \case
Just m -> TM.insert connId rq m
_ -> TM.singleton connId rq >>= \m -> TM.insert server m ss
removeSubscription :: MonadUnliftIO m => AgentClient -> ConnId -> m ()
removeSubscription c@AgentClient {subscrConns} connId = atomically $ do
server_ <- stateTVar subscrConns $ \cs -> (M.lookup connId cs, M.delete connId cs)
server_ <- TM.lookupDelete connId subscrConns
mapM_ (\server -> removeSubs_ (subscrSrvrs c) server connId) server_
removePendingSubscription :: AgentClient -> SMPServer -> ConnId -> STM ()
removePendingSubscription c = removeSubs_ (pendingSubscrSrvrs c)
removePendingSubscription = removeSubs_ . pendingSubscrSrvrs
removeSubs_ :: TVar (Map SMPServer (Map ConnId RcvQueue)) -> SMPServer -> ConnId -> STM ()
removeSubs_ ss server connId = modifyTVar ss $ M.update delSub server
where
delSub :: Map ConnId RcvQueue -> Maybe (Map ConnId RcvQueue)
delSub cs =
let cs' = M.delete connId cs
in if M.null cs' then Nothing else Just cs'
removeSubs_ :: TMap SMPServer (TMap ConnId RcvQueue) -> SMPServer -> ConnId -> STM ()
removeSubs_ ss server connId =
TM.lookup server ss >>= mapM_ (TM.delete connId)
logServer :: AgentMonad m => ByteString -> AgentClient -> SMPServer -> QueueId -> ByteString -> m ()
logServer dir AgentClient {clientId} srv qId cmdStr =

View File

@@ -56,13 +56,13 @@ import Control.Monad.Trans.Class
import Control.Monad.Trans.Except
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe)
import Network.Socket (ServiceName)
import Numeric.Natural
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TLS, TProxy, Transport (..), TransportError)
import Simplex.Messaging.Transport.Client (runTransportClient, smpClientHandshake)
import Simplex.Messaging.Transport.KeepAlive
@@ -83,7 +83,7 @@ data SMPClient = SMPClient
smpServer :: SMPServer,
tcpTimeout :: Int,
clientCorrId :: TVar Natural,
sentCommands :: TVar (Map CorrId Request),
sentCommands :: TMap CorrId Request,
sndQ :: TBQueue SentRawTransmission,
rcvQ :: TBQueue (SignedTransmission BrokerMsg),
msgQ :: TBQueue SMPServerTransmission
@@ -137,7 +137,7 @@ getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, tcpKeepAlive, smp
mkSMPClient = do
connected <- newTVar False
clientCorrId <- newTVar 0
sentCommands <- newTVar M.empty
sentCommands <- TM.empty
sndQ <- newTBQueue qSize
rcvQ <- newTBQueue qSize
return
@@ -202,11 +202,10 @@ getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, tcpKeepAlive, smp
if B.null $ bs corrId
then sendMsg qId respOrErr
else do
cs <- readTVarIO sentCommands
case M.lookup corrId cs of
atomically (TM.lookup corrId sentCommands) >>= \case
Nothing -> sendMsg qId respOrErr
Just Request {queueId, responseVar} -> atomically $ do
modifyTVar sentCommands $ M.delete corrId
TM.delete corrId sentCommands
putTMVar responseVar $
if queueId == qId
then case respOrErr of
@@ -368,6 +367,6 @@ sendSMPCommand SMPClient {sndQ, sentCommands, clientCorrId, sessionId, tcpTimeou
send :: CorrId -> SentRawTransmission -> STM (TMVar Response)
send corrId t = do
r <- newEmptyTMVar
modifyTVar sentCommands . M.insert corrId $ Request qId r
TM.insert corrId (Request qId r) sentCommands
writeTBQueue sndQ t
return r

View File

@@ -25,7 +25,6 @@
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking, verifyCmdSignature, dummyVerifyCmd) where
import Control.Concurrent.STM (stateTVar)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
@@ -47,6 +46,8 @@ import Simplex.Messaging.Server.MsgStore.STM (MsgQueue)
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM (QueueStore)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
@@ -92,8 +93,8 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
MonadUnliftIO m' =>
Server ->
(Server -> TBQueue (QueueId, Client)) ->
(Server -> TVar (M.Map QueueId Client)) ->
(Client -> TVar (M.Map QueueId s)) ->
(Server -> TMap QueueId Client) ->
(Client -> TMap QueueId s) ->
(s -> m' ()) ->
m' ()
serverThread s subQ subs clientSubs unsub = forever $ do
@@ -110,13 +111,13 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
else do
yes <- readTVar $ connected c'
pure $ if yes then Just (qId, c') else Nothing
stateTVar (subs s) (\cs -> (M.lookup qId cs, M.insert qId clnt cs))
TM.lookupInsert qId clnt (subs s)
>>= fmap join . mapM clientToBeNotified
endPreviousSubscriptions :: (QueueId, Client) -> m' (Maybe s)
endPreviousSubscriptions (qId, c) = do
void . forkIO . atomically $
writeTBQueue (sndQ c) (CorrId "", qId, END)
atomically . stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss)
atomically $ TM.lookupDelete qId (clientSubs c)
runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
runClient _ h = do
@@ -139,7 +140,7 @@ clientDisconnected c@Client {subscriptions, connected} = do
subs <- readTVarIO subscriptions
mapM_ cancelSub subs
cs <- asks $ subscribers . server
atomically . mapM_ (modifyTVar cs . M.update deleteCurrentClient) $ M.keys subs
atomically . mapM_ (\rId -> TM.update deleteCurrentClient rId cs) $ M.keys subs
where
deleteCurrentClient :: Client -> Maybe Client
deleteCurrentClient c'
@@ -312,21 +313,19 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
getSubscription :: RecipientId -> STM Sub
getSubscription rId = do
subs <- readTVar subscriptions
case M.lookup rId subs of
TM.lookup rId subscriptions >>= \case
Just s -> tryTakeTMVar (delivered s) $> s
Nothing -> do
writeTBQueue subscribedQ (rId, clnt)
s <- newSubscription
writeTVar subscriptions $ M.insert rId s subs
TM.insert rId s subscriptions
return s
subscribeNotifications :: m (Transmission BrokerMsg)
subscribeNotifications = atomically $ do
subs <- readTVar ntfSubscriptions
when (isNothing $ M.lookup queueId subs) $ do
whenM (isNothing <$> TM.lookup queueId ntfSubscriptions) $ do
writeTBQueue ntfSubscribedQ (queueId, clnt)
writeTVar ntfSubscriptions $ M.insert queueId () subs
TM.insert queueId () ntfSubscriptions
pure ok
acknowledgeMsg :: m (Transmission BrokerMsg)
@@ -337,7 +336,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
_ -> return $ err NO_MSG
withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
withSub rId f = readTVar subscriptions >>= mapM f . M.lookup rId
withSub rId f = mapM f =<< TM.lookup rId subscriptions
sendMessage :: QueueStore -> MsgBody -> m (Transmission BrokerMsg)
sendMessage st msgBody
@@ -372,7 +371,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
trySendNotification :: STM ()
trySendNotification =
forM_ (notifier qr) $ \(nId, _) ->
mapM_ (writeNtf nId) . M.lookup nId =<< readTVar notifiers
mapM_ (writeNtf nId) =<< TM.lookup nId notifiers
writeNtf :: NotifierId -> Client -> STM ()
writeNtf nId Client {sndQ = q} =
@@ -406,7 +405,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
void setDelivered
setSub :: (Sub -> Sub) -> STM ()
setSub f = modifyTVar subscriptions $ M.adjust f rId
setSub f = TM.adjust f rId subscriptions
setDelivered :: STM (Maybe Bool)
setDelivered = withSub rId $ \s -> tryPutTMVar (delivered s) ()

View File

@@ -21,6 +21,8 @@ import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore (QueueRec (..))
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport)
import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams)
import System.IO (IOMode (..))
@@ -53,14 +55,14 @@ data Env = Env
data Server = Server
{ subscribedQ :: TBQueue (RecipientId, Client),
subscribers :: TVar (Map RecipientId Client),
subscribers :: TMap RecipientId Client,
ntfSubscribedQ :: TBQueue (NotifierId, Client),
notifiers :: TVar (Map NotifierId Client)
notifiers :: TMap NotifierId Client
}
data Client = Client
{ subscriptions :: TVar (Map RecipientId Sub),
ntfSubscriptions :: TVar (Map NotifierId ()),
{ subscriptions :: TMap RecipientId Sub,
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (Transmission Cmd),
sndQ :: TBQueue (Transmission BrokerMsg),
sessionId :: ByteString,
@@ -77,15 +79,15 @@ data Sub = Sub
newServer :: Natural -> STM Server
newServer qSize = do
subscribedQ <- newTBQueue qSize
subscribers <- newTVar M.empty
subscribers <- TM.empty
ntfSubscribedQ <- newTBQueue qSize
notifiers <- newTVar M.empty
notifiers <- TM.empty
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers}
newClient :: Natural -> ByteString -> STM Client
newClient qSize sessionId = do
subscriptions <- newTVar M.empty
ntfSubscriptions <- newTVar M.empty
subscriptions <- TM.empty
ntfSubscriptions <- TM.empty
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
connected <- newTVar True
@@ -109,15 +111,12 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile}
return Env {config, server, serverIdentity, queueStore, msgStore, idsDrg, storeLog = s', tlsServerParams}
where
restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode)
restoreQueues queueStore s = do
(queues, s') <- liftIO $ readWriteStoreLog s
atomically $
modifyTVar queueStore $ \d ->
d
{ queues,
senders = M.foldr' addSender M.empty queues,
notifiers = M.foldr' addNotifier M.empty queues
}
restoreQueues QueueStore {queues, senders, notifiers} s = do
(qs, s') <- liftIO $ readWriteStoreLog s
atomically $ do
writeTVar queues =<< mapM newTVar qs
writeTVar senders $ M.foldr' addSender M.empty qs
writeTVar notifiers $ M.foldr' addNotifier M.empty qs
pure s'
addSender :: QueueRec -> Map SenderId RecipientId -> Map SenderId RecipientId
addSender q = M.insert (senderId q) (recipientId q)

View File

@@ -6,36 +6,31 @@
module Simplex.Messaging.Server.MsgStore.STM where
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Numeric.Natural
import Simplex.Messaging.Protocol (RecipientId)
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import UnliftIO.STM
newtype MsgQueue = MsgQueue {msgQueue :: TBQueue Message}
newtype MsgStoreData = MsgStoreData {messages :: Map RecipientId MsgQueue}
type STMMsgStore = TVar MsgStoreData
type STMMsgStore = TMap RecipientId MsgQueue
newMsgStore :: STM STMMsgStore
newMsgStore = newTVar $ MsgStoreData M.empty
newMsgStore = TM.empty
instance MonadMsgStore STMMsgStore MsgQueue STM where
getMsgQueue :: STMMsgStore -> RecipientId -> Natural -> STM MsgQueue
getMsgQueue store rId quota = do
m <- messages <$> readTVar store
maybe (newQ m) return $ M.lookup rId m
getMsgQueue st rId quota = maybe newQ pure =<< TM.lookup rId st
where
newQ m' = do
newQ = do
q <- MsgQueue <$> newTBQueue quota
writeTVar store . MsgStoreData $ M.insert rId q m'
TM.insert rId q st
return q
delMsgQueue :: STMMsgStore -> RecipientId -> STM ()
delMsgQueue store rId =
modifyTVar store $ MsgStoreData . M.delete rId . messages
delMsgQueue st rId = TM.delete rId st
instance MonadMsgQueue MsgQueue STM where
isFull :: MsgQueue -> STM Bool

View File

@@ -15,6 +15,7 @@ data QueueRec = QueueRec
notifier :: Maybe (NotifierId, NtfPublicVerifyKey),
status :: QueueStatus
}
deriving (Eq, Show)
data QueueStatus = QueueActive | QueueOff deriving (Eq, Show)

View File

@@ -3,6 +3,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
@@ -11,107 +12,83 @@
module Simplex.Messaging.Server.QueueStore.STM where
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Control.Monad
import Data.Functor (($>))
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM)
import UnliftIO.STM
data QueueStoreData = QueueStoreData
{ queues :: Map RecipientId QueueRec,
senders :: Map SenderId RecipientId,
notifiers :: Map NotifierId RecipientId
data QueueStore = QueueStore
{ queues :: TMap RecipientId (TVar QueueRec),
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId
}
type QueueStore = TVar QueueStoreData
newQueueStore :: STM QueueStore
newQueueStore = newTVar QueueStoreData {queues = M.empty, senders = M.empty, notifiers = M.empty}
newQueueStore = do
queues <- TM.empty
senders <- TM.empty
notifiers <- TM.empty
pure QueueStore {queues, senders, notifiers}
instance MonadQueueStore QueueStore STM where
addQueue :: QueueStore -> QueueRec -> STM (Either ErrorType ())
addQueue store qRec@QueueRec {recipientId = rId, senderId = sId} = do
cs@QueueStoreData {queues, senders} <- readTVar store
if M.member rId queues || M.member sId senders
then return $ Left DUPLICATE_
else do
writeTVar store $
cs
{ queues = M.insert rId qRec queues,
senders = M.insert sId rId senders
}
return $ Right ()
addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId = sId} = do
ifM hasId (pure $ Left DUPLICATE_) $ do
qVar <- newTVar q
TM.insert rId qVar queues
TM.insert sId rId senders
pure $ Right ()
where
hasId = (||) <$> TM.member rId queues <*> TM.member sId senders
getQueue :: QueueStore -> SParty p -> QueueId -> STM (Either ErrorType QueueRec)
getQueue st party qId = do
cs <- readTVar st
pure $ case party of
SRecipient -> getRcpQueue cs qId
SSender -> getPartyQueue cs senders
SNotifier -> getPartyQueue cs notifiers
getQueue QueueStore {queues, senders, notifiers} party qId =
toResult <$> (mapM readTVar =<< getVar)
where
getPartyQueue ::
QueueStoreData ->
(QueueStoreData -> Map QueueId RecipientId) ->
Either ErrorType QueueRec
getPartyQueue cs recipientIds =
case M.lookup qId $ recipientIds cs of
Just rId -> getRcpQueue cs rId
Nothing -> Left AUTH
getVar = case party of
SRecipient -> TM.lookup qId queues
SSender -> TM.lookup qId senders >>= get
SNotifier -> TM.lookup qId notifiers >>= get
get = fmap join . mapM (`TM.lookup` queues)
secureQueue :: QueueStore -> RecipientId -> SndPublicVerifyKey -> STM (Either ErrorType QueueRec)
secureQueue store rId sKey =
updateQueues store rId $ \cs c ->
case senderKey c of
Just _ -> (Left AUTH, cs)
_ -> (Right c, cs {queues = M.insert rId c {senderKey = Just sKey} (queues cs)})
secureQueue QueueStore {queues} rId sKey =
withQueue rId queues $ \qVar ->
readTVar qVar >>= \q -> case senderKey q of
Just _ -> pure Nothing
_ -> writeTVar qVar q {senderKey = Just sKey} $> Just q
addQueueNotifier :: QueueStore -> RecipientId -> NotifierId -> NtfPublicVerifyKey -> STM (Either ErrorType QueueRec)
addQueueNotifier store rId nId nKey = do
cs@QueueStoreData {queues, notifiers} <- readTVar store
if M.member nId notifiers
then pure $ Left DUPLICATE_
else case M.lookup rId queues of
Nothing -> pure $ Left AUTH
Just q -> case notifier q of
Just _ -> pure $ Left AUTH
addQueueNotifier QueueStore {queues, notifiers} rId nId nKey = do
ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $
withQueue rId queues $ \qVar ->
readTVar qVar >>= \q -> case notifier q of
Just _ -> pure Nothing
_ -> do
writeTVar store $
cs
{ queues = M.insert rId q {notifier = Just (nId, nKey)} queues,
notifiers = M.insert nId rId notifiers
}
pure $ Right q
writeTVar qVar q {notifier = Just (nId, nKey)}
TM.insert nId rId notifiers
pure $ Just q
suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
suspendQueue store rId =
updateQueues store rId $ \cs c ->
(Right (), cs {queues = M.insert rId c {status = QueueOff} (queues cs)})
suspendQueue QueueStore {queues} rId =
withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just ()
deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
deleteQueue store rId =
updateQueues store rId $ \cs c ->
( Right (),
cs
{ queues = M.delete rId (queues cs),
senders = M.delete (senderId c) (senders cs)
}
)
deleteQueue QueueStore {queues, senders, notifiers} rId = do
TM.lookupDelete rId queues >>= \case
Just qVar ->
readTVar qVar >>= \q -> do
TM.delete (senderId q) senders
forM_ (notifier q) $ \(nId, _) -> TM.delete nId notifiers
pure $ Right ()
_ -> pure $ Left AUTH
updateQueues ::
QueueStore ->
RecipientId ->
(QueueStoreData -> QueueRec -> (Either ErrorType a, QueueStoreData)) ->
STM (Either ErrorType a)
updateQueues store rId update = do
cs <- readTVar store
let conn = getRcpQueue cs rId
either (return . Left) (_update cs) conn
where
_update cs c = do
let (res, cs') = update cs c
writeTVar store cs'
return res
toResult :: Maybe a -> Either ErrorType a
toResult = maybe (Left AUTH) Right
getRcpQueue :: QueueStoreData -> RecipientId -> Either ErrorType QueueRec
getRcpQueue cs rId = maybe (Left AUTH) Right . M.lookup rId $ queues cs
withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM (Maybe a)) -> STM (Either ErrorType a)
withQueue rId queues f = toResult <$> (TM.lookup rId queues >>= fmap join . mapM f)

View File

@@ -0,0 +1,70 @@
module Simplex.Messaging.TMap
( TMap,
empty,
singleton,
Simplex.Messaging.TMap.lookup,
member,
insert,
delete,
lookupInsert,
lookupDelete,
adjust,
update,
alter,
union,
)
where
import Control.Concurrent.STM
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
type TMap k a = TVar (Map k a)
empty :: STM (TMap k a)
empty = newTVar M.empty
{-# INLINE empty #-}
singleton :: k -> a -> STM (TMap k a)
singleton k v = newTVar $ M.singleton k v
{-# INLINE singleton #-}
lookup :: Ord k => k -> TMap k a -> STM (Maybe a)
lookup k m = M.lookup k <$> readTVar m
{-# INLINE lookup #-}
member :: Ord k => k -> TMap k a -> STM Bool
member k m = M.member k <$> readTVar m
{-# INLINE member #-}
insert :: Ord k => k -> a -> TMap k a -> STM ()
insert k v m = modifyTVar' m $ M.insert k v
{-# INLINE insert #-}
delete :: Ord k => k -> TMap k a -> STM ()
delete k m = modifyTVar' m $ M.delete k
{-# INLINE delete #-}
lookupInsert :: Ord k => k -> a -> TMap k a -> STM (Maybe a)
lookupInsert k v m = stateTVar m $ \mv -> (M.lookup k mv, M.insert k v mv)
{-# INLINE lookupInsert #-}
lookupDelete :: Ord k => k -> TMap k a -> STM (Maybe a)
lookupDelete k m = stateTVar m $ \mv -> (M.lookup k mv, M.delete k mv)
{-# INLINE lookupDelete #-}
adjust :: Ord k => (a -> a) -> k -> TMap k a -> STM ()
adjust f k m = modifyTVar' m $ M.adjust f k
{-# INLINE adjust #-}
update :: Ord k => (a -> Maybe a) -> k -> TMap k a -> STM ()
update f k m = modifyTVar' m $ M.update f k
{-# INLINE update #-}
alter :: Ord k => (Maybe a -> Maybe a) -> k -> TMap k a -> STM ()
alter f k m = modifyTVar' m $ M.alter f k
{-# INLINE alter #-}
union :: Ord k => Map k a -> TMap k a -> STM ()
union m' m = modifyTVar' m $ M.union m'
{-# INLINE union #-}

View File

@@ -41,7 +41,7 @@ runTransportServer started port serverParams server = do
$ \sock -> forever $ do
(connSock, _) <- accept sock
tid <- forkIO $ connectClient u connSock `E.catch` \(_ :: E.SomeException) -> pure ()
atomically . modifyTVar clients $ S.insert tid
atomically . modifyTVar' clients $ S.insert tid
where
connectClient :: UnliftIO m -> Socket -> IO ()
connectClient u connSock =

View File

@@ -58,6 +58,10 @@ ifM :: Monad m => m Bool -> m a -> m a -> m a
ifM ba t f = ba >>= \b -> if b then t else f
{-# INLINE ifM #-}
whenM :: Monad m => m Bool -> m () -> m ()
whenM b a = ifM b a $ pure ()
{-# INLINE whenM #-}
unlessM :: Monad m => m Bool -> m () -> m ()
unlessM b = ifM b $ pure ()
{-# INLINE unlessM #-}