From ec0881f50f8574507d3796546f491fc35553486d Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 5 Jan 2022 19:57:04 +0000 Subject: [PATCH 1/7] remove message statuses and unused timestamps --- migrations/20210101_initial.sql | 6 +- src/Simplex/Messaging/Agent/Store.hs | 82 --------------------- src/Simplex/Messaging/Agent/Store/SQLite.hs | 23 ++---- 3 files changed, 6 insertions(+), 105 deletions(-) diff --git a/migrations/20210101_initial.sql b/migrations/20210101_initial.sql index ce8e10ea4..361765d2a 100644 --- a/migrations/20210101_initial.sql +++ b/migrations/20210101_initial.sql @@ -60,7 +60,7 @@ CREATE TABLE messages ( internal_ts TEXT NOT NULL, internal_rcv_id INTEGER, internal_snd_id INTEGER, - msg_type BLOB NOT NULL, -- SMP_CONF?, HELLO, REPLY, DELETE + msg_type BLOB NOT NULL, -- (H)ELLO, (R)EPLY, (D)ELETE. Should SMP confirmation be saved too? msg_body BLOB NOT NULL DEFAULT x'', PRIMARY KEY (conn_alias, internal_id), FOREIGN KEY (conn_alias, internal_rcv_id) REFERENCES rcv_messages @@ -76,8 +76,6 @@ CREATE TABLE rcv_messages ( external_snd_id INTEGER NOT NULL, broker_id BLOB NOT NULL, broker_ts TEXT NOT NULL, - rcv_status TEXT NOT NULL, - ack_brocker_ts TEXT, internal_hash BLOB NOT NULL, external_prev_snd_hash BLOB NOT NULL, integrity BLOB NOT NULL, @@ -90,8 +88,6 @@ CREATE TABLE snd_messages ( conn_alias BLOB NOT NULL, internal_snd_id INTEGER NOT NULL, internal_id INTEGER NOT NULL, - snd_status TEXT NOT NULL, - sent_ts TEXT, internal_hash BLOB NOT NULL, previous_msg_hash BLOB NOT NULL DEFAULT x'', PRIMARY KEY (conn_alias, internal_snd_id), diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index d46fecff8..43e752353 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -3,9 +3,7 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} -{-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE StandaloneDeriving #-} {-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-} @@ -17,7 +15,6 @@ import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import Data.Int (Int64) import Data.Kind (Type) -import Data.Text (Text) import Data.Time (UTCTime) import Data.Type.Equality import Simplex.Messaging.Agent.Protocol @@ -66,7 +63,6 @@ class Monad m => MonadAgentStore s m where createSndMsg :: s -> ConnId -> SndMsgData -> m () getPendingMsgData :: s -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody)) getPendingMsgs :: s -> ConnId -> m [InternalId] - getMsg :: s -> ConnId -> InternalId -> m Msg checkRcvMsg :: s -> ConnId -> InternalId -> m () deleteMsg :: s -> ConnId -> InternalId -> m () @@ -210,8 +206,6 @@ type PrevRcvMsgHash = MsgHash -- | Corresponds to `last_snd_msg_hash` in `connections` table type PrevSndMsgHash = MsgHash --- ? merge/replace these with RcvMsg and SndMsg - -- * Message data containers - used on Msg creation to reduce number of parameters data RcvMsgData = RcvMsgData @@ -239,35 +233,6 @@ data PendingMsg = PendingMsg } deriving (Show) --- * Message types - --- | A message in either direction that is stored by the agent. -data Msg = MRcv RcvMsg | MSnd SndMsg - deriving (Eq, Show) - --- | A message received by the agent from a sender. -data RcvMsg = RcvMsg - { msgBase :: MsgBase, - internalRcvId :: InternalRcvId, - -- | Id of the message at sender, corresponds to `internalSndId` from the sender's side. - -- Sender Id is made sequential for detection of missing messages. For redundant / parallel queues, - -- it also allows to keep track of duplicates and restore the original order before delivery to the client. - externalSndId :: ExternalSndId, - externalSndTs :: ExternalSndTs, - -- | Id of the message at broker, although it is not sequential (to avoid metadata leakage for potential observer), - -- it is needed to track repeated deliveries in case of connection loss - this logic is not implemented yet. - brokerId :: BrokerId, - brokerTs :: BrokerTs, - rcvMsgStatus :: RcvMsgStatus, - -- | Timestamp of acknowledgement to broker, corresponds to `Acknowledged` status. - -- Don't confuse with `brokerTs` - timestamp created at broker after it receives the message from sender. - ackBrokerTs :: AckBrokerTs, - -- | Hash of previous message as received from sender - stored for integrity forensics. - externalPrevSndHash :: MsgHash, - msgIntegrity :: MsgIntegrity - } - deriving (Eq, Show) - -- internal Ids are newtypes to prevent mixing them up newtype InternalRcvId = InternalRcvId {unRcvId :: Int64} deriving (Eq, Show) @@ -279,55 +244,8 @@ type BrokerId = MsgId type BrokerTs = UTCTime -data RcvMsgStatus = RcvMsgReceived | RcvMsgAcknowledged - deriving (Eq, Show) - -serializeRcvMsgStatus :: RcvMsgStatus -> Text -serializeRcvMsgStatus = \case - RcvMsgReceived -> "rcvd" - RcvMsgAcknowledged -> "ackd" - -rcvMsgStatusT :: Text -> Maybe RcvMsgStatus -rcvMsgStatusT = \case - "rcvd" -> Just RcvMsgReceived - "ackd" -> Just RcvMsgAcknowledged - _ -> Nothing - -type AckBrokerTs = UTCTime - -type AckSenderTs = UTCTime - --- | A message sent by the agent to a recipient. -data SndMsg = SndMsg - { msgBase :: MsgBase, - -- | Id of the message sent / to be sent, as in its number in order of sending. - internalSndId :: InternalSndId, - sndMsgStatus :: SndMsgStatus, - -- | Timestamp of the message received by broker, corresponds to `Sent` status. - sentTs :: SentTs - } - deriving (Eq, Show) - newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show) -data SndMsgStatus = SndMsgCreated | SndMsgSent - deriving (Eq, Show) - -serializeSndMsgStatus :: SndMsgStatus -> Text -serializeSndMsgStatus = \case - SndMsgCreated -> "created" - SndMsgSent -> "sent" - -sndMsgStatusT :: Text -> Maybe SndMsgStatus -sndMsgStatusT = \case - "created" -> Just SndMsgCreated - "sent" -> Just SndMsgSent - _ -> Nothing - -type SentTs = UTCTime - -type DeliveredTs = UTCTime - -- | Base message data independent of direction. data MsgBase = MsgBase { connAlias :: ConnId, diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 037a24956..33e092195 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -468,10 +468,7 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto getPendingMsgs st connId = liftIO . withTransaction st $ \db -> map fromOnly - <$> DB.query db "SELECT internal_id FROM snd_messages WHERE conn_alias = ? AND snd_status = ?" (connId, SndMsgCreated) - - getMsg :: SQLiteStore -> ConnId -> InternalId -> m Msg - getMsg _st _connId _id = throwError SENotImplemented + <$> DB.query db "SELECT internal_id FROM snd_messages WHERE conn_alias = ?" (Only connId) checkRcvMsg :: SQLiteStore -> ConnId -> InternalId -> m () checkRcvMsg st connId msgId = @@ -512,18 +509,10 @@ instance ToField InternalId where toField (InternalId x) = toField x instance FromField InternalId where fromField x = InternalId <$> fromField x -instance ToField RcvMsgStatus where toField = toField . serializeRcvMsgStatus - -instance FromField RcvMsgStatus where fromField = fromTextField_ rcvMsgStatusT - instance ToField AMsgType where toField = toField . smpEncode instance FromField AMsgType where fromField = blobFieldParser smpP -instance ToField SndMsgStatus where toField = toField . serializeSndMsgStatus - -instance FromField SndMsgStatus where fromField = fromTextField_ sndMsgStatusT - instance ToField MsgIntegrity where toField = toField . serializeMsgIntegrity instance FromField MsgIntegrity where fromField = blobFieldParser msgIntegrityP @@ -764,11 +753,11 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalH [sql| INSERT INTO rcv_messages ( conn_alias, internal_rcv_id, internal_id, external_snd_id, - broker_id, broker_ts, rcv_status, + broker_id, broker_ts, internal_hash, external_prev_snd_hash, integrity) VALUES (:conn_alias,:internal_rcv_id,:internal_id,:external_snd_id, - :broker_id,:broker_ts,:rcv_status, + :broker_id,:broker_ts, :internal_hash,:external_prev_snd_hash,:integrity); |] [ ":conn_alias" := connId, @@ -777,7 +766,6 @@ insertRcvMsgDetails_ dbConn connId RcvMsgData {msgMeta, internalRcvId, internalH ":external_snd_id" := sndMsgId, ":broker_id" := fst broker, ":broker_ts" := snd broker, - ":rcv_status" := RcvMsgReceived, ":internal_hash" := internalHash, ":external_prev_snd_hash" := externalPrevSndHash, ":integrity" := integrity @@ -857,14 +845,13 @@ insertSndMsgDetails_ dbConn connId SndMsgData {..} = dbConn [sql| INSERT INTO snd_messages - ( conn_alias, internal_snd_id, internal_id, snd_status, internal_hash, previous_msg_hash) + ( conn_alias, internal_snd_id, internal_id, internal_hash, previous_msg_hash) VALUES - (:conn_alias,:internal_snd_id,:internal_id,:snd_status,:internal_hash,:previous_msg_hash); + (:conn_alias,:internal_snd_id,:internal_id,:internal_hash,:previous_msg_hash); |] [ ":conn_alias" := connId, ":internal_snd_id" := internalSndId, ":internal_id" := internalId, - ":snd_status" := SndMsgCreated, ":internal_hash" := internalHash, ":previous_msg_hash" := prevMsgHash ] From 3c923a3dc0f071ebf862379d3742772e4a790f27 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 5 Jan 2022 21:07:34 +0000 Subject: [PATCH 2/7] test: HELLO blocking message delivery on the same server --- tests/AgentTests.hs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 9b3f0f80d..51168c022 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -65,6 +65,8 @@ agentTests (ATransport t) = do smpAgentTest2_2_2_needs_server $ testMsgDeliveryServerRestart t it "should deliver pending messages after agent restarting" $ smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t + xit "should concurrently deliver messages to connections without blocking" $ + smpAgentTest2_2_1 $ testConcurrentMsgDelivery t -- | receive message to handle `h` (<#:) :: Transport c => c -> IO (ATransmissionOrError 'Agent) @@ -327,6 +329,32 @@ testMsgDeliveryAgentRestart t bob = do withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` () withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) (pure ()) . const . testSMPAgentClientOn agentTestPort +testConcurrentMsgDelivery :: Transport c => TProxy c -> c -> c -> IO () +testConcurrentMsgDelivery _ alice bob = do + connect (alice, "alice") (bob, "bob") + + ("1", "bob2", Right (INV cReq)) <- alice #: ("1", "bob2", "NEW INV") + let cReq' = strEncode cReq + bob #: ("11", "alice2", "JOIN " <> cReq' <> " 14\nbob's connInfo") #> ("11", "alice2", OK) + ("", "bob2", Right (CONF _confId "bob's connInfo")) <- (alice <#:) + -- below commands would be needed to accept bob's connection, but alice does not + -- alice #: ("2", "bob", "LET " <> _confId <> " 16\nalice's connInfo") #> ("2", "bob", OK) + -- bob <# ("", "alice", INFO "alice's connInfo") + -- bob <# ("", "alice", CON) + -- alice <# ("", "bob", CON) + + -- the first connection should not be blocked by the second one + alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1) + alice <# ("", "bob", SENT 1) + bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False + bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) + bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2) + putStrLn "it gets this far" + bob <# ("", "alice", SENT 2) + putStrLn "it never gets here as the message is blocked by HELLO in in another connection" + alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False + alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK) + connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO () connect (h1, name1) (h2, name2) = do ("c1", _, Right (INV cReq)) <- h1 #: ("c1", name2, "NEW INV") From 6f1d9db8ec913544e5b6b05d0c677b29bb001089 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 5 Jan 2022 21:48:40 +0000 Subject: [PATCH 3/7] test: quota exceeded in one queue should not block delivery in other queues --- tests/AgentTests.hs | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 51168c022..d7ba1c2b6 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -14,6 +14,7 @@ import AgentTests.DoubleRatchetTests (doubleRatchetTests) import AgentTests.FunctionalAPITests (functionalAPITests) import AgentTests.SQLiteTests (storeTests) import Control.Concurrent +import Control.Monad (forM_) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Network.HTTP.Types (urlEncode) @@ -24,6 +25,7 @@ import qualified Simplex.Messaging.Agent.Protocol as A import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ErrorType (..), MsgBody) import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..)) +import Simplex.Messaging.Util (bshow) import System.Directory (removeFile) import System.Timeout import Test.Hspec @@ -67,6 +69,8 @@ agentTests (ATransport t) = do smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t xit "should concurrently deliver messages to connections without blocking" $ smpAgentTest2_2_1 $ testConcurrentMsgDelivery t + xit "should deliver messages if one of connections has quota exceeded" $ + smpAgentTest2_2_1 $ testMsgDeliveryQuotaExceeded t -- | receive message to handle `h` (<#:) :: Transport c => c -> IO (ATransmissionOrError 'Agent) @@ -344,10 +348,11 @@ testConcurrentMsgDelivery _ alice bob = do -- alice <# ("", "bob", CON) -- the first connection should not be blocked by the second one - alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1) - alice <# ("", "bob", SENT 1) - bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False - bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) + sendMessage (alice, "alice") (bob, "bob") "hello" + -- alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1) + -- alice <# ("", "bob", SENT 1) + -- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False + -- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2) putStrLn "it gets this far" bob <# ("", "alice", SENT 2) @@ -355,6 +360,22 @@ testConcurrentMsgDelivery _ alice bob = do alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK) +testMsgDeliveryQuotaExceeded :: Transport c => TProxy c -> c -> c -> IO () +testMsgDeliveryQuotaExceeded _ alice bob = do + connect (alice, "alice") (bob, "bob") + connect (alice, "alice2") (bob, "bob2") + forM_ [1 .. 4 :: Int] $ \i -> do + let corrId = bshow i + msg = "message " <> bshow i + (_, "bob", Right (MID mId)) <- alice #: (corrId, "bob", "SEND :" <> msg) + alice <#= \case ("", "bob", SENT m) -> m == mId; _ -> False + (_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota") + + alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 1) + putStrLn "it gets this far" + alice <# ("", "bob2", SENT 1) + putStrLn "it never gets here as the message is blocked by MSG in in another connection" + connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO () connect (h1, name1) (h2, name2) = do ("c1", _, Right (INV cReq)) <- h1 #: ("c1", name2, "NEW INV") @@ -366,6 +387,16 @@ connect (h1, name1) (h2, name2) = do h2 <# ("", name1, CON) h1 <# ("", name2, CON) +sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO () +sendMessage (h1, name1) (h2, name2) msg = do + ("m1", name2', Right (MID mId)) <- h1 #: ("m1", name2, "SEND :" <> msg) + name2' `shouldBe` name2 + h1 <#= \case ("", n, SENT m) -> n == name2 && m == mId; _ -> False + ("", name1', Right (MSG MsgMeta {recipient = (msgId, _)} msg')) <- (h2 <#:) + name1' `shouldBe` name1 + msg' `shouldBe` msg + h2 #: ("m2", name1, "ACK " <> bshow msgId) =#> \case ("m2", n, OK) -> n == name1; _ -> False + -- connect' :: forall c. Transport c => c -> c -> IO (ByteString, ByteString) -- connect' h1 h2 = do -- ("c1", conn2, Right (INV cReq)) <- h1 #: ("c1", "", "NEW INV") From 10c62e7fa20f634a4c4323ad406c2bb3ca9c897d Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 6 Jan 2022 08:56:19 +0000 Subject: [PATCH 4/7] fix message delivery when quota exceeded - making it concurrent per queue, not per server (#260) * test: HELLO blocking message delivery on the same server * test: quota exceeded in one queue should not block delivery in other queues * fix test to work in stable branch * simplify pending message delivery (#202) * simplify pending message delivery (WIP) * refactor * fix concurrent message delivery * remove type synonym --- src/Simplex/Messaging/Agent.hs | 99 +++++++++------------ src/Simplex/Messaging/Agent/Client.hs | 20 ++--- src/Simplex/Messaging/Agent/Store.hs | 4 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 34 +++---- src/Simplex/Messaging/Util.hs | 8 ++ tests/AgentTests.hs | 57 ++++++++++++ 6 files changed, 134 insertions(+), 88 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 68c8586dd..52209358e 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -69,7 +69,6 @@ import Data.Composition ((.:), (.:.)) import Data.Functor (($>)) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L -import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isJust) import qualified Data.Text as T @@ -87,9 +86,9 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (MsgBody, SenderPublicKey) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport (ATransport (..), TProxy, Transport (..), currentSMPVersionStr, runTransportServer) -import Simplex.Messaging.Util (bshow, tryError) +import Simplex.Messaging.Util (bshow, tryError, unlessM) import System.Random (randomR) -import UnliftIO.Async (Async, async, race_) +import UnliftIO.Async (async, race_) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -334,7 +333,7 @@ subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () subscribeConnection' c connId = withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ rq sq) -> do - resumeDelivery sq + resumeMsgDelivery c connId sq case status (sq :: SndQueue) of Confirmed -> withVerifyKey sq $ \verifyKey -> do conf <- withStore (`getAcceptedConfirmation` connId) @@ -345,7 +344,7 @@ subscribeConnection' c connId = Active -> subscribeQueue c rq connId _ -> throwError $ INTERNAL "unexpected queue status" SomeConn _ (SndConnection _ sq) -> do - resumeDelivery sq + resumeMsgDelivery c connId sq case status (sq :: SndQueue) of Confirmed -> withVerifyKey sq $ \verifyKey -> activateQueueJoining c connId sq verifyKey =<< resumeInterval @@ -354,12 +353,6 @@ subscribeConnection' c connId = SomeConn _ (RcvConnection _ rq) -> subscribeQueue c rq connId SomeConn _ (ContactConnection _ rq) -> subscribeQueue c rq connId where - resumeDelivery :: SndQueue -> m () - resumeDelivery SndQueue {server} = do - wasDelivering <- resumeMsgDelivery c connId server - unless wasDelivering $ do - pending <- withStore (`getPendingMsgs` connId) - queuePendingMsgs c connId pending withVerifyKey :: SndQueue -> (C.PublicKey -> m ()) -> m () withVerifyKey sq action = let err = throwError $ INTERNAL "missing signing key public counterpart" @@ -382,14 +375,10 @@ sendMessage' c connId msg = _ -> throwError $ CONN SIMPLEX where enqueueMessage :: SndQueue -> m AgentMsgId - enqueueMessage SndQueue {server} = do + enqueueMessage sq = do + resumeMsgDelivery c connId sq msgId <- storeSentMsg - wasDelivering <- resumeMsgDelivery c connId server - pending <- - if wasDelivering - then pure [PendingMsg {connId, msgId}] - else withStore (`getPendingMsgs` connId) - queuePendingMsgs c connId pending + queuePendingMsgs c connId sq [msgId] pure $ unId msgId where storeSentMsg :: m InternalId @@ -410,63 +399,63 @@ sendMessage' c connId msg = createSndMsg st connId msgData pure internalId -resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SMPServer -> m Bool -resumeMsgDelivery c connId srv = do - void $ resume srv (srvMsgDeliveries c) $ runSrvMsgDelivery c srv - resume connId (connMsgDeliveries c) $ runMsgDelivery c connId srv +resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () +resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do + let qKey = (connId, server, sndId) + unlessM (queueDelivering qKey) $ + async (runSmpQueueMsgDelivery c connId sq) + >>= atomically . modifyTVar (smpQueueMsgDeliveries c) . M.insert qKey + unlessM connQueued $ + withStore (`getPendingMsgs` connId) + >>= queuePendingMsgs c connId sq where - resume :: Ord a => a -> TVar (Map a (Async ())) -> m () -> m Bool - resume key actionMap actionProcess = do - isDelivering <- isJust . M.lookup key <$> readTVarIO actionMap - unless isDelivering $ - async actionProcess - >>= atomically . modifyTVar actionMap . M.insert key - pure isDelivering + queueDelivering qKey = isJust . M.lookup qKey <$> readTVarIO (smpQueueMsgDeliveries c) + connQueued = + atomically $ + isJust + <$> stateTVar + (connMsgsQueued c) + (\m -> (M.lookup connId m, M.insert connId True m)) -queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> [PendingMsg] -> m () -queuePendingMsgs c connId pending = - atomically $ getPendingMsgQ connId (connMsgQueues c) >>= forM_ pending . writeTQueue +queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m () +queuePendingMsgs c connId sq msgIds = atomically $ do + q <- getPendingMsgQ c connId sq + mapM_ (writeTQueue q) msgIds -getPendingMsgQ :: Ord a => a -> TVar (Map a (TQueue PendingMsg)) -> STM (TQueue PendingMsg) -getPendingMsgQ key queueMap = do - maybe newMsgQueue pure . M.lookup key =<< readTVar queueMap +getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId) +getPendingMsgQ c connId SndQueue {server, sndId} = do + let qKey = (connId, server, sndId) + maybe (newMsgQueue qKey) pure . M.lookup qKey =<< readTVar (smpQueueMsgQueues c) where - newMsgQueue :: STM (TQueue PendingMsg) - newMsgQueue = do + newMsgQueue qKey = do mq <- newTQueue - modifyTVar queueMap $ M.insert key mq + modifyTVar (smpQueueMsgQueues c) $ M.insert qKey mq pure mq -runMsgDelivery :: AgentMonad m => AgentClient -> ConnId -> SMPServer -> m () -runMsgDelivery c connId srv = do - mq <- atomically . getPendingMsgQ connId $ connMsgQueues c - smq <- atomically . getPendingMsgQ srv $ srvMsgQueues c - forever . atomically $ readTQueue mq >>= writeTQueue smq - -runSrvMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPServer -> m () -runSrvMsgDelivery c@AgentClient {subQ} srv = do - mq <- atomically . getPendingMsgQ srv $ srvMsgQueues c +runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () +runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do + mq <- atomically $ getPendingMsgQ c connId sq ri <- asks $ reconnectInterval . config forever $ do - PendingMsg {connId, msgId} <- atomically $ readTQueue mq + msgId <- atomically $ readTQueue mq let mId = unId msgId withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case Left (e :: E.SomeException) -> - notify connId $ MERR mId (INTERNAL $ show e) - Right (sq, msgBody) -> do + notify $ MERR mId (INTERNAL $ show e) + Right msgBody -> do withRetryInterval ri $ \loop -> do tryError (sendAgentMessage c sq msgBody) >>= \case Left e -> case e of SMP SMP.QUOTA -> loop - SMP {} -> notify connId $ MERR mId e - CMD {} -> notify connId $ MERR mId e + SMP {} -> notify $ MERR mId e + CMD {} -> notify $ MERR mId e _ -> loop Right () -> do - notify connId $ SENT mId + notify $ SENT mId withStore $ \st -> updateSndMsgStatus st connId msgId SndMsgSent where - notify :: ConnId -> ACommand 'Agent -> m () - notify connId cmd = atomically $ writeTBQueue subQ ("", connId, cmd) + notify :: ACommand 'Agent -> m () + notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd) ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m () ackMessage' c connId msgId = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index e99486c7f..48432c945 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -60,6 +60,7 @@ import Simplex.Messaging.Agent.Store import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgBody, QueueId, SenderPublicKey) +import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Util (bshow, liftEitherError, liftError) import UnliftIO.Exception (IOException) import qualified UnliftIO.Exception as E @@ -73,10 +74,9 @@ data AgentClient = AgentClient subscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)), subscrConns :: TVar (Map ConnId SMPServer), activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress - connMsgQueues :: TVar (Map ConnId (TQueue PendingMsg)), - connMsgDeliveries :: TVar (Map ConnId (Async ())), - srvMsgQueues :: TVar (Map SMPServer (TQueue PendingMsg)), - srvMsgDeliveries :: TVar (Map SMPServer (Async ())), + connMsgsQueued :: TVar (Map ConnId Bool), + smpQueueMsgQueues :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId)), + smpQueueMsgDeliveries :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (Async ())), reconnections :: TVar [Async ()], clientId :: Int, agentEnv :: Env, @@ -94,14 +94,13 @@ newAgentClient agentEnv = do subscrSrvrs <- newTVar M.empty subscrConns <- newTVar M.empty activations <- newTVar M.empty - connMsgQueues <- newTVar M.empty - connMsgDeliveries <- newTVar M.empty - srvMsgQueues <- newTVar M.empty - srvMsgDeliveries <- newTVar M.empty + connMsgsQueued <- newTVar M.empty + smpQueueMsgQueues <- newTVar M.empty + smpQueueMsgDeliveries <- newTVar M.empty reconnections <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1) lock <- newTMVar () - return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgQueues, connMsgDeliveries, srvMsgQueues, srvMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} + return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -177,8 +176,7 @@ closeAgentClient c = liftIO $ do closeSMPServerClients c cancelActions $ activations c cancelActions $ reconnections c - cancelActions $ connMsgDeliveries c - cancelActions $ srvMsgDeliveries c + cancelActions $ smpQueueMsgDeliveries c closeSMPServerClients :: AgentClient -> IO () closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 4c14c5fe5..14cd2f9db 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -63,8 +63,8 @@ class Monad m => MonadAgentStore s m where updateSndIds :: s -> ConnId -> m (InternalId, InternalSndId, PrevSndMsgHash) createSndMsg :: s -> ConnId -> SndMsgData -> m () updateSndMsgStatus :: s -> ConnId -> InternalId -> SndMsgStatus -> m () - getPendingMsgData :: s -> ConnId -> InternalId -> m (SndQueue, MsgBody) - getPendingMsgs :: s -> ConnId -> m [PendingMsg] + getPendingMsgData :: s -> ConnId -> InternalId -> m MsgBody + getPendingMsgs :: s -> ConnId -> m [InternalId] getMsg :: s -> ConnId -> InternalId -> m Msg checkRcvMsg :: s -> ConnId -> InternalId -> m () updateRcvMsgAck :: s -> ConnId -> InternalId -> m () diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 75158b0c5..f7dc744fc 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -461,34 +461,28 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto ":snd_status" := msgStatus ] - getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (SndQueue, MsgBody) + getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m MsgBody getPendingMsgData st connId msgId = - liftIOEither . withTransaction st $ \db -> runExceptT $ do - sq <- ExceptT $ sndQueue <$> getSndQueueByConnAlias_ db connId - msgBody <- - ExceptT $ - sndMsgData - <$> DB.query - db - [sql| - SELECT m.msg_body - FROM messages m - JOIN snd_messages s ON s.conn_alias = m.conn_alias AND s.internal_id = m.internal_id - WHERE m.conn_alias = ? AND m.internal_id = ? - |] - (connId, msgId) - pure (sq, msgBody) + liftIOEither . withTransaction st $ \db -> + sndMsgData + <$> DB.query + db + [sql| + SELECT m.msg_body + FROM messages m + JOIN snd_messages s ON s.conn_alias = m.conn_alias AND s.internal_id = m.internal_id + WHERE m.conn_alias = ? AND m.internal_id = ? + |] + (connId, msgId) where sndMsgData :: [Only MsgBody] -> Either StoreError MsgBody sndMsgData [Only msgBody] = Right msgBody sndMsgData _ = Left SEMsgNotFound - sndQueue :: Maybe SndQueue -> Either StoreError SndQueue - sndQueue = maybe (Left SEConnNotFound) Right - getPendingMsgs :: SQLiteStore -> ConnId -> m [PendingMsg] + getPendingMsgs :: SQLiteStore -> ConnId -> m [InternalId] getPendingMsgs st connId = liftIO . withTransaction st $ \db -> - map (PendingMsg connId . fromOnly) + map fromOnly <$> DB.query db "SELECT internal_id FROM snd_messages WHERE conn_alias = ? AND snd_status = ?" (connId, SndMsgCreated) getMsg :: SQLiteStore -> ConnId -> InternalId -> m Msg diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 5bd05c4a9..0d76781e8 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -53,3 +53,11 @@ liftEitherError f a = liftIOEither (first f <$> a) tryError :: MonadError e m => m a -> m (Either e a) tryError action = (Right <$> action) `catchError` (pure . Left) + +ifM :: Monad m => m Bool -> m a -> m a -> m a +ifM ba t f = ba >>= \b -> if b then t else f +{-# INLINE ifM #-} + +unlessM :: Monad m => m Bool -> m () -> m () +unlessM b = ifM b $ pure () +{-# INLINE unlessM #-} diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index cf7842d5b..65e5aa2ce 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -14,6 +14,7 @@ import AgentTests.ConnectionRequestTests import AgentTests.FunctionalAPITests (functionalAPITests) import AgentTests.SQLiteTests (storeTests) import Control.Concurrent +import Control.Monad (forM_) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Network.HTTP.Types (urlEncode) @@ -23,6 +24,7 @@ import Simplex.Messaging.Agent.Protocol import qualified Simplex.Messaging.Agent.Protocol as A import Simplex.Messaging.Protocol (ErrorType (..), MsgBody) import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..)) +import Simplex.Messaging.Util (bshow) import System.Directory (removeFile) import System.Timeout import Test.Hspec @@ -63,6 +65,10 @@ agentTests (ATransport t) = do smpAgentTest2_2_2_needs_server $ testMsgDeliveryServerRestart t it "should deliver pending messages after agent restarting" $ smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t + it "should concurrently deliver messages to connections without blocking" $ + smpAgentTest2_2_1 $ testConcurrentMsgDelivery t + it "should deliver messages if one of connections has quota exceeded" $ + smpAgentTest2_2_1 $ testMsgDeliveryQuotaExceeded t -- | receive message to handle `h` (<#:) :: Transport c => c -> IO (ATransmissionOrError 'Agent) @@ -325,6 +331,47 @@ testMsgDeliveryAgentRestart t bob = do withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` () withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) (pure ()) . const . testSMPAgentClientOn agentTestPort +testConcurrentMsgDelivery :: Transport c => TProxy c -> c -> c -> IO () +testConcurrentMsgDelivery _ alice bob = do + connect (alice, "alice") (bob, "bob") + + ("1", "bob2", Right (INV cReq)) <- alice #: ("1", "bob2", "NEW INV") + let cReq' = serializeConnReq cReq + bob #: ("11", "alice2", "JOIN " <> cReq' <> " 14\nbob's connInfo") #> ("11", "alice2", OK) + ("", "bob2", Right (CONF _confId "bob's connInfo")) <- (alice <#:) + -- below commands would be needed to accept bob's connection, but alice does not + -- alice #: ("2", "bob", "LET " <> _confId <> " 16\nalice's connInfo") #> ("2", "bob", OK) + -- bob <# ("", "alice", INFO "alice's connInfo") + -- bob <# ("", "alice", CON) + -- alice <# ("", "bob", CON) + + -- the first connection should not be blocked by the second one + sendMessage (alice, "alice") (bob, "bob") "hello" + -- alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1) + -- alice <# ("", "bob", SENT 1) + -- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False + -- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) + bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2) + bob <# ("", "alice", SENT 2) + -- if delivery is blocked it won't go further + alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False + alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK) + +testMsgDeliveryQuotaExceeded :: Transport c => TProxy c -> c -> c -> IO () +testMsgDeliveryQuotaExceeded _ alice bob = do + connect (alice, "alice") (bob, "bob") + connect (alice, "alice2") (bob, "bob2") + forM_ [1 .. 4 :: Int] $ \i -> do + let corrId = bshow i + msg = "message " <> bshow i + (_, "bob", Right (MID mId)) <- alice #: (corrId, "bob", "SEND :" <> msg) + alice <#= \case ("", "bob", SENT m) -> m == mId; _ -> False + (_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota") + + alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 1) + -- if delivery is blocked it won't go further + alice <# ("", "bob2", SENT 1) + connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO () connect (h1, name1) (h2, name2) = do ("c1", _, Right (INV cReq)) <- h1 #: ("c1", name2, "NEW INV") @@ -336,6 +383,16 @@ connect (h1, name1) (h2, name2) = do h2 <# ("", name1, CON) h1 <# ("", name2, CON) +sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO () +sendMessage (h1, name1) (h2, name2) msg = do + ("m1", name2', Right (MID mId)) <- h1 #: ("m1", name2, "SEND :" <> msg) + name2' `shouldBe` name2 + h1 <#= \case ("", n, SENT m) -> n == name2 && m == mId; _ -> False + ("", name1', Right (MSG MsgMeta {recipient = (msgId, _)} msg')) <- (h2 <#:) + name1' `shouldBe` name1 + msg' `shouldBe` msg + h2 #: ("m2", name1, "ACK " <> bshow msgId) =#> \case ("m2", n, OK) -> n == name1; _ -> False + -- connect' :: forall c. Transport c => c -> c -> IO (ByteString, ByteString) -- connect' h1 h2 = do -- ("c1", conn2, Right (INV cReq)) <- h1 #: ("c1", "", "NEW INV") From 584f230c4d72135d1ba4a1c599008602768d4297 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 6 Jan 2022 13:50:50 +0000 Subject: [PATCH 5/7] update versions for 0.5.2, changelog (#261) --- CHANGELOG.md | 4 ++++ package.yaml | 4 ++-- simplexmq.cabal | 12 ++++++------ stack.yaml | 4 +--- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29ae11e15..f455aeda6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.5.2 + +- Fix message delivery logic that blocked delivery of all server messages when server per-queue quota exceeded, making it concurrent per SMP queue, not per server. + # 0.5.1 - Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases. diff --git a/package.yaml b/package.yaml index 4f3e66ac7..f2e8e4292 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 0.5.1 +version: 0.5.2 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, @@ -37,7 +37,7 @@ dependencies: - cryptonite >= 0.27 && < 0.30 - direct-sqlite == 2.3.* - directory == 1.3.* - - file-embed == 0.0.14.* + - file-embed >= 0.0.14.0 && <= 0.0.15.0 - filepath == 1.4.* - http-types == 0.12.* - generic-random >= 1.3 && < 1.5 diff --git a/simplexmq.cabal b/simplexmq.cabal index c81835e81..06df3c162 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -4,10 +4,10 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: ff75c96b1b4f19821ceda01f32e8759e89ad661380f324f1ab9eec3d5c223425 +-- hash: f5e649e0c359792d9528dec6e6cf07af772a521d746e66b8cc2b86d8c604afb2 name: simplexmq -version: 0.5.1 +version: 0.5.2 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and @@ -79,7 +79,7 @@ library , cryptonite >=0.27 && <0.30 , direct-sqlite ==2.3.* , directory ==1.3.* - , file-embed ==0.0.14.* + , file-embed >=0.0.14.0 && <=0.0.15.0 , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* @@ -125,7 +125,7 @@ executable smp-agent , cryptonite >=0.27 && <0.30 , direct-sqlite ==2.3.* , directory ==1.3.* - , file-embed ==0.0.14.* + , file-embed >=0.0.14.0 && <=0.0.15.0 , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* @@ -173,7 +173,7 @@ executable smp-server , cryptostore ==0.2.* , direct-sqlite ==2.3.* , directory ==1.3.* - , file-embed ==0.0.14.* + , file-embed >=0.0.14.0 && <=0.0.15.0 , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* @@ -232,7 +232,7 @@ test-suite smp-server-test , cryptonite >=0.27 && <0.30 , direct-sqlite ==2.3.* , directory ==1.3.* - , file-embed ==0.0.14.* + , file-embed >=0.0.14.0 && <=0.0.15.0 , filepath ==1.4.* , generic-random >=1.3 && <1.5 , hspec ==2.7.* diff --git a/stack.yaml b/stack.yaml index 70267dd80..2070d9ffc 100644 --- a/stack.yaml +++ b/stack.yaml @@ -17,7 +17,7 @@ # # resolver: ./custom-snapshot.yaml # resolver: https://example.com/snapshots/2018-01-01.yaml -resolver: lts-18.0 +resolver: lts-18.21 # User packages to be built. # Various formats can be used as shown in the example below. @@ -36,9 +36,7 @@ packages: # extra-deps: - cryptostore-0.2.1.0@sha256:9896e2984f36a1c8790f057fd5ce3da4cbcaf8aa73eb2d9277916886978c5b19,3881 - - direct-sqlite-2.3.26@sha256:04e835402f1508abca383182023e4e2b9b86297b8533afbd4e57d1a5652e0c23,3718 - simple-logger-0.1.0@sha256:be8ede4bd251a9cac776533bae7fb643369ebd826eb948a9a18df1a8dd252ff8,1079 - - sqlite-simple-0.4.18.0@sha256:3ceea56375c0a3590c814e411a4eb86943f8d31b93b110ca159c90689b6b39e5,3002 # - network-run-0.2.4@sha256:7dbb06def522dab413bce4a46af476820bffdff2071974736b06f52f4ab57c96,885 # - git: https://github.com/commercialhaskell/stack.git # commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a From dd4ccce1bad2d7c1144b775ed1d7fcfbfc878dbf Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 6 Jan 2022 13:52:54 +0000 Subject: [PATCH 6/7] 0.5.2 From b415537ba593d30df89e9b6935fdd73e06e7e2a1 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 6 Jan 2022 16:41:39 +0000 Subject: [PATCH 7/7] simplify message delivery --- src/Simplex/Messaging/Agent.hs | 2 +- src/Simplex/Messaging/Agent/Store.hs | 2 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 7 ++----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index dcb0c29dc..093e585f0 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -439,7 +439,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case Left (e :: E.SomeException) -> notify $ MERR mId (INTERNAL $ show e) - Right (sq, rq_, (msgType, msgBody)) -> do + Right (rq_, (msgType, msgBody)) -> do withRetryInterval ri $ \loop -> do tryError (sendAgentMessage c sq msgBody) >>= \case Left e -> case e of diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 43e752353..3b50985df 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -61,7 +61,7 @@ class Monad m => MonadAgentStore s m where createRcvMsg :: s -> ConnId -> RcvMsgData -> m () updateSndIds :: s -> ConnId -> m (InternalId, InternalSndId, PrevSndMsgHash) createSndMsg :: s -> ConnId -> SndMsgData -> m () - getPendingMsgData :: s -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody)) + getPendingMsgData :: s -> ConnId -> InternalId -> m (Maybe RcvQueue, (AMsgType, MsgBody)) getPendingMsgs :: s -> ConnId -> m [InternalId] checkRcvMsg :: s -> ConnId -> InternalId -> m () deleteMsg :: s -> ConnId -> InternalId -> m () diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 33e092195..fc45571ef 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -439,10 +439,9 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto insertSndMsgDetails_ db connId sndMsgData updateHashSnd_ db connId sndMsgData - getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody)) + getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (Maybe RcvQueue, (AMsgType, MsgBody)) getPendingMsgData st connId msgId = liftIOEither . withTransaction st $ \db -> runExceptT $ do - sq <- ExceptT $ sndQueue <$> getSndQueueByConnAlias_ db connId rq_ <- liftIO $ getRcvQueueByConnAlias_ db connId msgData <- ExceptT $ @@ -456,13 +455,11 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto WHERE m.conn_alias = ? AND m.internal_id = ? |] (connId, msgId) - pure (sq, rq_, msgData) + pure (rq_, msgData) where sndMsgData :: [(AMsgType, MsgBody)] -> Either StoreError (AMsgType, MsgBody) sndMsgData [msgData] = Right msgData sndMsgData _ = Left SEMsgNotFound - sndQueue :: Maybe SndQueue -> Either StoreError SndQueue - sndQueue = maybe (Left SEConnNotFound) Right getPendingMsgs :: SQLiteStore -> ConnId -> m [InternalId] getPendingMsgs st connId =