Merge branch 'master' into ep/journal-export

This commit is contained in:
Evgeny Poberezkin
2025-09-08 18:56:46 +01:00
10 changed files with 47 additions and 42 deletions
+8 -10
View File
@@ -1974,19 +1974,20 @@ withWork :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError (
withWork c doWork = withWork_ c doWork . withStore' c
{-# INLINE withWork #-}
withWork_ :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
withWork_ :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
withWork_ c doWork getWork action =
getWork >>= \case
Right (Just r) -> action r
Right Nothing -> noWork
-- worker is stopped here (noWork) because the next iteration is likely to produce the same result
Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e
Left e -> notifyErr INTERNAL e
Left e
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
where
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
withWorkItems :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError [Either StoreError a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
withWorkItems c doWork getWork action = do
getWork >>= \case
Right [] -> noWork
@@ -1995,20 +1996,17 @@ withWorkItems c doWork getWork action = do
case L.nonEmpty items of
Just items' -> action items'
Nothing -> do
let criticalErr = find workItemError errs
let criticalErr = find isWorkItemError errs
forM_ criticalErr $ \err -> do
notifyErr (CRITICAL False) err
when (all workItemError errs) noWork
when (all isWorkItemError errs) noWork
unless (null errs) $
atomically $
writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs)
Left e
| workItemError e -> noWork >> notifyErr (CRITICAL False) e
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
where
workItemError = \case
SEWorkItemError {} -> True
_ -> False
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
+11 -1
View File
@@ -693,10 +693,20 @@ data StoreError
| -- | XFTP Deleted snd chunk replica not found.
SEDeletedSndChunkReplicaNotFound
| -- | Error when reading work item that suspends worker - do not use!
SEWorkItemError ByteString
SEWorkItemError {errContext :: String}
| -- | Servers stats not found.
SEServersStatsNotFound
deriving (Eq, Show, Exception)
instance AnyError StoreError where
fromSomeException = SEInternal . bshow
class (Show e, AnyError e) => AnyStoreError e where
isWorkItemError :: e -> Bool
mkWorkItemError :: String -> e
instance AnyStoreError StoreError where
isWorkItemError = \case
SEWorkItemError {} -> True
_ -> False
mkWorkItemError errContext = SEWorkItemError {errContext}
@@ -237,6 +237,8 @@ module Simplex.Messaging.Agent.Store.AgentStore
firstRow',
maybeFirstRow,
fromOnlyBI,
getWorkItem,
getWorkItems,
)
where
@@ -966,25 +968,25 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} =
_ -> Left $ SEInternal "unexpected snd msg data"
markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a))
getWorkItem :: (Show i, AnyStoreError e) => String -> IO (Maybe i) -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e (Maybe a))
getWorkItem itemName getId getItem markFailed =
runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed)
getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a])
getWorkItems :: (Show i, AnyStoreError e) => String -> IO [i] -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e [Either e a])
getWorkItems itemName getIds getItem markFailed =
runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed)
tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a
tryGetItem :: (Show i, AnyStoreError e) => String -> (i -> IO (Either e a)) -> (i -> IO ()) -> i -> ExceptT e IO a
tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchAllErrors` \e -> mark >> throwE e
where
mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId
mark = handleWrkErr itemName ("markFailed ID " <> show itemId) $ markFailed itemId
-- Errors caught by this function will suspend worker as if there is no more work,
handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a
handleWrkErr :: forall e a. AnyStoreError e => String -> String -> IO a -> ExceptT e IO a
handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action
where
mkError :: E.SomeException -> StoreError
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
mkError :: E.SomeException -> e
mkError e = mkWorkItemError $ itemName <> " " <> opName <> " error: " <> show e
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
@@ -52,7 +52,7 @@ import Simplex.Messaging.Util (diffToMicroseconds, tshow)
newtype BoolInt = BI {unBI :: Bool}
deriving newtype (FromField, ToField)
newtype Binary = Binary {fromBinary :: ByteString}
newtype Binary a = Binary {fromBinary :: a}
deriving newtype (FromField, ToField)
data Connection = Connection
+4 -9
View File
@@ -292,18 +292,13 @@ currentSMPClientVersion = VersionSMPC 4
supportedSMPClientVRange :: VersionRangeSMPC
supportedSMPClientVRange = mkVersionRange initialSMPClientVersion currentSMPClientVersion
-- TODO v6.0 remove dependency on version
maxMessageLength :: VersionSMP -> Int
maxMessageLength v
| v >= encryptedBlockSMPVersion = 16048 -- max 16048
| v >= sendingProxySMPVersion = 16064 -- max 16067
| otherwise = 16088 -- 16048 - always use this size to determine allowed ranges
maxMessageLength :: Int
maxMessageLength = 16048
paddedProxiedTLength :: Int
paddedProxiedTLength = 16226 -- 16225 .. 16227
-- TODO v7.0 change to 16048
type MaxMessageLen = 16088
type MaxMessageLen = 16048
-- 16 extra bytes: 8 for timestamp and 8 for flags (7 flags and the space, only 1 flag is currently used)
type MaxRcvMessageLen = MaxMessageLen + 16 -- 16104, the padded size is 16106
@@ -1477,7 +1472,7 @@ data ErrorType
STORE {storeErr :: Text}
| -- | ACK command is sent without message to be acknowledged
NO_MSG
| -- | sent message is too large (> maxMessageLength = 16088 bytes)
| -- | sent message is too large (> maxMessageLength = 16048 bytes)
LARGE_MSG
| -- | relay public key is expired
EXPIRED
+6 -7
View File
@@ -1349,11 +1349,10 @@ client
ms
clnt@Client {clientId, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
let THandleParams {thVersion} = thParams'
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
let clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
process t acc@(rs, msgs) =
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
<$> processCommand clntServiceId thVersion t
<$> processCommand clntServiceId t
forever $
atomically (readTBQueue rcvQ)
>>= foldrM process ([], [])
@@ -1439,8 +1438,8 @@ client
mkIncProxyStats ps psOwn own sel = do
incStat $ sel ps
when own $ incStat $ sel psOwn
processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of
processCommand :: Maybe ServiceId -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId (q_, (corrId, entId, cmd)) = case cmd of
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
Cmd SSender command -> case command of
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1829,7 +1828,7 @@ client
sendMessage :: MsgFlags -> MsgBody -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
sendMessage msgFlags msgBody q qr
| B.length msgBody > maxMessageLength clntVersion = do
| B.length msgBody > maxMessageLength = do
stats <- asks serverStats
incStat $ msgSentLarge stats
pure $ err LARGE_MSG
@@ -1982,7 +1981,7 @@ client
-- rejectOrVerify filters allowed commands, no need to repeat it here.
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
-- `fst` removes empty message that is only returned for `SUB` command
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'')
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing t'')
-- encode response
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right