refactor functions operating on single connections using batch functions (#620)

* refactor functions operating on single connections using batch functions

* update tests
This commit is contained in:
Evgeny Poberezkin
2023-01-24 13:14:13 +00:00
committed by GitHub
parent 2ccef1690b
commit b59669a65e
3 changed files with 16 additions and 83 deletions

View File

@@ -90,7 +90,7 @@ module Simplex.Messaging.Agent
where
import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple (logInfo, showText)
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader
@@ -463,20 +463,7 @@ ackMessageAsync' c corrId connId msgId = do
enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnectionAsync' c connId = withConnLock c connId "deleteConnectionAsync" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
case connRcvQueues conn of
[] -> delete
rqs -> do
withStore' c (`setConnDeleted` connId)
disableConn c connId
void . forkIO $ do
forM_ rqs $ deleteConnQueue c True
delete
where
delete = do
withStore' c (`deleteConn` connId)
atomically $ writeTBQueue (subQ c) ("", connId, DEL_CONN)
deleteConnectionAsync' c connId = deleteConnectionsAsync' c [connId]
deleteConnectionsAsync' :: AgentMonad m => AgentClient -> [ConnId] -> m ()
deleteConnectionsAsync' = deleteConnectionsAsync_ $ pure ()
@@ -627,30 +614,14 @@ rejectContact' c contactConnId invId =
withStore c $ \db -> deleteInvitation db contactConnId invId
-- | Subscribe to receive connection messages (SUB command) in Reader monad
subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
subscribeConnection' c connId = do
SomeConn _ conn <- withStore c (`getConn` connId)
resumeConnCmds c connId
case conn of
DuplexConnection cData (rq :| rqs) sqs -> do
mapM_ (resumeMsgDelivery c cData) sqs
subscribe cData rq
mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs
SndConnection cData sq -> do
resumeMsgDelivery c cData sq
case status (sq :: SndQueue) of
Confirmed -> pure ()
Active -> throwError $ CONN SIMPLEX
_ -> throwError $ INTERNAL "unexpected queue status"
RcvConnection cData rq -> subscribe cData rq
ContactConnection cData rq -> subscribe cData rq
NewConnection _ -> pure ()
where
subscribe :: ConnData -> RcvQueue -> m ()
subscribe ConnData {enableNtfs} rq = do
subscribeQueue c rq
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, if enableNtfs then NSCCreate else NSCDelete)
subscribeConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
subscribeConnection' c connId = toConnResult connId =<< subscribeConnections' c [connId]
toConnResult :: AgentMonad m => ConnId -> Map ConnId (Either AgentErrorType ()) -> m ()
toConnResult connId rs = case M.lookup connId rs of
Just (Right ()) -> when (M.size rs > 1) $ logError $ T.pack $ "too many results " <> show (M.size rs)
Just (Left e) -> throwError e
_ -> throwError $ INTERNAL $ "no result for connection " <> B.unpack connId
type QCmdResult = (QueueStatus, Either AgentErrorType ())
@@ -720,10 +691,7 @@ subscribeConnections' c connIds = do
writeTBQueue (subQ c) ("", "", ERR . INTERNAL $ "subscribeConnections result size: " <> show actual <> ", expected " <> show expected)
resubscribeConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
resubscribeConnection' c connId =
unlessM
(atomically $ hasActiveSubscription c connId)
(subscribeConnection' c connId)
resubscribeConnection' c connId = toConnResult connId =<< resubscribeConnections' c [connId]
resubscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
resubscribeConnections' _ [] = pure M.empty
@@ -1207,13 +1175,7 @@ suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do
-- unlike deleteConnectionAsync, this function does not mark connection as deleted in case of deletion failure
-- currently it is used only in tests
deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
let rqs = connRcvQueues conn
unless (null rqs) $ do
disableConn c connId
forM_ rqs $ deleteConnQueue c False
withStore' c (`deleteConn` connId)
deleteConnection' c connId = toConnResult connId =<< deleteConnections' c [connId]
connRcvQueues :: Connection d -> [RcvQueue]
connRcvQueues = \case
@@ -1223,19 +1185,6 @@ connRcvQueues = \case
SndConnection _ _ -> []
NewConnection _ -> []
deleteConnQueue :: AgentMonad m => AgentClient -> Bool -> RcvQueue -> m ()
deleteConnQueue c ntf rq = do
maxErrs <- asks $ deleteErrorCount . config
(deleteQueue c rq >> notify Nothing) `catchError` handleError maxErrs
withStore' c (`deleteConnRcvQueue` rq)
where
handleError maxErrs e
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs = do
withStore' c (`incRcvDeleteErrors` rq)
throwError e
| otherwise = notify $ Just e
notify e_ = when ntf $ atomically $ writeTBQueue (subQ c) ("", qConnId rq, DEL_RCVQ (qServer rq) (queueId rq) e_)
disableConn :: AgentMonad m => AgentClient -> ConnId -> m ()
disableConn c connId = do
atomically $ removeSubscription c connId

View File

@@ -25,7 +25,6 @@ module Simplex.Messaging.Agent.Client
closeProtocolServerClients,
runSMPServerTest,
newRcvQueue,
subscribeQueue,
subscribeQueues,
getQueueMessage,
decryptSMPMessage,
@@ -683,21 +682,6 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange = do
}
pure (rq, SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey)
subscribeQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
subscribeQueue c rq@RcvQueue {connId, server, rcvPrivateKey, rcvId} = do
whenM (atomically . TM.member (server, rcvId) $ getMsgLocks c) . throwError $ CMD PROHIBITED
atomically $ do
modifyTVar' (subscrConns c) $ S.insert connId
RQ.addQueue rq $ pendingSubs c
r <- withSMPClient c rq "SUB" $ \smp ->
liftIO $ runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq
case r of
Left e -> do
tSess <- mkSMPTransportSession c rq
when (temporaryClientError e) $ reconnectServer c tSess
throwError (protocolClientError SMP (B.unpack $ strEncode server) e)
_ -> pure ()
processSubResult :: AgentClient -> RcvQueue -> Either ProtocolClientError () -> IO (Either ProtocolClientError ())
processSubResult c rq r = do
case r of

View File

@@ -751,9 +751,9 @@ testDeleteConnectionAsync t = do
pure ([bId1, bId2, bId3] :: [ConnId])
runRight_ $ do
deleteConnectionsAsync a connIds
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
@@ -792,7 +792,7 @@ testUsersNoServer t = do
get b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
runRight_ $ do
deleteUser a auId True
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ TIMEOUT))) -> c == bId'; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
get a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"