diff --git a/simplexmq.cabal b/simplexmq.cabal index 0c39fd79b..c34c18ef6 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -4,7 +4,7 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: 576ad28116836a490d6974cd6322169dd38b5df984c16fe296387427b5bef3d5 +-- hash: 5169db4a4922766c79f08cbdb91d4c765520372273ab432569eba25a253a8dbb name: simplexmq version: 0.3.2 @@ -35,6 +35,7 @@ library Simplex.Messaging.Agent.Client Simplex.Messaging.Agent.Env.SQLite Simplex.Messaging.Agent.Protocol + Simplex.Messaging.Agent.RetryInterval Simplex.Messaging.Agent.Store Simplex.Messaging.Agent.Store.SQLite Simplex.Messaging.Agent.Store.SQLite.Migrations diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e599d8074..6d3747497 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -189,12 +189,6 @@ logClient :: MonadUnliftIO m => AgentClient -> ByteString -> ATransmission a -> logClient AgentClient {clientId} dir (corrId, connId, cmd) = do logInfo . decodeUtf8 $ B.unwords [bshow clientId, dir, "A :", corrId, connId, B.takeWhile (/= ' ') $ serializeCommand cmd] -withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a -withAgentLock AgentClient {lock} = - E.bracket_ - (void . atomically $ takeTMVar lock) - (atomically $ putTMVar lock ()) - client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () client c@AgentClient {rcvQ, subQ} = forever $ do (corrId, connId, cmd) <- atomically $ readTBQueue rcvQ diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 3f72ebc21..27bf21a41 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -11,7 +11,7 @@ module Simplex.Messaging.Agent.Client ( AgentClient (..), newAgentClient, AgentMonad, - getSMPServerClient, + withAgentLock, closeAgentClient, newRcvQueue, subscribeQueue, @@ -35,7 +35,7 @@ module Simplex.Messaging.Agent.Client ) where -import Control.Concurrent.Async (Async, uninterruptibleCancel) +import Control.Concurrent.Async (Async, async, uninterruptibleCancel) import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple import Control.Monad.Except @@ -47,18 +47,19 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.Maybe (isNothing) import Data.Set (Set) import qualified Data.Set as S import Data.Text.Encoding import Data.Time.Clock import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol +import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgBody, QueueId, SenderPublicKey) import Simplex.Messaging.Util (bshow, liftEitherError, liftError) -import UnliftIO.Concurrent import UnliftIO.Exception (IOException) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -68,9 +69,10 @@ data AgentClient = AgentClient subQ :: TBQueue (ATransmission 'Agent), msgQ :: TBQueue SMPServerTransmission, smpClients :: TVar (Map SMPServer SMPClient), - subscrSrvrs :: TVar (Map SMPServer (Set ConnId)), + subscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)), subscrConns :: TVar (Map ConnId SMPServer), activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress + reconnections :: TVar [Async ()], clientId :: Int, agentEnv :: Env, smpSubscriber :: Async (), @@ -87,9 +89,10 @@ newAgentClient agentEnv = do subscrSrvrs <- newTVar M.empty subscrConns <- newTVar M.empty activations <- newTVar M.empty + reconnections <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1) lock <- newTMVar () - return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, clientId, agentEnv, smpSubscriber = undefined, lock} + return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -109,41 +112,74 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv = connectClient :: m SMPClient connectClient = do cfg <- asks $ smpCfg . config - liftEitherError smpClientError (getSMPClient srv cfg msgQ clientDisconnected) + u <- askUnliftIO + liftEitherError smpClientError (getSMPClient srv cfg msgQ $ clientDisconnected u) `E.catch` internalError where internalError :: IOException -> m SMPClient internalError = throwError . INTERNAL . show - clientDisconnected :: IO () - clientDisconnected = do - removeSubs >>= mapM_ (mapM_ notifySub) + clientDisconnected :: UnliftIO m -> IO () + clientDisconnected u = do + removeClientSubs >>= (`forM_` serverDown u) logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv - removeSubs :: IO (Maybe (Set ConnId)) - removeSubs = atomically $ do + removeClientSubs :: IO (Maybe (Map ConnId RcvQueue)) + removeClientSubs = atomically $ do modifyTVar smpClients $ M.delete srv cs <- M.lookup srv <$> readTVar (subscrSrvrs c) modifyTVar (subscrSrvrs c) $ M.delete srv - modifyTVar (subscrConns c) $ maybe id deleteKeys cs + modifyTVar (subscrConns c) $ maybe id (deleteKeys . M.keysSet) cs return cs where deleteKeys :: Ord k => Set k -> Map k a -> Map k a deleteKeys ks m = S.foldr' M.delete m ks - notifySub :: ConnId -> IO () - notifySub connId = atomically $ writeTBQueue (subQ c) ("", connId, END) + serverDown :: UnliftIO m -> Map ConnId RcvQueue -> IO () + serverDown u cs = unless (M.null cs) $ do + mapM_ (notifySub DOWN) $ M.keysSet cs + a <- async . unliftIO u $ tryReconnectClient cs + atomically $ modifyTVar (reconnections c) (a :) + + tryReconnectClient :: Map ConnId RcvQueue -> m () + tryReconnectClient cs = do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + reconnectClient cs `catchError` const loop + + reconnectClient :: Map ConnId RcvQueue -> m () + reconnectClient cs = do + withAgentLock c . withSMP c srv $ \smp -> do + subs <- readTVarIO $ subscrConns c + forM_ (M.toList cs) $ \(connId, rq@RcvQueue {rcvPrivateKey, rcvId}) -> + when (isNothing $ M.lookup connId subs) $ do + subscribeSMPQueue smp rcvPrivateKey rcvId + `catchError` \case + SMPServerError e -> liftIO $ notifySub (ERR $ SMP e) connId + e -> throwError e + addSubscription c rq connId + liftIO $ notifySub UP connId + + notifySub :: ACommand 'Agent -> ConnId -> IO () + notifySub cmd connId = atomically $ writeTBQueue (subQ c) ("", connId, cmd) closeAgentClient :: MonadUnliftIO m => AgentClient -> m () closeAgentClient c = liftIO $ do closeSMPServerClients c - cancelActivations c + cancelActions $ activations c + cancelActions $ reconnections c closeSMPServerClients :: AgentClient -> IO () closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient -cancelActivations :: AgentClient -> IO () -cancelActivations c = readTVarIO (activations c) >>= mapM_ uninterruptibleCancel +cancelActions :: Foldable f => TVar (f (Async ())) -> IO () +cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel + +withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a +withAgentLock AgentClient {lock} = + E.bracket_ + (void . atomically $ takeTMVar lock) + (atomically $ putTMVar lock ()) withSMP_ :: forall a m. AgentMonad m => AgentClient -> SMPServer -> (SMPClient -> m a) -> m a withSMP_ c srv action = @@ -207,13 +243,13 @@ subscribeQueue c rq@RcvQueue {server, rcvPrivateKey, rcvId} connId = do addSubscription c rq connId addSubscription :: MonadUnliftIO m => AgentClient -> RcvQueue -> ConnId -> m () -addSubscription c RcvQueue {server} connId = atomically $ do +addSubscription c rq@RcvQueue {server} connId = atomically $ do modifyTVar (subscrConns c) $ M.insert connId server modifyTVar (subscrSrvrs c) $ M.alter (Just . addSub) server where - addSub :: Maybe (Set ConnId) -> Set ConnId - addSub (Just cs) = S.insert connId cs - addSub _ = S.singleton connId + addSub :: Maybe (Map ConnId RcvQueue) -> Map ConnId RcvQueue + addSub (Just cs) = M.insert connId rq cs + addSub _ = M.singleton connId rq removeSubscription :: AgentMonad m => AgentClient -> ConnId -> m () removeSubscription AgentClient {subscrConns, subscrSrvrs} connId = atomically $ do @@ -223,10 +259,10 @@ removeSubscription AgentClient {subscrConns, subscrSrvrs} connId = atomically $ (modifyTVar subscrSrvrs . M.alter (>>= delSub)) (M.lookup connId cs) where - delSub :: Set ConnId -> Maybe (Set ConnId) + delSub :: Map ConnId RcvQueue -> Maybe (Map ConnId RcvQueue) delSub cs = - let cs' = S.delete connId cs - in if S.null cs' then Nothing else Just cs' + let cs' = M.delete connId cs + in if M.null cs' then Nothing else Just cs' addActivation :: MonadUnliftIO m => AgentClient -> ConnId -> Async () -> m () addActivation c connId a = atomically . modifyTVar (activations c) $ M.insert connId a @@ -257,10 +293,13 @@ sendConfirmation c sq@SndQueue {server, sndId} senderKey cInfo = mkConfirmation smp = encryptAndSign smp sq . serializeSMPMessage $ SMPConfirmation senderKey cInfo sendHello :: forall m. AgentMonad m => AgentClient -> SndQueue -> VerificationKey -> RetryInterval -> m () -sendHello c sq@SndQueue {server, sndId, sndPrivateKey} verifyKey RetryInterval {initialInterval, increaseAfter, maxInterval} = +sendHello c sq@SndQueue {server, sndId, sndPrivateKey} verifyKey ri = withLogSMP_ c server sndId "SEND (retrying)" $ \smp -> do msg <- mkHello smp $ AckMode On - liftSMP $ send 0 initialInterval msg smp + liftSMP . withRetryInterval ri $ \loop -> + sendSMPMessage smp (Just sndPrivateKey) sndId msg `catchE` \case + SMPServerError AUTH -> loop + e -> throwE e where mkHello :: SMPClient -> AckMode -> m ByteString mkHello smp ackMode = do @@ -273,18 +312,6 @@ sendHello c sq@SndQueue {server, sndId, sndPrivateKey} verifyKey RetryInterval { agentMessage = HELLO verifyKey ackMode } - send :: Int -> Int -> ByteString -> SMPClient -> ExceptT SMPClientError IO () - send elapsedTime delay msg smp = - sendSMPMessage smp (Just sndPrivateKey) sndId msg `catchE` \case - SMPServerError AUTH -> do - threadDelay delay - let newDelay = - if elapsedTime < increaseAfter || delay == maxInterval - then delay - else min (delay * 3 `div` 2) maxInterval - send (elapsedTime + delay) newDelay msg smp - e -> throwE e - secureQueue :: AgentMonad m => AgentClient -> RcvQueue -> SenderPublicKey -> m () secureQueue c RcvQueue {server, rcvId, rcvPrivateKey} senderKey = withLogSMP c server rcvId "KEY " $ \smp -> diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 85877aa0d..6a063d4dd 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -12,6 +12,7 @@ import Data.List.NonEmpty (NonEmpty) import Network.Socket import Numeric.Natural import Simplex.Messaging.Agent.Protocol (SMPServer) +import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Client @@ -27,18 +28,13 @@ data AgentConfig = AgentConfig dbFile :: FilePath, dbPoolSize :: Int, smpCfg :: SMPClientConfig, - retryInterval :: RetryInterval + retryInterval :: RetryInterval, + reconnectInterval :: RetryInterval } minute :: Int minute = 60_000_000 -data RetryInterval = RetryInterval - { initialInterval :: Int, - increaseAfter :: Int, - maxInterval :: Int - } - defaultAgentConfig :: AgentConfig defaultAgentConfig = AgentConfig @@ -55,6 +51,12 @@ defaultAgentConfig = { initialInterval = 1_000_000, increaseAfter = minute, maxInterval = 10 * minute + }, + reconnectInterval = + RetryInterval + { initialInterval = 1_000_000, + increaseAfter = 10_000_000, + maxInterval = 10_000_000 } } diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index b8f29e856..5b0dbc485 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -162,9 +162,12 @@ data ACommand (p :: AParty) where CON :: ACommand Agent -- notification that connection is established SUB :: ACommand Client END :: ACommand Agent + DOWN :: ACommand Agent + UP :: ACommand Agent -- QST :: QueueDirection -> ACommand Client -- STAT :: QueueDirection -> Maybe QueueStatus -> Maybe SubMode -> ACommand Agent SEND :: MsgBody -> ACommand Client + MID :: AgentMsgId -> ACommand Agent SENT :: AgentMsgId -> ACommand Agent MSG :: MsgMeta -> MsgBody -> ACommand Agent -- ACK :: AgentMsgId -> ACommand Client @@ -455,7 +458,10 @@ commandP = <|> "INFO " *> infoCmd <|> "SUB" $> ACmd SClient SUB <|> "END" $> ACmd SAgent END + <|> "DOWN" $> ACmd SAgent DOWN + <|> "UP" $> ACmd SAgent UP <|> "SEND " *> sendCmd + <|> "MID " *> msgIdResp <|> "SENT " *> sentResp <|> "MSG " *> message <|> "OFF" $> ACmd SClient OFF @@ -470,6 +476,7 @@ commandP = acptCmd = ACmd SClient <$> (ACPT <$> A.takeTill (== ' ') <* A.space <*> A.takeByteString) infoCmd = ACmd SAgent . INFO <$> A.takeByteString sendCmd = ACmd SClient . SEND <$> A.takeByteString + msgIdResp = ACmd SAgent . MID <$> A.decimal sentResp = ACmd SAgent . SENT <$> A.decimal message = ACmd SAgent <$> (MSG <$> msgMetaP <* A.space <*> A.takeByteString) msgMetaP = do @@ -505,7 +512,10 @@ serializeCommand = \case INFO cInfo -> "INFO " <> serializeBinary cInfo SUB -> "SUB" END -> "END" + DOWN -> "DOWN" + UP -> "UP" SEND msgBody -> "SEND " <> serializeBinary msgBody + MID mId -> "MID " <> bshow mId SENT mId -> "SENT " <> bshow mId MSG msgMeta msgBody -> "MSG " <> serializeMsgMeta msgMeta <> " " <> serializeBinary msgBody diff --git a/src/Simplex/Messaging/Agent/RetryInterval.hs b/src/Simplex/Messaging/Agent/RetryInterval.hs new file mode 100644 index 000000000..048b9e09c --- /dev/null +++ b/src/Simplex/Messaging/Agent/RetryInterval.hs @@ -0,0 +1,28 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Simplex.Messaging.Agent.RetryInterval where + +import Control.Concurrent (threadDelay) +import Control.Monad.IO.Class (MonadIO, liftIO) + +data RetryInterval = RetryInterval + { initialInterval :: Int, + increaseAfter :: Int, + maxInterval :: Int + } + +withRetryInterval :: forall m. MonadIO m => RetryInterval -> (m () -> m ()) -> m () +withRetryInterval RetryInterval {initialInterval, increaseAfter, maxInterval} action = + callAction 0 initialInterval + where + callAction :: Int -> Int -> m () + callAction elapsedTime delay = action loop + where + loop = do + let newDelay = + if elapsedTime < increaseAfter || delay == maxInterval + then delay + else min (delay * 3 `div` 2) maxInterval + liftIO $ threadDelay delay + callAction (elapsedTime + delay) newDelay diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 72f157397..c7fb05b40 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -264,9 +264,9 @@ data SndMsg = SndMsg newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show) data SndMsgStatus - = Created - | Sent - | Delivered + = SndMsgCreated + | SndMsgSent + | SndMsgDelivered deriving (Eq, Show) type SentTs = UTCTime diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 714501a0c..52d3a84ae 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -795,7 +795,7 @@ insertSndMsgDetails_ dbConn connId SndMsgData {..} = [ ":conn_alias" := connId, ":internal_snd_id" := internalSndId, ":internal_id" := internalId, - ":snd_status" := Created, + ":snd_status" := SndMsgCreated, ":internal_hash" := internalHash ] diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 59b4ceb49..aa579a175 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -16,6 +16,7 @@ import Control.Concurrent import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import SMPAgentClient +import SMPClient (withSmpServer) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Protocol (ErrorType (..), MsgBody) import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..)) @@ -150,11 +151,13 @@ testSubscription _ alice1 alice2 bob = do alice1 #:# "nothing else should be delivered to alice1" testSubscrNotification :: Transport c => TProxy c -> (ThreadId, ThreadId) -> c -> IO () -testSubscrNotification _ (server, _) client = do +testSubscrNotification t (server, _) client = do client #: ("1", "conn1", "NEW") =#> \case ("1", "conn1", INV {}) -> True; _ -> False client #:# "nothing should be delivered to client before the server is killed" killThread server - client <# ("", "conn1", END) + client <# ("", "conn1", DOWN) + withSmpServer (ATransport t) $ + client <# ("", "conn1", ERR (SMP AUTH)) -- this new server does not have the queue connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO () connect (h1, name1) (h2, name2) = do diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index af3093ca0..af1d81b7e 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -24,6 +24,7 @@ import SMPClient import Simplex.Messaging.Agent (runSMPAgentBlocking) import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol +import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Client (SMPClientConfig (..), smpDefaultConfig) import Simplex.Messaging.Transport import Test.Hspec