From b59669a65eafeb92d4a986a03e481ecb343ee307 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 24 Jan 2023 13:14:13 +0000 Subject: [PATCH] refactor functions operating on single connections using batch functions (#620) * refactor functions operating on single connections using batch functions * update tests --- src/Simplex/Messaging/Agent.hs | 75 +++++--------------------- src/Simplex/Messaging/Agent/Client.hs | 16 ------ tests/AgentTests/FunctionalAPITests.hs | 8 +-- 3 files changed, 16 insertions(+), 83 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index fc8315809..d45f15c43 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -90,7 +90,7 @@ module Simplex.Messaging.Agent where import Control.Concurrent.STM (stateTVar) -import Control.Logger.Simple (logInfo, showText) +import Control.Logger.Simple (logError, logInfo, showText) import Control.Monad.Except import Control.Monad.IO.Unlift (MonadUnliftIO) import Control.Monad.Reader @@ -463,20 +463,7 @@ ackMessageAsync' c corrId connId msgId = do enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () -deleteConnectionAsync' c connId = withConnLock c connId "deleteConnectionAsync" $ do - SomeConn _ conn <- withStore c (`getConn` connId) - case connRcvQueues conn of - [] -> delete - rqs -> do - withStore' c (`setConnDeleted` connId) - disableConn c connId - void . forkIO $ do - forM_ rqs $ deleteConnQueue c True - delete - where - delete = do - withStore' c (`deleteConn` connId) - atomically $ writeTBQueue (subQ c) ("", connId, DEL_CONN) +deleteConnectionAsync' c connId = deleteConnectionsAsync' c [connId] deleteConnectionsAsync' :: AgentMonad m => AgentClient -> [ConnId] -> m () deleteConnectionsAsync' = deleteConnectionsAsync_ $ pure () @@ -627,30 +614,14 @@ rejectContact' c contactConnId invId = withStore c $ \db -> deleteInvitation db contactConnId invId -- | Subscribe to receive connection messages (SUB command) in Reader monad -subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () -subscribeConnection' c connId = do - SomeConn _ conn <- withStore c (`getConn` connId) - resumeConnCmds c connId - case conn of - DuplexConnection cData (rq :| rqs) sqs -> do - mapM_ (resumeMsgDelivery c cData) sqs - subscribe cData rq - mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs - SndConnection cData sq -> do - resumeMsgDelivery c cData sq - case status (sq :: SndQueue) of - Confirmed -> pure () - Active -> throwError $ CONN SIMPLEX - _ -> throwError $ INTERNAL "unexpected queue status" - RcvConnection cData rq -> subscribe cData rq - ContactConnection cData rq -> subscribe cData rq - NewConnection _ -> pure () - where - subscribe :: ConnData -> RcvQueue -> m () - subscribe ConnData {enableNtfs} rq = do - subscribeQueue c rq - ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, if enableNtfs then NSCCreate else NSCDelete) +subscribeConnection' :: AgentMonad m => AgentClient -> ConnId -> m () +subscribeConnection' c connId = toConnResult connId =<< subscribeConnections' c [connId] + +toConnResult :: AgentMonad m => ConnId -> Map ConnId (Either AgentErrorType ()) -> m () +toConnResult connId rs = case M.lookup connId rs of + Just (Right ()) -> when (M.size rs > 1) $ logError $ T.pack $ "too many results " <> show (M.size rs) + Just (Left e) -> throwError e + _ -> throwError $ INTERNAL $ "no result for connection " <> B.unpack connId type QCmdResult = (QueueStatus, Either AgentErrorType ()) @@ -720,10 +691,7 @@ subscribeConnections' c connIds = do writeTBQueue (subQ c) ("", "", ERR . INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected) resubscribeConnection' :: AgentMonad m => AgentClient -> ConnId -> m () -resubscribeConnection' c connId = - unlessM - (atomically $ hasActiveSubscription c connId) - (subscribeConnection' c connId) +resubscribeConnection' c connId = toConnResult connId =<< resubscribeConnections' c [connId] resubscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ())) resubscribeConnections' _ [] = pure M.empty @@ -1207,13 +1175,7 @@ suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do -- unlike deleteConnectionAsync, this function does not mark connection as deleted in case of deletion failure -- currently it is used only in tests deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () -deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do - SomeConn _ conn <- withStore c (`getConn` connId) - let rqs = connRcvQueues conn - unless (null rqs) $ do - disableConn c connId - forM_ rqs $ deleteConnQueue c False - withStore' c (`deleteConn` connId) +deleteConnection' c connId = toConnResult connId =<< deleteConnections' c [connId] connRcvQueues :: Connection d -> [RcvQueue] connRcvQueues = \case @@ -1223,19 +1185,6 @@ connRcvQueues = \case SndConnection _ _ -> [] NewConnection _ -> [] -deleteConnQueue :: AgentMonad m => AgentClient -> Bool -> RcvQueue -> m () -deleteConnQueue c ntf rq = do - maxErrs <- asks $ deleteErrorCount . config - (deleteQueue c rq >> notify Nothing) `catchError` handleError maxErrs - withStore' c (`deleteConnRcvQueue` rq) - where - handleError maxErrs e - | temporaryOrHostError e && deleteErrors rq + 1 < maxErrs = do - withStore' c (`incRcvDeleteErrors` rq) - throwError e - | otherwise = notify $ Just e - notify e_ = when ntf $ atomically $ writeTBQueue (subQ c) ("", qConnId rq, DEL_RCVQ (qServer rq) (queueId rq) e_) - disableConn :: AgentMonad m => AgentClient -> ConnId -> m () disableConn c connId = do atomically $ removeSubscription c connId diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 3975c8d12..b683a2786 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -25,7 +25,6 @@ module Simplex.Messaging.Agent.Client closeProtocolServerClients, runSMPServerTest, newRcvQueue, - subscribeQueue, subscribeQueues, getQueueMessage, decryptSMPMessage, @@ -683,21 +682,6 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange = do } pure (rq, SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey) -subscribeQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () -subscribeQueue c rq@RcvQueue {connId, server, rcvPrivateKey, rcvId} = do - whenM (atomically . TM.member (server, rcvId) $ getMsgLocks c) . throwError $ CMD PROHIBITED - atomically $ do - modifyTVar' (subscrConns c) $ S.insert connId - RQ.addQueue rq $ pendingSubs c - r <- withSMPClient c rq "SUB" $ \smp -> - liftIO $ runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq - case r of - Left e -> do - tSess <- mkSMPTransportSession c rq - when (temporaryClientError e) $ reconnectServer c tSess - throwError (protocolClientError SMP (B.unpack $ strEncode server) e) - _ -> pure () - processSubResult :: AgentClient -> RcvQueue -> Either ProtocolClientError () -> IO (Either ProtocolClientError ()) processSubResult c rq r = do case r of diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index f3052ccfe..bdcd1d661 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -751,9 +751,9 @@ testDeleteConnectionAsync t = do pure ([bId1, bId2, bId3] :: [ConnId]) runRight_ $ do deleteConnectionsAsync a connIds - get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False - get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False - get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False + get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False + get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False + get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False @@ -792,7 +792,7 @@ testUsersNoServer t = do get b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False runRight_ $ do deleteUser a auId True - get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c == bId'; _ -> False + get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False get a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False liftIO $ noMessages a "nothing else should be delivered to alice"