agent: one delivery worker per session

This commit is contained in:
Evgeny Poberezkin
2024-01-18 14:01:15 +00:00
parent b2a119390c
commit ff600e3043
12 changed files with 246 additions and 125 deletions
+2
View File
@@ -68,6 +68,7 @@ library
Simplex.Messaging.Agent.Protocol
Simplex.Messaging.Agent.QueryString
Simplex.Messaging.Agent.RetryInterval
Simplex.Messaging.Agent.RetryInterval.Delivery
Simplex.Messaging.Agent.Server
Simplex.Messaging.Agent.Store
Simplex.Messaging.Agent.Store.SQLite
@@ -101,6 +102,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery
Simplex.Messaging.Agent.TAsyncs
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
+2 -2
View File
@@ -316,7 +316,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
where
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, xftpRetryInterval = ri} = cfg
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)])
encryptFileForUpload SndFile {key, nonce, srcFile} fsEncPath = do
let CryptoFile {filePath} = srcFile
@@ -345,7 +345,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
where
tryCreate = do
usedSrvs <- newTVarIO ([] :: [XFTPServer])
withRetryInterval (riFast ri) $ \_ loop ->
withRetryInterval ri $ \_ loop ->
createWithNextSrv usedSrvs
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
where
+93 -77
View File
@@ -147,11 +147,12 @@ import Simplex.Messaging.Agent.Lock (withLock)
import Simplex.Messaging.Agent.NtfSubSupervisor
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.RetryInterval.Delivery
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission)
import Simplex.Messaging.Client (ProtocolClient (..), SMPTransportSession, ServerTransmission)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
@@ -782,7 +783,7 @@ subscribeConnections' c connIds = do
let (errs, cs) = M.mapEither id conns
errs' = M.map (Left . storeError) errs
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) cs
mapM_ (mapM_ (\(_, sqs) -> mapM_ (resumeMsgDelivery c) sqs) . sndQueue) cs
mapM_ (resumeConnCmds c) $ M.keys cs
rcvRs <- connResults <$> subscribeQueues c (concat $ M.elems rcvQs)
ns <- asks ntfSupervisor
@@ -1089,7 +1090,7 @@ enqueueMessageB c reqs = do
aVRange <- asks $ maxVersion . smpAgentVRange . config
reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db aVRange) reqs
forME reqMids $ \((cData, sq :| sqs, _, _), InternalId msgId) -> do
submitPendingMsg c cData sq
submitPendingMsg c sq
let sqs' = filter isActiveSndQ sqs
pure $ Right (msgId, if null sqs' then Nothing else Just (cData, sqs', msgId))
where
@@ -1116,45 +1117,47 @@ enqueueSavedMessageB :: (AgentMonad' m, Foldable t) => AgentClient -> t (ConnDat
enqueueSavedMessageB c reqs = do
-- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved
void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs
forM_ reqs $ \(cData, sqs, _) ->
forM sqs $ submitPendingMsg c cData
forM_ reqs $ \(_, sqs, _) ->
forM sqs $ submitPendingMsg c
where
storeDeliveries :: DB.Connection -> (ConnData, [SndQueue], AgentMsgId) -> [IO ()]
storeDeliveries db (ConnData {connId}, sqs, msgId) = do
let mId = InternalId msgId
in map (\sq -> createSndMsgDelivery db connId sq mId) sqs
resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m ()
resumeMsgDelivery = void .:. getDeliveryWorker False
resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> SndQueue -> m ()
resumeMsgDelivery = void .: getDeliveryWorker False
getDeliveryWorker :: AgentMonad' m => Bool -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ())
getDeliveryWorker hasWork c cData sq =
getAgentWorker' fst mkLock "msg_delivery" hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c cData sq)
where
mkLock w = do
retryLock <- newEmptyTMVar
pure (w, retryLock)
getDeliveryWorker :: AgentMonad' m => Bool -> AgentClient -> SndQueue -> m Worker
getDeliveryWorker hasWork c sq = do
tSess <- mkSMPTransportSession c sq
getAgentWorker "msg_delivery" hasWork c tSess (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c tSess)
submitPendingMsg :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m ()
submitPendingMsg c cData sq = do
submitPendingMsg :: AgentMonad' m => AgentClient -> SndQueue -> m ()
submitPendingMsg c sq = do
atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1}
void $ getDeliveryWorker True c cData sq
void $ getDeliveryWorker True c sq
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> (Worker, TMVar ()) -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq (Worker {doWork}, qLock) = do
ri <- asks $ messageRetryInterval . config
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> Worker -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} tSess@(_, srv, _) Worker {doWork} = do
dCfg <- asks $ messageDeliveryCfg . config
forever $ do
atomically $ endAgentOperation c AOSndNetwork
waitForWork doWork
atomically $ throwWhenInactive c
atomically $ throwWhenNoDelivery c sq
atomically $ throwWhenNoDelivery c tSess
atomically $ beginAgentOperation c AOSndNetwork
withWork c doWork (\db -> getPendingQueueMsg db connId sq) $
\(rq_, PendingMsgData {msgId, msgType, msgBody, msgFlags, msgRetryState, internalTs}) -> do
withWork c doWork (`getPendingSessionMsg` tSess) (deliverMessage dCfg)
where
deliverMessage
dCfg@MsgDeliveryConfig {messageRetryInterval = ri}
(cData@ConnData {userId, connId, duplexHandshake}, sq@SndQueue {quotaExceeded = qe, retryState}, rq_, PendingMsgData {msgId, msgType, msgBody, msgFlags, msgRetryState, internalTs}) = do
atomically $ endAgentOperation c AOMsgDelivery -- this operation begins in submitPendingMsg
let mId = unId msgId
ri' = maybe id updateRetryInterval2 msgRetryState ri
withRetryLock2 ri' qLock $ \riState loop -> do
ri = (if qe then quotaExceededRetryInterval else messageRetryInterval) dCfg
ri' = maybe id updateRetryInterval retryState ri
withRetryIntervalCount ri' $ \n delay loop -> do
-- withRetryLock2 ri' qLock $ \riState loop -> do
resp <- tryError $ case msgType of
AM_CONN_INFO -> sendConfirmation c sq msgBody
AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody
@@ -1163,50 +1166,62 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
Left e -> do
let err = if msgType == AM_A_MSG_ then MERR mId e else ERR e
case e of
-- never loop on quota exceeded error, ignoring messageConsecutiveRetries,
-- to avoid blocking delivery in other queues.
SMP SMP.QUOTA -> case msgType of
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
_ -> retrySndMsg RISlow
AM_CONN_INFO -> connError NOT_AVAILABLE
AM_CONN_INFO_REPLY -> connError NOT_AVAILABLE
_ -> ifM (msgExpired quotaExceededTimeout) (notifyDelMessages err) (updateDeliverAfter True delay)
SMP SMP.AUTH -> case msgType of
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
AM_RATCHET_INFO -> connError msgId NOT_AVAILABLE
AM_CONN_INFO -> connError NOT_AVAILABLE
AM_CONN_INFO_REPLY -> connError NOT_AVAILABLE
AM_RATCHET_INFO -> connError NOT_AVAILABLE
AM_HELLO_
-- in duplexHandshake mode (v2) HELLO is only sent once, without retrying,
-- because the queue must be secured by the time the confirmation or the first HELLO is received
| duplexHandshake == Just True -> connErr
| otherwise ->
ifM (msgExpired helloTimeout) connErr (retrySndMsg RIFast)
-- otherwise branch is not used in clients with v2+ of agent protocol (June 2022)
-- TODO remove in v6
| otherwise -> retrySndMsg n delay err
where
connErr = case rq_ of
-- party initiating connection
Just _ -> connError msgId NOT_AVAILABLE
Just _ -> connError NOT_AVAILABLE
-- party joining connection
_ -> connError msgId NOT_ACCEPTED
AM_REPLY_ -> notifyDel msgId err
AM_A_MSG_ -> notifyDel msgId err
AM_A_RCVD_ -> notifyDel msgId err
AM_QCONT_ -> notifyDel msgId err
AM_QADD_ -> qError msgId "QADD: AUTH"
AM_QKEY_ -> qError msgId "QKEY: AUTH"
AM_QUSE_ -> qError msgId "QUSE: AUTH"
AM_QTEST_ -> qError msgId "QTEST: AUTH"
AM_EREADY_ -> notifyDel msgId err
_ -> connError NOT_ACCEPTED
AM_REPLY_ -> notifyDel err
AM_A_MSG_ -> notifyDel err
AM_A_RCVD_ -> notifyDel err
AM_QCONT_ -> notifyDel err
AM_QADD_ -> qError "QADD: AUTH"
AM_QKEY_ -> qError "QKEY: AUTH"
AM_QUSE_ -> qError "QUSE: AUTH"
AM_QTEST_ -> qError "QTEST: AUTH"
AM_EREADY_ -> notifyDel err
_
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
-- the message sending would be retried
| temporaryOrHostError e -> do
let timeoutSel = if msgType == AM_HELLO_ then helloTimeout else messageTimeout
ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndMsg RIFast)
| otherwise -> notifyDel msgId err
| temporaryOrHostError e -> retrySndMsg n delay err
| otherwise -> notifyDel err
where
msgExpired timeoutSel = do
msgTimeout <- asks $ timeoutSel . config
let msgTimeout = timeoutSel dCfg
currentTime <- liftIO getCurrentTime
pure $ diffUTCTime currentTime internalTs > msgTimeout
retrySndMsg riMode = do
withStore' c $ \db -> updatePendingMsgRIState db connId msgId riState
retrySndOp c $ loop riMode
notifyDelMessages err = do
notifyDel err
mIds <- withStore' c $ \db -> getExpiredSndMessages db connId sq
forM_ mIds $ \mId' -> notify $ MERR (unId mId') $ BROKER (B.unpack $ strEncode srv) TIMEOUT
withStore' c $ \db -> deleteExpiredSndMessages db connId sq mIds
retrySndMsg n delay err
| n + 1 < messageConsecutiveRetries dCfg = retrySndOp c loop
| otherwise = ifM (msgExpired messageTimeout) (notifyDelMessages err) (updateDeliverAfter False delay)
updateDeliverAfter qe' delay = do
let ri = (if qe then quotaExceededRetryInterval else messageRetryInterval) dCfg
-- TODO elapsed instead of 0?
delay' = if qe == qe' then nextDelay 0 delay ri else initialInterval ri
withStore' c $ \db -> updateSndQueueDelivery db connId sq qe' delay'
-- retrySndOp c $ loop riMode
Right () -> do
case msgType of
AM_CONN_INFO -> setConfirmed
@@ -1255,11 +1270,11 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
Just SndQueue {dbReplaceQueueId = Just replacedId, primary} ->
-- second part of this condition is a sanity check because dbReplaceQueueId cannot point to the same queue, see switchConnection'
case removeQP (\sq' -> dbQId sq' == replacedId && not (sameQueue addr sq')) sqs of
Nothing -> internalErr msgId "sent QTEST: queue not found in connection"
Nothing -> internalErr "sent QTEST: queue not found in connection"
Just (sq', sq'' : sqs') -> do
checkSQSwchStatus sq' SSSendingQTEST
-- remove the delivery from the map to stop the thread when the delivery loop is complete
atomically $ TM.delete (qAddress sq') $ smpDeliveryWorkers c
-- atomically $ TM.delete (qAddress sq') $ smpDeliveryWorkers c
withStore' c $ \db -> do
when primary $ setSndQueuePrimary db connId sq
deletePendingMsgs db connId sq'
@@ -1267,29 +1282,29 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
let sqs'' = sq'' :| sqs'
conn' = DuplexConnection cData' rqs sqs''
notify . SWITCH QDSnd SPCompleted $ connectionStats conn'
_ -> internalErr msgId "sent QTEST: there is only one queue in connection"
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr msgId "QTEST sent not in duplex connection"
_ -> internalErr "sent QTEST: there is only one queue in connection"
_ -> internalErr "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr "QTEST sent not in duplex connection"
AM_EREADY_ -> pure ()
delMsgKeep (msgType == AM_A_MSG_) msgId
delMsgKeep (msgType == AM_A_MSG_)
where
setConfirmed = do
withStore' c $ \db -> do
setSndQueueStatus db sq Confirmed
when (isJust rq_) $ removeConfirmations db connId
unless (duplexHandshake == Just True) . void $ enqueueMessage c cData sq SMP.noMsgFlags HELLO
where
delMsg :: InternalId -> m ()
delMsg = delMsgKeep False
delMsgKeep :: Bool -> InternalId -> m ()
delMsgKeep keepForReceipt msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt
notify :: forall e. AEntityI e => ACommand 'Agent e -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd)
notifyDel :: AEntityI e => InternalId -> ACommand 'Agent e -> m ()
notifyDel msgId cmd = notify cmd >> delMsg msgId
connError msgId = notifyDel msgId . ERR . CONN
qError msgId = notifyDel msgId . ERR . AGENT . A_QUEUE
internalErr msgId = notifyDel msgId . ERR . INTERNAL
where
delMsg :: m ()
delMsg = delMsgKeep False
delMsgKeep :: Bool -> m ()
delMsgKeep keepForReceipt = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt
notify :: forall e. AEntityI e => ACommand 'Agent e -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd)
notifyDel :: AEntityI e => ACommand 'Agent e -> m ()
notifyDel cmd = notify cmd >> delMsg
connError = notifyDel . ERR . CONN
qError = notifyDel . ERR . AGENT . A_QUEUE
internalErr = notifyDel . ERR . INTERNAL
retrySndOp :: AgentMonad m => AgentClient -> m () -> m ()
retrySndOp c loop = do
@@ -2176,9 +2191,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
case findQ addr sqs of
Just sq -> do
logServer "<--" c srv rId $ "MSG <QCONT>:" <> logSecret srvMsgId
atomically $
TM.lookup (qAddress sq) (smpDeliveryWorkers c)
>>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ())
withStore' c $ \db -> setQuotaAvailable db connId sq
-- atomically $
-- TM.lookup (qAddress sq) (smpDeliveryWorkers c)
-- >>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ())
Nothing -> qError "QCONT: queue address not found"
messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m ()
@@ -2403,7 +2419,7 @@ connectReplyQueues c cData@ConnData {userId, connId} ownConnInfo (qInfo :| _) =
confirmQueueAsync :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> SubscriptionMode -> m ()
confirmQueueAsync v c cData sq srv connInfo e2eEncryption_ subMode = do
storeConfirmation c cData sq e2eEncryption_ =<< mkAgentConfirmation v c cData sq srv connInfo subMode
submitPendingMsg c cData sq
submitPendingMsg c sq
confirmQueue :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> SubscriptionMode -> m ()
confirmQueue v@(Compatible agentVersion) c cData@ConnData {connId} sq srv connInfo e2eEncryption_ subMode = do
@@ -2427,7 +2443,7 @@ mkAgentConfirmation (Compatible agentVersion) c cData sq srv connInfo subMode
enqueueConfirmation :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m ()
enqueueConfirmation c cData sq connInfo e2eEncryption_ = do
storeConfirmation c cData sq e2eEncryption_ $ AgentConnInfo connInfo
submitPendingMsg c cData sq
submitPendingMsg c sq
storeConfirmation :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> Maybe (CR.E2ERatchetParams 'C.X448) -> AgentMessage -> m ()
storeConfirmation c ConnData {connId, connAgentVersion} sq e2eEncryption_ agentMsg = withStore c $ \db -> runExceptT $ do
@@ -2448,10 +2464,10 @@ enqueueRatchetKeyMsgs c cData (sq :| sqs) e2eEncryption = do
mapM_ (enqueueSavedMessage c cData msgId) $ filter isActiveSndQ sqs
enqueueRatchetKey :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> CR.E2ERatchetParams 'C.X448 -> m AgentMsgId
enqueueRatchetKey c cData@ConnData {connId} sq e2eEncryption = do
enqueueRatchetKey c ConnData {connId} sq e2eEncryption = do
aVRange <- asks $ smpAgentVRange . config
msgId <- storeRatchetKey $ maxVersion aVRange
submitPendingMsg c cData sq
submitPendingMsg c sq
pure $ unId msgId
where
storeRatchetKey :: Version -> m InternalId
+9 -11
View File
@@ -86,6 +86,7 @@ module Simplex.Messaging.Agent.Client
AgentState (..),
AgentLocks (..),
AgentStatsKey (..),
mkSMPTransportSession,
getAgentWorker,
getAgentWorker',
cancelWorker,
@@ -155,7 +156,6 @@ import Data.Text.Encoding
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
-- import GHC.Conc (unsafeIOToSTM)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
@@ -228,7 +228,7 @@ data SessionVar a = SessionVar
sessionVarId :: Int
}
type ClientVar msg = SessionVar (Either AgentErrorType (Client msg))
type ClientVar msg = SessionVar (Either AgentErrorType (Client msg))
type SMPClientVar = ClientVar SMP.BrokerMsg
@@ -236,8 +236,6 @@ type NtfClientVar = ClientVar NtfResponse
type XFTPClientVar = ClientVar FileResponse
type SMPTransportSession = TransportSession SMP.BrokerMsg
type NtfTransportSession = TransportSession NtfResponse
type XFTPTransportSession = TransportSession FileResponse
@@ -259,7 +257,7 @@ data AgentClient = AgentClient
pendingSubs :: TRcvQueues,
removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError,
workerSeq :: TVar Int,
smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()),
smpDeliveryWorkers :: TMap SMPTransportSession Worker,
asyncCmdWorkers :: TMap (Maybe SMPServer) Worker,
connCmdsQueued :: TMap ConnId Bool,
ntfNetworkOp :: TVar AgentOpState,
@@ -712,7 +710,7 @@ closeAgentClient c = liftIO $ do
closeProtocolServerClients c xftpClients
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
cancelActions . actions $ asyncClients c
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
clearWorkers smpDeliveryWorkers >>= mapM_ cancelWorker
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
clear connCmdsQueued
atomically . RQ.clear $ activeSubs c
@@ -724,7 +722,7 @@ closeAgentClient c = liftIO $ do
clearWorkers workers = atomically $ swapTVar (workers c) mempty
clear :: Monoid m => (AgentClient -> TVar m) -> IO ()
clear sel = atomically $ writeTVar (sel c) mempty
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect v = void . forkIO $ atomically (readTMVar $ sessionVar v) >>= uninterruptibleCancel
cancelWorker :: Worker -> IO ()
@@ -739,9 +737,9 @@ throwWhenInactive :: AgentClient -> STM ()
throwWhenInactive c = unlessM (readTVar $ active c) $ throwSTM ThreadKilled
-- this function is used to remove workers once delivery is complete, not when it is removed from the map
throwWhenNoDelivery :: AgentClient -> SndQueue -> STM ()
throwWhenNoDelivery c sq =
unlessM (TM.member (qAddress sq) $ smpDeliveryWorkers c) $
throwWhenNoDelivery :: AgentClient -> SMPTransportSession -> STM ()
throwWhenNoDelivery c tSess =
unlessM (TM.member tSess $ smpDeliveryWorkers c) $
throwSTM ThreadKilled
closeProtocolServerClients :: ProtocolServerClient err msg => AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar msg)) -> IO ()
@@ -1573,7 +1571,7 @@ getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeli
smpClients_ <- textKeys <$> readTVarIO smpClients
ntfClients_ <- textKeys <$> readTVarIO ntfClients
xftpClients_ <- textKeys <$> readTVarIO xftpClients
smpDeliveryWorkers_ <- workerStats . fmap fst =<< readTVarIO smpDeliveryWorkers
smpDeliveryWorkers_ <- workerStats =<< readTVarIO smpDeliveryWorkers
asyncCmdWorkers_ <- workerStats =<< readTVarIO asyncCmdWorkers
smpSubWorkers_ <- textKeys <$> readTVarIO smpSubWorkers
asyncCients_ <- M.keys <$> readTVarIO actions
+11 -24
View File
@@ -51,6 +51,7 @@ import Numeric.Natural
import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.RetryInterval.Delivery
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client
@@ -89,9 +90,8 @@ data AgentConfig = AgentConfig
ntfCfg :: ProtocolClientConfig,
xftpCfg :: XFTPClientConfig,
reconnectInterval :: RetryInterval,
messageRetryInterval :: RetryInterval2,
messageTimeout :: NominalDiffTime,
helloTimeout :: NominalDiffTime,
messageDeliveryCfg :: MsgDeliveryConfig,
xftpRetryInterval :: RetryInterval,
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
cleanupStepInterval :: Int,
@@ -125,24 +125,12 @@ defaultReconnectInterval =
maxInterval = 180_000000
}
defaultMessageRetryInterval :: RetryInterval2
defaultMessageRetryInterval =
RetryInterval2
{ riFast =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
},
riSlow =
-- TODO: these timeouts can be increased in v5.0 once most clients are updated
-- to resume sending on QCONT messages.
-- After that local message expiration period should be also increased.
RetryInterval
{ initialInterval = 60_000000,
increaseAfter = 60_000000,
maxInterval = 3600_000000 -- 1 hour
}
defaultXFTPRetryInterval :: RetryInterval
defaultXFTPRetryInterval =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
}
defaultAgentConfig :: AgentConfig
@@ -156,9 +144,8 @@ defaultAgentConfig =
ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)},
xftpCfg = defaultXFTPClientConfig,
reconnectInterval = defaultReconnectInterval,
messageRetryInterval = defaultMessageRetryInterval,
messageTimeout = 2 * nominalDay,
helloTimeout = 2 * nominalDay,
messageDeliveryCfg = defaultMsgDeliveryConfig,
xftpRetryInterval = defaultXFTPRetryInterval,
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
cleanupStepInterval = 200000, -- 200ms
+21 -4
View File
@@ -4,13 +4,16 @@
module Simplex.Messaging.Agent.RetryInterval
( RetryInterval (..),
RetryInterval2 (..),
RetryIntervalMode (..),
-- RetryInterval2 (..),
-- RetryIntervalMode (..),
RIState (..),
RI2State (..),
withRetryInterval,
withRetryIntervalCount,
withRetryLock2,
updateRetryInterval2,
-- withRetryLock2,
-- updateRetryInterval2,
updateRetryInterval,
nextDelay,
)
where
@@ -27,6 +30,12 @@ data RetryInterval = RetryInterval
maxInterval :: Int64
}
data RIState = RIState
{ retryDelay :: Int64,
retryElapsed :: Int64
}
deriving (Eq, Show)
data RetryInterval2 = RetryInterval2
{ riSlow :: RetryInterval,
riFast :: RetryInterval
@@ -38,6 +47,14 @@ data RI2State = RI2State
}
deriving (Show)
updateRetryInterval :: RIState -> RetryInterval -> RetryInterval
updateRetryInterval RIState {retryDelay, retryElapsed} RetryInterval {initialInterval, increaseAfter, maxInterval} =
RetryInterval
{ initialInterval = retryDelay,
increaseAfter = max 0 (increaseAfter - retryElapsed),
maxInterval
}
updateRetryInterval2 :: RI2State -> RetryInterval2 -> RetryInterval2
updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlow, riFast} =
RetryInterval2
@@ -0,0 +1,43 @@
{-# LANGUAGE NumericUnderscores #-}
module Simplex.Messaging.Agent.RetryInterval.Delivery where
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Simplex.Messaging.Agent.RetryInterval
data MsgDeliveryConfig = MsgDeliveryConfig
{ messageRetryInterval :: RetryInterval,
messageTimeout :: NominalDiffTime,
messageConsecutiveRetries :: Int,
quotaExceededRetryInterval :: RetryInterval,
quotaExceededTimeout :: NominalDiffTime
}
defaultMsgDeliveryConfig :: MsgDeliveryConfig
defaultMsgDeliveryConfig =
MsgDeliveryConfig
{ messageRetryInterval =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
},
messageTimeout = 2 * nominalDay,
messageConsecutiveRetries = 3,
quotaExceededRetryInterval =
RetryInterval
{ initialInterval = 180_000000, -- 3 minutes
increaseAfter = 0,
maxInterval = 3 * 3600_000000 -- 3 hours
},
quotaExceededTimeout = 7 * nominalDay
}
-- if
-- | quota exceeded ->
-- | message expired -> send error, stop retries, check and fail other expired messages
-- | otherwise -> stop retries, update delay, store deliver_after
-- | timeout ->
-- | n < messageConsecutiveRetries -> loop
-- | message expired -> send error, stop retries, check and fail other expired messages
-- | otherwise -> stop retries, update delay, store deliver_after
+3 -1
View File
@@ -28,7 +28,7 @@ import Data.Maybe (isJust)
import Data.Time (UTCTime)
import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.RetryInterval (RI2State, RIState)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (RatchetX448)
import Simplex.Messaging.Encoding.String
@@ -149,6 +149,8 @@ data StoredSndQueue (q :: QueueStored) = SndQueue
e2eDhSecret :: C.DhSecretX25519,
-- | queue status
status :: QueueStatus,
quotaExceeded :: Bool,
retryState :: Maybe RIState,
-- | database queue ID (within connection)
dbQueueId :: DBQueueId q,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
+30 -5
View File
@@ -107,9 +107,15 @@ module Simplex.Messaging.Agent.Store.SQLite
createSndMsgDelivery,
getSndMsgViaRcpt,
updateSndMsgRcpt,
getPendingQueueMsg,
updatePendingMsgRIState,
getPendingSessionMsg,
-- getPendingQueueMsg,
-- updatePendingMsgRIState,
updateSndQueueDelivery,
deletePendingMsgs,
getExpiredSndMessages,
deleteExpiredSndMessages,
setQuotaAvailable,
updateDeliveryDelay,
setMsgUserAck,
getRcvMsg,
getLastMsg,
@@ -263,6 +269,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Common
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRError, Migration (..), MigrationsToRun (..), mtrErrorDescription)
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (SMPTransportSession)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
@@ -978,6 +985,9 @@ updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} =
"UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?"
(agentMsgId, msgRcptStatus, connId, sndMsgId)
getPendingSessionMsg :: DB.Connection -> SMPTransportSession -> IO (Either StoreError (Maybe (ConnData, SndQueue, Maybe RcvQueue, PendingMsgData)))
getPendingSessionMsg _db _tSess = undefined
getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData)))
getPendingQueueMsg db connId SndQueue {dbQueueId} =
getWorkItem "message" getMsgId getMsgData markMsgFailed
@@ -1033,14 +1043,29 @@ getWorkItem itemName getId getItem markFailed =
mkError :: E.SomeException -> StoreError
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId)
-- updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
-- updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
-- DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId)
updateSndQueueDelivery :: DB.Connection -> ConnId -> SndQueue -> Bool -> Int64 -> IO ()
updateSndQueueDelivery _db _connId _sq _quotaExceeded _delay = undefined
deletePendingMsgs :: DB.Connection -> ConnId -> SndQueue -> IO ()
deletePendingMsgs db connId SndQueue {dbQueueId} =
DB.execute db "DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ?" (connId, dbQueueId)
getExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> IO [InternalId]
getExpiredSndMessages = undefined
deleteExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> [InternalId] -> IO ()
deleteExpiredSndMessages = undefined
setQuotaAvailable :: DB.Connection -> ConnId -> SndQueue -> IO ()
setQuotaAvailable = undefined
updateDeliveryDelay :: DB.Connection -> ConnId -> SndQueue -> IO ()
updateDeliveryDelay = undefined
setMsgUserAck :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (RcvQueue, SMP.MsgId))
setMsgUserAck db connId agentMsgId = runExceptT $ do
(dbRcvId, srvMsgId) <-
@@ -67,6 +67,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -102,7 +103,8 @@ schemaMigrations =
("m20230814_indexes", m20230814_indexes, Just down_m20230814_indexes),
("m20230829_crypto_files", m20230829_crypto_files, Just down_m20230829_crypto_files),
("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at),
("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items)
("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items),
("m20240118_snd_queue_delivery", m20240118_snd_queue_delivery, Just down_m20240118_snd_queue_delivery)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,26 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20240118_snd_queue_delivery :: Query
m20240118_snd_queue_delivery =
[sql|
ALTER TABLE snd_queues ADD COLUMN deliver_after TEXT NOT NULL DEFAULT('1970-01-01 00:00:00');
ALTER TABLE snd_queues ADD COLUMN quota_exceeded INTEGER NOT NULL DEFAULT 0;
ALTER TABLE snd_queues ADD COLUMN retry_delay INTEGER;
CREATE INDEX idx_snd_queues_deliver_after ON snd_queues(deliver_after);
|]
down_m20240118_snd_queue_delivery :: Query
down_m20240118_snd_queue_delivery =
[sql|
DROP INDEX idx_snd_queues_deliver_after;
ALTER TABLE snd_queues DROP COLUMN deliver_after;
ALTER TABLE snd_queues DROP COLUMN quota_exceeded;
ALTER TABLE snd_queues DROP COLUMN retry_delay;
|]
+3
View File
@@ -28,6 +28,7 @@
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
TransportSession,
SMPTransportSession,
ProtocolClient (thVersion, sessionId, sessionTs),
SMPClient,
getProtocolClient,
@@ -301,6 +302,8 @@ type UserId = Int64
-- | Transport session key - includes entity ID if `sessionMode = TSMEntity`.
type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId)
type SMPTransportSession = TransportSession BrokerMsg
-- | Connects to 'ProtocolServer' using passed client configuration
-- and queue for messages and notifications.
--