Merge branch 'master' into xftp

This commit is contained in:
Evgeny Poberezkin
2023-02-08 20:57:24 +00:00
11 changed files with 135 additions and 83 deletions
+12 -11
View File
@@ -12,6 +12,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- Module : Simplex.Messaging.Agent
@@ -472,7 +473,7 @@ deleteConnectionsAsync_ :: forall m. AgentMonad m => m () -> AgentClient -> [Con
deleteConnectionsAsync_ onSuccess c connIds = case connIds of
[] -> onSuccess
_ -> do
(_, rqs, connIds') <- prepareDeleteConnections_ getConn c connIds
(_, rqs, connIds') <- prepareDeleteConnections_ getConns c connIds
withStore' c $ forM_ connIds' . setConnDeleted
void . forkIO $
withLock (deleteLock c) "deleteConnectionsAsync" $
@@ -628,7 +629,7 @@ type QCmdResult = (QueueStatus, Either AgentErrorType ())
subscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
subscribeConnections' _ [] = pure M.empty
subscribeConnections' c connIds = do
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (forM connIds . getConn)
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (`getConns` connIds)
let (errs, cs) = M.mapEither id conns
errs' = M.map (Left . storeError) errs
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
@@ -1193,20 +1194,20 @@ disableConn c connId = do
-- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure.
deleteConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
deleteConnections' = deleteConnections_ getConn False
deleteConnections' = deleteConnections_ getConns False
deleteDeletedConns :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
deleteDeletedConns = deleteConnections_ getDeletedConn True
deleteDeletedConns = deleteConnections_ getDeletedConns True
prepareDeleteConnections_ ::
forall m.
AgentMonad m =>
(DB.Connection -> ConnId -> IO (Either StoreError SomeConn)) ->
(DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]) ->
AgentClient ->
[ConnId] ->
m (Map ConnId (Either AgentErrorType ()), [RcvQueue], [ConnId])
prepareDeleteConnections_ getConnection c connIds = do
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (forM connIds . getConnection)
prepareDeleteConnections_ getConnections c connIds = do
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (`getConnections` connIds)
let (errs, cs) = M.mapEither id conns
errs' = M.map (Left . storeError) errs
(delRs, rcvQs) = M.mapEither rcvQueues cs
@@ -1259,14 +1260,14 @@ deleteConnQueues c ntf rqs = do
deleteConnections_ ::
forall m.
AgentMonad m =>
(DB.Connection -> ConnId -> IO (Either StoreError SomeConn)) ->
(DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]) ->
Bool ->
AgentClient ->
[ConnId] ->
m (Map ConnId (Either AgentErrorType ()))
deleteConnections_ _ _ _ [] = pure M.empty
deleteConnections_ getConnection ntf c connIds = do
(rs, rqs, _) <- prepareDeleteConnections_ getConnection c connIds
deleteConnections_ getConnections ntf c connIds = do
(rs, rqs, _) <- prepareDeleteConnections_ getConnections c connIds
rcvRs <- deleteConnQueues c ntf rqs
let rs' = M.union rs rcvRs
notifyResultError rs'
@@ -1576,7 +1577,7 @@ cleanupManager c = do
forever $ do
void . runExceptT $
withLock (deleteLock c) "cleanupManager" $ do
void $ withStore' c getDeletedConns >>= deleteDeletedConns c
void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c
withStore' c deleteUsersWithoutConns >>= mapM_ notifyUserDeleted
threadDelay int
where
+2 -2
View File
@@ -520,7 +520,7 @@ throwWhenNoDelivery c SndQueue {server, sndId} =
closeProtocolServerClients :: AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar msg)) -> IO ()
closeProtocolServerClients c clientsSel =
readTVarIO cs >>= mapM_ (forkIO . closeClient) >> atomically (writeTVar cs M.empty)
atomically (swapTVar cs M.empty) >>= mapM_ (forkIO . closeClient)
where
cs = clientsSel c
closeClient cVar = do
@@ -530,7 +530,7 @@ closeProtocolServerClients c clientsSel =
_ -> pure ()
cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO ()
cancelActions as = readTVarIO as >>= mapM_ (forkIO . uninterruptibleCancel) >> atomically (writeTVar as mempty)
cancelActions as = atomically (swapTVar as mempty) >>= mapM_ (forkIO . uninterruptibleCancel)
withConnLock :: MonadUnliftIO m => AgentClient -> ConnId -> String -> m a -> m a
withConnLock _ "" _ = id
+17 -3
View File
@@ -43,9 +43,11 @@ module Simplex.Messaging.Agent.Store.SQLite
createSndConn,
getConn,
getDeletedConn,
getConns,
getDeletedConns,
getConnData,
setConnDeleted,
getDeletedConns,
getDeletedConnIds,
getRcvConn,
deleteConn,
upgradeRcvConnToDuplex,
@@ -1404,6 +1406,18 @@ getAnyConn deleted' dbConn connId =
(Nothing, Nothing, _) -> Right $ SomeConn SCNew (NewConnection cData)
_ -> Left SEConnNotFound
getConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
getConns = getAnyConns_ False
getDeletedConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
getDeletedConns = getAnyConns_ True
getAnyConns_ :: Bool -> DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAnyConn deleted' db
where
handleDBError :: E.SomeException -> IO (Either StoreError SomeConn)
handleDBError = pure . Left . SEInternal . bshow
getConnData :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode))
getConnData dbConn connId' =
maybeFirstRow cData $ DB.query dbConn "SELECT user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake, deleted FROM connections WHERE conn_id = ?;" (Only connId')
@@ -1413,8 +1427,8 @@ getConnData dbConn connId' =
setConnDeleted :: DB.Connection -> ConnId -> IO ()
setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId)
getDeletedConns :: DB.Connection -> IO [ConnId]
getDeletedConns db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only True)
getDeletedConnIds :: DB.Connection -> IO [ConnId]
getDeletedConnIds db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only True)
-- | returns all connection queues, the first queue is the primary one
getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue))
+7 -3
View File
@@ -26,6 +26,7 @@ import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Char (isAsciiLower, isDigit)
import Data.Default (def)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
@@ -70,11 +71,14 @@ instance StrEncoding TransportHost where
strP =
A.choice
[ THIPv4 <$> ((,,,) <$> ipNum <*> ipNum <*> ipNum <*> A.decimal),
THOnionHost <$> ((<>) <$> A.takeTill (== '.') <*> A.string ".onion"),
THDomainName . B.unpack <$> A.takeWhile1 (A.notInClass ":#,;/ ")
THOnionHost <$> ((<>) <$> A.takeWhile (\c -> isAsciiLower c || isDigit c) <*> A.string ".onion"),
THDomainName . B.unpack <$> (notOnion <$?> A.takeWhile1 (A.notInClass ":#,;/ \n\r\t"))
]
where
ipNum = A.decimal <* A.char '.'
ipNum = validIP <$?> (A.decimal <* A.char '.')
validIP :: Int -> Either String Word8
validIP n = if 0 <= n && n <= 255 then Right $ fromIntegral n else Left "invalid IP address"
notOnion s = if ".onion" `B.isSuffixOf` s then Left "invalid onion host" else Right s
instance ToJSON TransportHost where
toEncoding = strToJEncoding