mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 18:35:59 +00:00
async batch connection deletion (#617)
* async batch connection deletion * delete user record with connections, async connection deletion * updates * update query
This commit is contained in:
committed by
GitHub
parent
f66e8239f4
commit
d4fc638478
@@ -57,6 +57,7 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
|
||||
Simplex.Messaging.Agent.TAsyncs
|
||||
Simplex.Messaging.Agent.TRcvQueues
|
||||
Simplex.Messaging.Client
|
||||
|
||||
@@ -47,6 +47,7 @@ module Simplex.Messaging.Agent
|
||||
ackMessageAsync,
|
||||
switchConnectionAsync,
|
||||
deleteConnectionAsync,
|
||||
deleteConnectionsAsync,
|
||||
createConnection,
|
||||
joinConnection,
|
||||
allowConnection,
|
||||
@@ -113,6 +114,7 @@ import Data.Time.Clock.System (systemToUTCTime)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Lock (withLock)
|
||||
import Simplex.Messaging.Agent.NtfSubSupervisor
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
@@ -146,7 +148,7 @@ getSMPAgentClient cfg initServers = newSMPAgentEnv cfg >>= runReaderT runAgent
|
||||
where
|
||||
runAgent = do
|
||||
c <- getAgentClient initServers
|
||||
void $ race_ (subscriber c) (runNtfSupervisor c) `forkFinally` const (disconnectAgentClient c)
|
||||
void $ raceAny_ [subscriber c, runNtfSupervisor c, cleanupManager c] `forkFinally` const (disconnectAgentClient c)
|
||||
pure c
|
||||
|
||||
disconnectAgentClient :: MonadUnliftIO m => AgentClient -> m ()
|
||||
@@ -158,14 +160,14 @@ disconnectAgentClient c@AgentClient {agentEnv = Env {ntfSupervisor = ns}} = do
|
||||
resumeAgentClient :: MonadIO m => AgentClient -> m ()
|
||||
resumeAgentClient c = atomically $ writeTVar (active c) True
|
||||
|
||||
-- |
|
||||
type AgentErrorMonad m = (MonadUnliftIO m, MonadError AgentErrorType m)
|
||||
|
||||
createUser :: AgentErrorMonad m => AgentClient -> NonEmpty SMPServerWithAuth -> m UserId
|
||||
createUser c = withAgentEnv c . createUser' c
|
||||
|
||||
deleteUser :: AgentErrorMonad m => AgentClient -> UserId -> m ()
|
||||
deleteUser c = withAgentEnv c . deleteUser' c
|
||||
-- | Delete user record optionally deleting all user's connections on SMP servers
|
||||
deleteUser :: AgentErrorMonad m => AgentClient -> UserId -> Bool -> m ()
|
||||
deleteUser c = withAgentEnv c .: deleteUser' c
|
||||
|
||||
-- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id
|
||||
createConnectionAsync :: forall m c. (AgentErrorMonad m, ConnectionModeI c) => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> m ConnId
|
||||
@@ -192,8 +194,12 @@ switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -
|
||||
switchConnectionAsync c = withAgentEnv c .: switchConnectionAsync' c
|
||||
|
||||
-- | Delete SMP agent connection (DEL command) asynchronously, no synchronous response
|
||||
deleteConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m ()
|
||||
deleteConnectionAsync c = withAgentEnv c .: deleteConnectionAsync' c
|
||||
deleteConnectionAsync :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
|
||||
deleteConnectionAsync c = withAgentEnv c . deleteConnectionAsync' c
|
||||
|
||||
-- -- | Delete SMP agent connections using batch commands asynchronously, no synchronous response
|
||||
deleteConnectionsAsync :: AgentErrorMonad m => AgentClient -> [ConnId] -> m ()
|
||||
deleteConnectionsAsync c = withAgentEnv c . deleteConnectionsAsync' c
|
||||
|
||||
-- | Create SMP agent connection (NEW command)
|
||||
createConnection :: AgentErrorMonad m => AgentClient -> UserId -> Bool -> SConnectionMode c -> Maybe CRClientData -> m (ConnId, ConnectionRequestUri c)
|
||||
@@ -384,9 +390,11 @@ createUser' c srvs = do
|
||||
atomically $ TM.insert userId srvs $ smpServers c
|
||||
pure userId
|
||||
|
||||
deleteUser' :: AgentMonad m => AgentClient -> UserId -> m ()
|
||||
deleteUser' c userId = do
|
||||
withStore c (`deleteUserRecord` userId)
|
||||
deleteUser' :: AgentMonad m => AgentClient -> UserId -> Bool -> m ()
|
||||
deleteUser' c userId delSMPQueues = do
|
||||
if delSMPQueues
|
||||
then withStore c (`setUserDeleted` userId) >>= deleteConnectionsAsync_ (Just userId) c
|
||||
else withStore c (`deleteUserRecord` userId)
|
||||
atomically $ TM.delete userId $ smpServers c
|
||||
|
||||
newConnAsync :: forall m c. (AgentMonad m, ConnectionModeI c) => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> m ConnId
|
||||
@@ -450,23 +458,36 @@ ackMessageAsync' c corrId connId msgId = do
|
||||
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
|
||||
enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId
|
||||
|
||||
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> m ()
|
||||
deleteConnectionAsync' c@AgentClient {subQ} corrId connId = withConnLock c connId "deleteConnectionAsync" $ do
|
||||
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
deleteConnectionAsync' c connId = withConnLock c connId "deleteConnectionAsync" $ do
|
||||
SomeConn _ conn <- withStore c (`getConn` connId)
|
||||
case conn of
|
||||
DuplexConnection _ (rq :| _) _ -> enqueueDelete rq
|
||||
RcvConnection _ rq -> enqueueDelete rq
|
||||
ContactConnection _ rq -> enqueueDelete rq
|
||||
SndConnection _ _ -> delete
|
||||
NewConnection _ -> delete
|
||||
where
|
||||
enqueueDelete :: RcvQueue -> m ()
|
||||
enqueueDelete RcvQueue {server} = do
|
||||
withStore' c $ \db -> setConnDeleted db connId
|
||||
case connRcvQueues conn of
|
||||
[] -> delete
|
||||
rqs -> do
|
||||
withStore' c (`setConnDeleted` connId)
|
||||
disableConn c connId
|
||||
enqueueCommand c corrId connId (Just server) $ AInternalCommand ICDeleteConn
|
||||
delete :: m ()
|
||||
delete = withStore' c (`deleteConn` connId) >> atomically (writeTBQueue subQ (corrId, connId, OK))
|
||||
void . forkIO $ do
|
||||
forM_ rqs $ deleteConnQueue c
|
||||
delete
|
||||
where
|
||||
delete = withStore' c (`deleteConn` connId)
|
||||
|
||||
deleteConnectionsAsync' :: AgentMonad m => AgentClient -> [ConnId] -> m ()
|
||||
deleteConnectionsAsync' = deleteConnectionsAsync_ Nothing
|
||||
|
||||
deleteConnectionsAsync_ :: forall m. AgentMonad m => Maybe UserId -> AgentClient -> [ConnId] -> m ()
|
||||
deleteConnectionsAsync_ userId_ c connIds = case connIds of
|
||||
[] -> delUser
|
||||
_ -> do
|
||||
(_, rqs, connIds') <- prepareDeleteConnections_ getConn c connIds
|
||||
withStore' c $ forM_ connIds' . setConnDeleted
|
||||
void . forkIO $
|
||||
withLock (deleteLock c) "deleteConnectionsAsync" $ do
|
||||
void $ deleteConnQueues c rqs
|
||||
delUser
|
||||
where
|
||||
delUser :: m ()
|
||||
delUser = forM_ userId_ $ \userId -> withStore' c (`deleteUserWithoutConns` userId)
|
||||
|
||||
-- | Add connection to the new receive queue
|
||||
switchConnectionAsync' :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> m ()
|
||||
@@ -851,7 +872,7 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
|
||||
enqueueMessage c cData sq SMP.MsgFlags {notification = True} HELLO
|
||||
ICDeleteConn ->
|
||||
withServer $ \srv -> tryWithLock "ICDeleteConn" $ do
|
||||
SomeConn _ conn <- withStore c $ \db -> getAnyConn db connId True
|
||||
SomeConn _ conn <- withStore c $ \db -> getDeletedConn db connId
|
||||
case conn of
|
||||
DuplexConnection _ (rq :| rqs) _ -> delete srv rq $ case rqs of
|
||||
[] -> notify OK
|
||||
@@ -1196,25 +1217,36 @@ suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do
|
||||
NewConnection _ -> throwError $ CMD PROHIBITED
|
||||
|
||||
-- | Delete SMP agent connection (DEL command) in Reader monad
|
||||
-- unlike deleteConnectionAsync, this function does not mark connection as deleted in case of deletion failure
|
||||
-- currently it is used only in tests
|
||||
deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do
|
||||
SomeConn _ conn <- withStore c (`getConn` connId)
|
||||
case conn of
|
||||
DuplexConnection _ rqs _ -> mapM_ (deleteConnQueue c) rqs >> disableConn c connId >> deleteConn'
|
||||
RcvConnection _ rq -> delete rq
|
||||
ContactConnection _ rq -> delete rq
|
||||
SndConnection _ _ -> deleteConn'
|
||||
NewConnection _ -> deleteConn'
|
||||
where
|
||||
delete :: RcvQueue -> m ()
|
||||
delete rq = deleteConnQueue c rq >> disableConn c connId >> deleteConn'
|
||||
deleteConn' = withStore' c (`deleteConn` connId)
|
||||
let rqs = connRcvQueues conn
|
||||
unless (null rqs) $ do
|
||||
disableConn c connId
|
||||
forM_ rqs $ deleteConnQueue c
|
||||
withStore' c (`deleteConn` connId)
|
||||
|
||||
connRcvQueues :: Connection d -> [RcvQueue]
|
||||
connRcvQueues = \case
|
||||
DuplexConnection _ rqs _ -> L.toList rqs
|
||||
RcvConnection _ rq -> [rq]
|
||||
ContactConnection _ rq -> [rq]
|
||||
SndConnection _ _ -> []
|
||||
NewConnection _ -> []
|
||||
|
||||
deleteConnQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
deleteConnQueue c rq = do
|
||||
deleteQueue c rq
|
||||
withStore' c $ \db -> deleteConnRcvQueue db rq
|
||||
removeQueueSubscription c rq
|
||||
maxErrs <- asks $ deleteErrorCount . config
|
||||
deleteQueue c rq `catchError` handleError maxErrs
|
||||
withStore' c (`deleteConnRcvQueue` rq)
|
||||
where
|
||||
handleError maxErrs e
|
||||
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs = do
|
||||
withStore' c (`incRcvDeleteErrors` rq)
|
||||
throwError e
|
||||
| otherwise = pure ()
|
||||
|
||||
disableConn :: AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
disableConn c connId = do
|
||||
@@ -1222,31 +1254,54 @@ disableConn c connId = do
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
|
||||
|
||||
-- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure.
|
||||
deleteConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
|
||||
deleteConnections' _ [] = pure M.empty
|
||||
deleteConnections' c connIds = do
|
||||
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (forM connIds . getConn)
|
||||
deleteConnections' = deleteConnections_ getConn
|
||||
|
||||
deleteDeletedConns :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
|
||||
deleteDeletedConns = deleteConnections_ getDeletedConn
|
||||
|
||||
prepareDeleteConnections_ ::
|
||||
forall m.
|
||||
AgentMonad m =>
|
||||
(DB.Connection -> ConnId -> IO (Either StoreError SomeConn)) ->
|
||||
AgentClient ->
|
||||
[ConnId] ->
|
||||
m (Map ConnId (Either AgentErrorType ()), [RcvQueue], [ConnId])
|
||||
prepareDeleteConnections_ getConnection c connIds = do
|
||||
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (forM connIds . getConnection)
|
||||
let (errs, cs) = M.mapEither id conns
|
||||
errs' = M.map (Left . storeError) errs
|
||||
(delRs, rcvQs) = M.mapEither rcvQueueOrResult cs
|
||||
rcvRs <- deleteQueues c (concat $ M.elems rcvQs)
|
||||
forM_ rcvRs $ \case
|
||||
(rq, Right _) -> withStore' c (`deleteConnRcvQueue` rq) >> removeQueueSubscription c rq
|
||||
_ -> pure ()
|
||||
let rs = M.unions ([errs', delRs, connResults rcvRs] :: [Map ConnId (Either AgentErrorType ())])
|
||||
(delRs, rcvQs) = M.mapEither rcvQueues cs
|
||||
rqs = concat $ M.elems rcvQs
|
||||
connIds' = M.keys rcvQs
|
||||
forM_ connIds' $ disableConn c
|
||||
withStore' c $ forM_ (M.keys delRs) . deleteConn
|
||||
pure (errs' <> delRs, rqs, connIds')
|
||||
where
|
||||
rcvQueues :: SomeConn -> Either (Either AgentErrorType ()) [RcvQueue]
|
||||
rcvQueues (SomeConn _ conn) = case connRcvQueues conn of
|
||||
[] -> Left $ Right ()
|
||||
rqs -> Right rqs
|
||||
|
||||
deleteConnQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m (Map ConnId (Either AgentErrorType ()))
|
||||
deleteConnQueues c rqs = do
|
||||
rs <- connResults <$> (deleteQueueRecs =<< deleteQueues c rqs)
|
||||
forM_ (M.assocs rs) $ \case
|
||||
(connId, Right _) -> disableConn c connId >> withStore' c (`deleteConn` connId)
|
||||
(connId, Right _) -> withStore' c (`deleteConn` connId)
|
||||
_ -> pure ()
|
||||
notifyResultError rs
|
||||
pure rs
|
||||
where
|
||||
rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType ()) [RcvQueue]
|
||||
rcvQueueOrResult (SomeConn _ conn) = case conn of
|
||||
DuplexConnection _ rqs _ -> Right $ L.toList rqs
|
||||
RcvConnection _ rq -> Right [rq]
|
||||
ContactConnection _ rq -> Right [rq]
|
||||
SndConnection _ _ -> Left $ Right ()
|
||||
NewConnection _ -> Left $ Right ()
|
||||
deleteQueueRecs :: [(RcvQueue, Either AgentErrorType ())] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
deleteQueueRecs rs = do
|
||||
maxErrs <- asks $ deleteErrorCount . config
|
||||
forM rs $ \(rq, r) -> do
|
||||
r' <- case r of
|
||||
Right _ -> withStore' c (`deleteConnRcvQueue` rq) $> r
|
||||
Left e
|
||||
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> withStore' c (`incRcvDeleteErrors` rq) $> r
|
||||
| otherwise -> withStore' c (`deleteConnRcvQueue` rq) $> Right ()
|
||||
pure (rq, r')
|
||||
connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ())
|
||||
connResults = M.map snd . foldl' addResult M.empty
|
||||
where
|
||||
@@ -1261,6 +1316,22 @@ deleteConnections' c connIds = do
|
||||
order (Active, Left _) = 1
|
||||
order (_, Left _) = 2
|
||||
order _ = 3
|
||||
|
||||
deleteConnections_ ::
|
||||
forall m.
|
||||
AgentMonad m =>
|
||||
(DB.Connection -> ConnId -> IO (Either StoreError SomeConn)) ->
|
||||
AgentClient ->
|
||||
[ConnId] ->
|
||||
m (Map ConnId (Either AgentErrorType ()))
|
||||
deleteConnections_ _ _ [] = pure M.empty
|
||||
deleteConnections_ getConnection c connIds = do
|
||||
(rs, rqs, _) <- prepareDeleteConnections_ getConnection c connIds
|
||||
rcvRs <- deleteConnQueues c rqs
|
||||
let rs' = M.union rs rcvRs
|
||||
notifyResultError rs'
|
||||
pure rs'
|
||||
where
|
||||
notifyResultError :: Map ConnId (Either AgentErrorType ()) -> m ()
|
||||
notifyResultError rs = do
|
||||
let actual = M.size rs
|
||||
@@ -1520,10 +1591,11 @@ execAgentStoreSQL' :: AgentMonad m => AgentClient -> Text -> m [Text]
|
||||
execAgentStoreSQL' c sql = withStore' c (`execSQL` sql)
|
||||
|
||||
debugAgentLocks' :: AgentMonad m => AgentClient -> m AgentLocks
|
||||
debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs} = do
|
||||
debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs, deleteLock = d} = do
|
||||
connLocks <- getLocks cs
|
||||
srvLocks <- getLocks rs
|
||||
pure AgentLocks {connLocks, srvLocks}
|
||||
delLock <- atomically $ tryReadTMVar d
|
||||
pure AgentLocks {connLocks, srvLocks, delLock}
|
||||
where
|
||||
getLocks ls = atomically $ M.mapKeys (B.unpack . strEncode) . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls)
|
||||
|
||||
@@ -1557,6 +1629,17 @@ subscriber c@AgentClient {msgQ} = forever $ do
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
|
||||
cleanupManager :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
cleanupManager c = do
|
||||
threadDelay =<< asks (initialCleanupDelay . config)
|
||||
int <- asks (cleanupInterval . config)
|
||||
forever $ do
|
||||
void . runExceptT $
|
||||
withLock (deleteLock c) "cleanupManager" $ do
|
||||
void $ withStore' c getDeletedConns >>= deleteDeletedConns c
|
||||
withStore' c deleteUsersWithoutConns
|
||||
threadDelay int
|
||||
|
||||
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
|
||||
processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, sessId, rId, cmd) = do
|
||||
(rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId)
|
||||
|
||||
@@ -57,7 +57,6 @@ module Simplex.Messaging.Agent.Client
|
||||
deleteQueues,
|
||||
logServer,
|
||||
logSecret,
|
||||
removeQueueSubscription,
|
||||
removeSubscription,
|
||||
hasActiveSubscription,
|
||||
agentClientStore,
|
||||
@@ -202,6 +201,8 @@ data AgentClient = AgentClient
|
||||
getMsgLocks :: TMap (SMPServer, SMP.RecipientId) (TMVar ()),
|
||||
-- locks to prevent concurrent operations with connection
|
||||
connLocks :: TMap ConnId Lock,
|
||||
-- lock to prevent concurrency between periodic and async connection deletions
|
||||
deleteLock :: Lock,
|
||||
-- locks to prevent concurrent reconnections to SMP servers
|
||||
reconnectLocks :: TMap SMPTransportSession Lock,
|
||||
reconnections :: TAsyncs,
|
||||
@@ -230,7 +231,7 @@ data AgentOpState = AgentOpState {opSuspended :: Bool, opsInProgress :: Int}
|
||||
data AgentState = ASActive | ASSuspending | ASSuspended
|
||||
deriving (Eq, Show)
|
||||
|
||||
data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String}
|
||||
data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String, delLock :: Maybe String}
|
||||
deriving (Show, Generic)
|
||||
|
||||
instance ToJSON AgentLocks where toEncoding = J.genericToEncoding J.defaultOptions
|
||||
@@ -273,12 +274,48 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
|
||||
agentState <- newTVar ASActive
|
||||
getMsgLocks <- TM.empty
|
||||
connLocks <- TM.empty
|
||||
deleteLock <- createLock
|
||||
reconnectLocks <- TM.empty
|
||||
reconnections <- newTAsyncs
|
||||
asyncClients <- newTAsyncs
|
||||
agentStats <- TM.empty
|
||||
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
|
||||
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv}
|
||||
return
|
||||
AgentClient
|
||||
{ active,
|
||||
rcvQ,
|
||||
subQ,
|
||||
msgQ,
|
||||
smpServers,
|
||||
smpClients,
|
||||
ntfServers,
|
||||
ntfClients,
|
||||
useNetworkConfig,
|
||||
subscrConns,
|
||||
activeSubs,
|
||||
pendingSubs,
|
||||
pendingMsgsQueued,
|
||||
smpQueueMsgQueues,
|
||||
smpQueueMsgDeliveries,
|
||||
connCmdsQueued,
|
||||
asyncCmdQueues,
|
||||
asyncCmdProcesses,
|
||||
ntfNetworkOp,
|
||||
rcvNetworkOp,
|
||||
msgDeliveryOp,
|
||||
sndNetworkOp,
|
||||
databaseOp,
|
||||
agentState,
|
||||
getMsgLocks,
|
||||
connLocks,
|
||||
deleteLock,
|
||||
reconnectLocks,
|
||||
reconnections,
|
||||
asyncClients,
|
||||
agentStats,
|
||||
clientId,
|
||||
agentEnv
|
||||
}
|
||||
|
||||
agentClientStore :: AgentClient -> SQLiteStore
|
||||
agentClientStore AgentClient {agentEnv = Env {store}} = store
|
||||
@@ -503,7 +540,7 @@ withConnLock AgentClient {connLocks} connId name = withLockMap_ connLocks connId
|
||||
withLockMap_ :: (Ord k, MonadUnliftIO m) => TMap k Lock -> k -> String -> m a -> m a
|
||||
withLockMap_ locks key = withGetLock $ TM.lookup key locks >>= maybe newLock pure
|
||||
where
|
||||
newLock = newEmptyTMVar >>= \l -> TM.insert key l locks $> l
|
||||
newLock = createLock >>= \l -> TM.insert key l locks $> l
|
||||
|
||||
withClient_ :: forall a m msg. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> TransportSession msg -> ByteString -> (ProtocolClient msg -> m a) -> m a
|
||||
withClient_ c tSess@(userId, srv, _) statCmd action = do
|
||||
@@ -641,7 +678,8 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange = do
|
||||
primary = True,
|
||||
dbReplaceQueueId = Nothing,
|
||||
smpClientVersion = maxVersion vRange,
|
||||
clientNtfCreds = Nothing
|
||||
clientNtfCreds = Nothing,
|
||||
deleteErrors = 0
|
||||
}
|
||||
pure (rq, SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey)
|
||||
|
||||
@@ -686,6 +724,7 @@ temporaryOrHostError = \case
|
||||
BROKER _ HOST -> True
|
||||
e -> temporaryAgentError e
|
||||
|
||||
-- | Subscribe to queues. The list of results can have a different order.
|
||||
subscribeQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
subscribeQueues c qs = do
|
||||
(errs, qs') <- partitionEithers <$> mapM checkQueue qs
|
||||
@@ -746,11 +785,6 @@ addSubscription c rq@RcvQueue {connId} = atomically $ do
|
||||
RQ.addQueue rq $ activeSubs c
|
||||
RQ.deleteQueue rq $ pendingSubs c
|
||||
|
||||
removeQueueSubscription :: MonadIO m => AgentClient -> RcvQueue -> m ()
|
||||
removeQueueSubscription c rq = atomically $ do
|
||||
RQ.deleteQueue rq $ activeSubs c
|
||||
RQ.deleteQueue rq $ pendingSubs c
|
||||
|
||||
hasActiveSubscription :: AgentClient -> ConnId -> STM Bool
|
||||
hasActiveSubscription c connId = RQ.hasConn connId $ activeSubs c
|
||||
|
||||
|
||||
@@ -88,6 +88,9 @@ data AgentConfig = AgentConfig
|
||||
messageRetryInterval :: RetryInterval2,
|
||||
messageTimeout :: NominalDiffTime,
|
||||
helloTimeout :: NominalDiffTime,
|
||||
initialCleanupDelay :: Int,
|
||||
cleanupInterval :: Int,
|
||||
deleteErrorCount :: Int,
|
||||
ntfCron :: Word16,
|
||||
ntfWorkerDelay :: Int,
|
||||
ntfSMPWorkerDelay :: Int,
|
||||
@@ -145,6 +148,9 @@ defaultAgentConfig =
|
||||
messageRetryInterval = defaultMessageRetryInterval,
|
||||
messageTimeout = 2 * nominalDay,
|
||||
helloTimeout = 2 * nominalDay,
|
||||
initialCleanupDelay = 30 * 1000000, -- 30 seconds
|
||||
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
|
||||
deleteErrorCount = 10,
|
||||
ntfCron = 20, -- minutes
|
||||
ntfWorkerDelay = 100000, -- microseconds
|
||||
ntfSMPWorkerDelay = 500000, -- microseconds
|
||||
|
||||
@@ -10,6 +10,10 @@ import UnliftIO.STM
|
||||
|
||||
type Lock = TMVar String
|
||||
|
||||
createLock :: STM Lock
|
||||
createLock = newEmptyTMVar
|
||||
{-# INLINE createLock #-}
|
||||
|
||||
withLock :: MonadUnliftIO m => TMVar String -> String -> m a -> m a
|
||||
withLock lock name =
|
||||
E.bracket_
|
||||
|
||||
@@ -74,7 +74,8 @@ data RcvQueue = RcvQueue
|
||||
-- | SMP client version
|
||||
smpClientVersion :: Version,
|
||||
-- | credentials used in context of notifications
|
||||
clientNtfCreds :: Maybe ClientNtfCreds
|
||||
clientNtfCreds :: Maybe ClientNtfCreds,
|
||||
deleteErrors :: Int
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
|
||||
@@ -30,6 +30,10 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
-- * Users
|
||||
createUserRecord,
|
||||
deleteUserRecord,
|
||||
setUserDeleted,
|
||||
deleteUserWithoutConns,
|
||||
deleteUsersWithoutConns,
|
||||
checkUser,
|
||||
|
||||
-- * Queues and connections
|
||||
createNewConn,
|
||||
@@ -38,9 +42,10 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
createRcvConn,
|
||||
createSndConn,
|
||||
getConn,
|
||||
getAnyConn,
|
||||
getDeletedConn,
|
||||
getConnData,
|
||||
setConnDeleted,
|
||||
getDeletedConns,
|
||||
getRcvConn,
|
||||
deleteConn,
|
||||
upgradeRcvConnToDuplex,
|
||||
@@ -53,6 +58,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
setRcvQueuePrimary,
|
||||
setSndQueuePrimary,
|
||||
deleteConnRcvQueue,
|
||||
incRcvDeleteErrors,
|
||||
deleteConnSndQueue,
|
||||
getPrimaryRcvQueue,
|
||||
getRcvQueue,
|
||||
@@ -306,13 +312,46 @@ createUserRecord db = do
|
||||
DB.execute_ db "INSERT INTO users DEFAULT VALUES"
|
||||
insertedRowId db
|
||||
|
||||
checkUser :: DB.Connection -> UserId -> IO (Either StoreError ())
|
||||
checkUser db userId =
|
||||
firstRow (\(_ :: Only Int64) -> ()) SEUserNotFound $
|
||||
DB.query db "SELECT user_id FROM users WHERE user_id = ? AND deleted = ?" (userId, False)
|
||||
|
||||
deleteUserRecord :: DB.Connection -> UserId -> IO (Either StoreError ())
|
||||
deleteUserRecord db userId = runExceptT $ do
|
||||
_ :: Only Int64 <-
|
||||
ExceptT . firstRow id SEUserNotFound $
|
||||
DB.query db "SELECT user_id FROM users WHERE user_id = ?" (Only userId)
|
||||
ExceptT $ checkUser db userId
|
||||
liftIO $ DB.execute db "DELETE FROM users WHERE user_id = ?" (Only userId)
|
||||
|
||||
setUserDeleted :: DB.Connection -> UserId -> IO (Either StoreError [ConnId])
|
||||
setUserDeleted db userId = runExceptT $ do
|
||||
ExceptT $ checkUser db userId
|
||||
liftIO $ do
|
||||
DB.execute db "UPDATE users SET deleted = ? WHERE user_id = ?" (True, userId)
|
||||
map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE user_id = ?" (Only userId)
|
||||
|
||||
deleteUserWithoutConns :: DB.Connection -> UserId -> IO ()
|
||||
deleteUserWithoutConns db userId =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
DELETE FROM users u
|
||||
WHERE user_id = ?
|
||||
AND u.deleted = ?
|
||||
AND NOT EXISTS (SELECT * FROM connections WHERE user_id = u.user_id)
|
||||
|]
|
||||
(userId, True)
|
||||
|
||||
deleteUsersWithoutConns :: DB.Connection -> IO ()
|
||||
deleteUsersWithoutConns db =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
DELETE FROM users u
|
||||
WHERE u.deleted = ?
|
||||
AND NOT EXISTS (SELECT * FROM connections WHERE user_id = u.user_id)
|
||||
|]
|
||||
(Only True)
|
||||
|
||||
createConn_ ::
|
||||
TVar ChaChaDRG ->
|
||||
ConnData ->
|
||||
@@ -471,6 +510,10 @@ setSndQueuePrimary db connId SndQueue {dbQueueId} = do
|
||||
"UPDATE snd_queues SET snd_primary = ?, replace_snd_queue_id = ? WHERE conn_id = ? AND snd_queue_id = ?"
|
||||
(True, Nothing :: Maybe Int64, connId, dbQueueId)
|
||||
|
||||
incRcvDeleteErrors :: DB.Connection -> RcvQueue -> IO ()
|
||||
incRcvDeleteErrors db RcvQueue {connId, dbQueueId} =
|
||||
DB.execute db "UPDATE rcv_queues SET delete_errors = delete_errors + 1 WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId)
|
||||
|
||||
deleteConnRcvQueue :: DB.Connection -> RcvQueue -> IO ()
|
||||
deleteConnRcvQueue db RcvQueue {connId, dbQueueId} =
|
||||
DB.execute db "DELETE FROM rcv_queues WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId)
|
||||
@@ -1330,10 +1373,13 @@ newQueueId_ (Only maxId : _) = maxId + 1
|
||||
-- * getConn helpers
|
||||
|
||||
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
|
||||
getConn db connId = getAnyConn db connId False
|
||||
getConn = getAnyConn False
|
||||
|
||||
getAnyConn :: DB.Connection -> ConnId -> Bool -> IO (Either StoreError SomeConn)
|
||||
getAnyConn dbConn connId deleted' =
|
||||
getDeletedConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
|
||||
getDeletedConn = getAnyConn True
|
||||
|
||||
getAnyConn :: Bool -> DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
|
||||
getAnyConn deleted' dbConn connId =
|
||||
getConnData dbConn connId >>= \case
|
||||
Nothing -> pure $ Left SEConnNotFound
|
||||
Just (cData@ConnData {deleted}, cMode)
|
||||
@@ -1358,6 +1404,9 @@ getConnData dbConn connId' =
|
||||
setConnDeleted :: DB.Connection -> ConnId -> IO ()
|
||||
setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId)
|
||||
|
||||
getDeletedConns :: DB.Connection -> IO [ConnId]
|
||||
getDeletedConns db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only True)
|
||||
|
||||
-- | returns all connection queues, the first queue is the primary one
|
||||
getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue))
|
||||
getRcvQueuesByConnId_ db connId =
|
||||
@@ -1373,7 +1422,7 @@ rcvQueueQuery =
|
||||
[sql|
|
||||
SELECT c.user_id, s.key_hash, q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
|
||||
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status,
|
||||
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.smp_client_version,
|
||||
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.smp_client_version, q.delete_errors,
|
||||
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret
|
||||
FROM rcv_queues q
|
||||
JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
@@ -1382,16 +1431,16 @@ rcvQueueQuery =
|
||||
|
||||
toRcvQueue ::
|
||||
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateSignKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, QueueStatus)
|
||||
:. (Int64, Bool, Maybe Int64, Maybe Version)
|
||||
:. (Int64, Bool, Maybe Int64, Maybe Version, Int)
|
||||
:. (Maybe SMP.NtfPublicVerifyKey, Maybe SMP.NtfPrivateSignKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret) ->
|
||||
RcvQueue
|
||||
toRcvQueue ((userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) =
|
||||
toRcvQueue ((userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion_, deleteErrors) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) =
|
||||
let server = SMPServer host port keyHash
|
||||
smpClientVersion = fromMaybe 1 smpClientVersion_
|
||||
clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of
|
||||
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
|
||||
_ -> Nothing
|
||||
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion, clientNtfCreds}
|
||||
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion, clientNtfCreds, deleteErrors}
|
||||
|
||||
getRcvQueueById_ :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue)
|
||||
getRcvQueueById_ db connId dbRcvId =
|
||||
|
||||
@@ -39,6 +39,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
|
||||
@@ -57,7 +58,8 @@ schemaMigrations =
|
||||
("m20220905_commands", m20220905_commands),
|
||||
("m20220915_connection_queues", m20220915_connection_queues),
|
||||
("m20230110_users", m20230110_users),
|
||||
("m20230117_fkey_indexes", m20230117_fkey_indexes)
|
||||
("m20230117_fkey_indexes", m20230117_fkey_indexes),
|
||||
("m20230120_delete_errors", m20230120_delete_errors)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20230120_delete_errors :: Query
|
||||
m20230120_delete_errors =
|
||||
[sql|
|
||||
PRAGMA ignore_check_constraints=ON;
|
||||
|
||||
ALTER TABLE rcv_queues ADD COLUMN delete_errors INTEGER DEFAULT 0 CHECK (delete_errors NOT NULL);
|
||||
UPDATE rcv_queues SET delete_errors = 0;
|
||||
|
||||
ALTER TABLE users ADD COLUMN deleted INTEGER DEFAULT 0 CHECK (deleted NOT NULL);
|
||||
UPDATE users SET deleted = 0;
|
||||
|
||||
PRAGMA ignore_check_constraints=OFF;
|
||||
|]
|
||||
@@ -47,6 +47,7 @@ CREATE TABLE rcv_queues(
|
||||
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
|
||||
rcv_primary INTEGER CHECK(rcv_primary NOT NULL),
|
||||
replace_rcv_queue_id INTEGER NULL,
|
||||
delete_errors INTEGER DEFAULT 0 CHECK(delete_errors NOT NULL),
|
||||
PRIMARY KEY(host, port, rcv_id),
|
||||
FOREIGN KEY(host, port) REFERENCES servers
|
||||
ON DELETE RESTRICT ON UPDATE CASCADE,
|
||||
@@ -230,7 +231,11 @@ CREATE INDEX idx_snd_message_deliveries ON snd_message_deliveries(
|
||||
conn_id,
|
||||
snd_queue_id
|
||||
);
|
||||
CREATE TABLE users(user_id INTEGER PRIMARY KEY AUTOINCREMENT);
|
||||
CREATE TABLE users(
|
||||
user_id INTEGER PRIMARY KEY AUTOINCREMENT
|
||||
,
|
||||
deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL)
|
||||
);
|
||||
CREATE INDEX idx_connections_user ON connections(user_id);
|
||||
CREATE INDEX idx_commands_conn_id ON commands(conn_id);
|
||||
CREATE INDEX idx_commands_host_port ON commands(host, port);
|
||||
|
||||
@@ -670,8 +670,9 @@ testAsyncCommands = do
|
||||
get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False
|
||||
ackMessageAsync alice "7" bobId $ baseId + 4
|
||||
("7", _, OK) <- get alice
|
||||
deleteConnectionAsync alice "8" bobId
|
||||
("8", _, OK) <- get alice
|
||||
deleteConnectionAsync alice bobId
|
||||
-- deleteConnectionAsync alice "8" bobId
|
||||
-- ("8", _, OK) <- get alice
|
||||
liftIO $ noMessages alice "nothing else should be delivered to alice"
|
||||
where
|
||||
baseId = 3
|
||||
@@ -810,9 +811,11 @@ testSwitchDelete servers = do
|
||||
disconnectAgentClient b
|
||||
switchConnectionAsync a "" bId
|
||||
phase a bId QDRcv SPStarted
|
||||
deleteConnectionAsync a "1" bId
|
||||
("1", bId', OK) <- get a
|
||||
liftIO $ bId `shouldBe` bId'
|
||||
deleteConnectionAsync a bId
|
||||
|
||||
-- deleteConnectionAsync a "1" bId
|
||||
-- ("1", bId', OK) <- get a
|
||||
-- liftIO $ bId `shouldBe` bId'
|
||||
|
||||
testCreateQueueAuth :: (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int
|
||||
testCreateQueueAuth clnt1 clnt2 = do
|
||||
|
||||
@@ -168,7 +168,8 @@ rcvQueue1 =
|
||||
primary = True,
|
||||
dbReplaceQueueId = Nothing,
|
||||
smpClientVersion = 1,
|
||||
clientNtfCreds = Nothing
|
||||
clientNtfCreds = Nothing,
|
||||
deleteErrors = 0
|
||||
}
|
||||
|
||||
sndQueue1 :: SndQueue
|
||||
@@ -357,7 +358,8 @@ testUpgradeSndConnToDuplex =
|
||||
primary = True,
|
||||
dbReplaceQueueId = Nothing,
|
||||
smpClientVersion = 1,
|
||||
clientNtfCreds = Nothing
|
||||
clientNtfCreds = Nothing,
|
||||
deleteErrors = 0
|
||||
}
|
||||
upgradeSndConnToDuplex db "conn1" anotherRcvQueue
|
||||
`shouldReturn` Left (SEBadConnType CRcv)
|
||||
|
||||
Reference in New Issue
Block a user