From 42a2c1fc57aab5f3f7a4d3b708fc21b791a3b3a0 Mon Sep 17 00:00:00 2001 From: sh <37271604+shumvgolove@users.noreply.github.com> Date: Mon, 8 Sep 2025 08:41:20 +0000 Subject: [PATCH 1/3] ci: fix git in docker (#1623) --- .github/workflows/build.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 58ed41137..d86a4e069 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -160,6 +160,8 @@ jobs: if: matrix.should_run == true shell: docker exec -t builder sh -eu {0} run: | + chmod -R 777 dist-newstyle ~/.cabal && git config --global --add safe.directory '*' + cabal clean cabal update cabal build --jobs=$(nproc) --enable-tests -fserver_postgres mkdir -p /out From 8fea15245aaf7dd7b8ab8cf549e63b8f117da0a8 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 8 Sep 2025 11:28:42 +0100 Subject: [PATCH 2/3] smp server: remove dependency of message size on the version (#1627) --- src/Simplex/Messaging/Protocol.hs | 13 ++++--------- src/Simplex/Messaging/Server.hs | 13 ++++++------- tests/CoreTests/BatchingTests.hs | 2 +- tests/SMPProxyTests.hs | 3 +-- tests/ServerTests.hs | 8 ++++---- 5 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 40314ad2a..1588742f3 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -292,18 +292,13 @@ currentSMPClientVersion = VersionSMPC 4 supportedSMPClientVRange :: VersionRangeSMPC supportedSMPClientVRange = mkVersionRange initialSMPClientVersion currentSMPClientVersion --- TODO v6.0 remove dependency on version -maxMessageLength :: VersionSMP -> Int -maxMessageLength v - | v >= encryptedBlockSMPVersion = 16048 -- max 16048 - | v >= sendingProxySMPVersion = 16064 -- max 16067 - | otherwise = 16088 -- 16048 - always use this size to determine allowed ranges +maxMessageLength :: Int +maxMessageLength = 16048 paddedProxiedTLength :: Int paddedProxiedTLength = 16226 -- 16225 .. 16227 --- TODO v7.0 change to 16048 -type MaxMessageLen = 16088 +type MaxMessageLen = 16048 -- 16 extra bytes: 8 for timestamp and 8 for flags (7 flags and the space, only 1 flag is currently used) type MaxRcvMessageLen = MaxMessageLen + 16 -- 16104, the padded size is 16106 @@ -1477,7 +1472,7 @@ data ErrorType STORE {storeErr :: Text} | -- | ACK command is sent without message to be acknowledged NO_MSG - | -- | sent message is too large (> maxMessageLength = 16088 bytes) + | -- | sent message is too large (> maxMessageLength = 16048 bytes) LARGE_MSG | -- | relay public key is expired EXPIRED diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 23ce85035..6c288904d 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1349,11 +1349,10 @@ client ms clnt@Client {clientId, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" - let THandleParams {thVersion} = thParams' - clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') + let clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') process t acc@(rs, msgs) = (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) - <$> processCommand clntServiceId thVersion t + <$> processCommand clntServiceId t forever $ atomically (readTBQueue rcvQ) >>= foldrM process ([], []) @@ -1439,8 +1438,8 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) - processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k @@ -1829,7 +1828,7 @@ client sendMessage :: MsgFlags -> MsgBody -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) sendMessage msgFlags msgBody q qr - | B.length msgBody > maxMessageLength clntVersion = do + | B.length msgBody > maxMessageLength = do stats <- asks serverStats incStat $ msgSentLarge stats pure $ err LARGE_MSG @@ -1982,7 +1981,7 @@ client -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). -- `fst` removes empty message that is only returned for `SUB` command - Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'') + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right diff --git a/tests/CoreTests/BatchingTests.hs b/tests/CoreTests/BatchingTests.hs index d013c0db4..3fcc257f7 100644 --- a/tests/CoreTests/BatchingTests.hs +++ b/tests/CoreTests/BatchingTests.hs @@ -389,7 +389,7 @@ randomMSG = do corrId <- atomically $ C.randomBytes 24 g rId <- atomically $ C.randomBytes 24 g msgId <- atomically $ C.randomBytes 24 g - msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g + msg <- atomically $ C.randomBytes maxMessageLength g pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg}) randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString)) diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 5f1a59fd0..0e3db6424 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -69,7 +69,6 @@ smpProxyTests = do let srv1 = SMPServer testHost testPort testKeyHash srv2 = SMPServer testHost2 testPort2 testKeyHash describe "client API" $ do - let maxLen = maxMessageLength encryptedBlockSMPVersion describe "one server" $ do it "deliver via proxy" . oneServer $ do deliverMessageViaProxy srv1 srv1 C.SEd448 "hello 1" "hello 2" @@ -78,7 +77,7 @@ smpProxyTests = do relayServ = srv2 (msg1, msg2) <- runIO $ do g <- C.newRandom - atomically $ (,) <$> C.randomBytes maxLen g <*> C.randomBytes maxLen g + atomically $ (,) <$> C.randomBytes maxMessageLength g <*> C.randomBytes maxMessageLength g it "deliver via proxy" . twoServersFirstProxy $ deliverMessageViaProxy proxyServ relayServ C.SEd448 "hello 1" "hello 2" it "max message size, Ed448 keys" . twoServersFirstProxy $ diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 204365931..043f11e31 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -232,12 +232,12 @@ testCreateSecure = Resp "dabc" _ err5 <- sendRecv s ("", "dabc", sId, _SEND "hello") (err5, ERR AUTH) #== "rejects unsigned SEND" - let maxAllowedMessage = B.replicate (maxMessageLength currentClientSMPRelayVersion) '-' + let maxAllowedMessage = B.replicate maxMessageLength '-' Resp "bcda" _ OK <- signSendRecv s sKey ("bcda", sId, _SEND maxAllowedMessage) Resp "" _ (Msg mId3 msg3) <- tGet1 r (dec mId3 msg3, Right maxAllowedMessage) #== "delivers message of max size" - let biggerMessage = B.replicate (maxMessageLength currentClientSMPRelayVersion + 1) '-' + let biggerMessage = B.replicate (maxMessageLength + 1) '-' Resp "bcda" _ (ERR LARGE_MSG) <- signSendRecv s sKey ("bcda", sId, _SEND biggerMessage) pure () @@ -279,12 +279,12 @@ testCreateSndSecure = Resp "dabc" _ err5 <- sendRecv s ("", "dabc", sId, _SEND "hello") (err5, ERR AUTH) #== "rejects unsigned SEND" - let maxAllowedMessage = B.replicate (maxMessageLength currentClientSMPRelayVersion) '-' + let maxAllowedMessage = B.replicate maxMessageLength '-' Resp "bcda" _ OK <- signSendRecv s sKey ("bcda", sId, _SEND maxAllowedMessage) Resp "" _ (Msg mId3 msg3) <- tGet1 r (dec mId3 msg3, Right maxAllowedMessage) #== "delivers message of max size" - let biggerMessage = B.replicate (maxMessageLength currentClientSMPRelayVersion + 1) '-' + let biggerMessage = B.replicate (maxMessageLength + 1) '-' Resp "bcda" _ (ERR LARGE_MSG) <- signSendRecv s sKey ("bcda", sId, _SEND biggerMessage) pure () From a4f049d8da1021ee76fbd1b25635d16aef24154a Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:38:08 +0000 Subject: [PATCH 3/3] agent: parameterize withWork, getWorkItem with StoreError; parameterized Binary for SQLite (#1617) * agent: parameterize withWork StoreError * getWorkItem * export * binary * remove handleWrkErr AnyStoreError constraint * put AnyError in AnyStoreError constraint * move typeclass --------- Co-authored-by: Evgeny Poberezkin --- src/Simplex/Messaging/Agent/Client.hs | 18 ++++++++---------- src/Simplex/Messaging/Agent/Store.hs | 12 +++++++++++- .../Messaging/Agent/Store/AgentStore.hs | 16 +++++++++------- src/Simplex/Messaging/Agent/Store/SQLite/DB.hs | 2 +- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 4a1a1c40e..21c0436ee 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1974,19 +1974,20 @@ withWork :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError ( withWork c doWork = withWork_ c doWork . withStore' c {-# INLINE withWork #-} -withWork_ :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m () +withWork_ :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m () withWork_ c doWork getWork action = getWork >>= \case Right (Just r) -> action r Right Nothing -> noWork -- worker is stopped here (noWork) because the next iteration is likely to produce the same result - Left e@SEWorkItemError {} -> noWork >> notifyErr (CRITICAL False) e - Left e -> notifyErr INTERNAL e + Left e + | isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e + | otherwise -> notifyErr INTERNAL e where noWork = liftIO $ noWorkToDo doWork notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e) -withWorkItems :: MonadIO m => AgentClient -> TMVar () -> ExceptT e m (Either StoreError [Either StoreError a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m () +withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m () withWorkItems c doWork getWork action = do getWork >>= \case Right [] -> noWork @@ -1995,20 +1996,17 @@ withWorkItems c doWork getWork action = do case L.nonEmpty items of Just items' -> action items' Nothing -> do - let criticalErr = find workItemError errs + let criticalErr = find isWorkItemError errs forM_ criticalErr $ \err -> do notifyErr (CRITICAL False) err - when (all workItemError errs) noWork + when (all isWorkItemError errs) noWork unless (null errs) $ atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs) Left e - | workItemError e -> noWork >> notifyErr (CRITICAL False) e + | isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e | otherwise -> notifyErr INTERNAL e where - workItemError = \case - SEWorkItemError {} -> True - _ -> False noWork = liftIO $ noWorkToDo doWork notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e) diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 8dee07037..7a1fc391d 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -693,10 +693,20 @@ data StoreError | -- | XFTP Deleted snd chunk replica not found. SEDeletedSndChunkReplicaNotFound | -- | Error when reading work item that suspends worker - do not use! - SEWorkItemError ByteString + SEWorkItemError {errContext :: String} | -- | Servers stats not found. SEServersStatsNotFound deriving (Eq, Show, Exception) instance AnyError StoreError where fromSomeException = SEInternal . bshow + +class (Show e, AnyError e) => AnyStoreError e where + isWorkItemError :: e -> Bool + mkWorkItemError :: String -> e + +instance AnyStoreError StoreError where + isWorkItemError = \case + SEWorkItemError {} -> True + _ -> False + mkWorkItemError errContext = SEWorkItemError {errContext} diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 59175e942..fef829c66 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -237,6 +237,8 @@ module Simplex.Messaging.Agent.Store.AgentStore firstRow', maybeFirstRow, fromOnlyBI, + getWorkItem, + getWorkItems, ) where @@ -966,25 +968,25 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} = _ -> Left $ SEInternal "unexpected snd msg data" markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId) -getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a)) +getWorkItem :: (Show i, AnyStoreError e) => String -> IO (Maybe i) -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e (Maybe a)) getWorkItem itemName getId getItem markFailed = runExceptT $ handleWrkErr itemName "getId" getId >>= mapM (tryGetItem itemName getItem markFailed) -getWorkItems :: Show i => ByteString -> IO [i] -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError [Either StoreError a]) +getWorkItems :: (Show i, AnyStoreError e) => String -> IO [i] -> (i -> IO (Either e a)) -> (i -> IO ()) -> IO (Either e [Either e a]) getWorkItems itemName getIds getItem markFailed = runExceptT $ handleWrkErr itemName "getIds" getIds >>= mapM (tryE . tryGetItem itemName getItem markFailed) -tryGetItem :: Show i => ByteString -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> i -> ExceptT StoreError IO a +tryGetItem :: (Show i, AnyStoreError e) => String -> (i -> IO (Either e a)) -> (i -> IO ()) -> i -> ExceptT e IO a tryGetItem itemName getItem markFailed itemId = ExceptT (getItem itemId) `catchAllErrors` \e -> mark >> throwE e where - mark = handleWrkErr itemName ("markFailed ID " <> bshow itemId) $ markFailed itemId + mark = handleWrkErr itemName ("markFailed ID " <> show itemId) $ markFailed itemId -- Errors caught by this function will suspend worker as if there is no more work, -handleWrkErr :: ByteString -> ByteString -> IO a -> ExceptT StoreError IO a +handleWrkErr :: forall e a. AnyStoreError e => String -> String -> IO a -> ExceptT e IO a handleWrkErr itemName opName action = ExceptT $ first mkError <$> E.try action where - mkError :: E.SomeException -> StoreError - mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e + mkError :: E.SomeException -> e + mkError e = mkWorkItemError $ itemName <> " " <> opName <> " error: " <> show e updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs b/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs index 7da6b2ca2..2620e561b 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs @@ -52,7 +52,7 @@ import Simplex.Messaging.Util (diffToMicroseconds, tshow) newtype BoolInt = BI {unBI :: Bool} deriving newtype (FromField, ToField) -newtype Binary = Binary {fromBinary :: ByteString} +newtype Binary a = Binary {fromBinary :: a} deriving newtype (FromField, ToField) data Connection = Connection