reconnect server and resubscribe connections after disconnection (#178)

This commit is contained in:
Evgeny Poberezkin
2021-08-09 08:49:49 +01:00
committed by GitHub
parent d9084522af
commit e045774caa
10 changed files with 124 additions and 58 deletions
+2 -1
View File
@@ -4,7 +4,7 @@ cabal-version: 1.12
--
-- see: https://github.com/sol/hpack
--
-- hash: 576ad28116836a490d6974cd6322169dd38b5df984c16fe296387427b5bef3d5
-- hash: 5169db4a4922766c79f08cbdb91d4c765520372273ab432569eba25a253a8dbb
name: simplexmq
version: 0.3.2
@@ -35,6 +35,7 @@ library
Simplex.Messaging.Agent.Client
Simplex.Messaging.Agent.Env.SQLite
Simplex.Messaging.Agent.Protocol
Simplex.Messaging.Agent.RetryInterval
Simplex.Messaging.Agent.Store
Simplex.Messaging.Agent.Store.SQLite
Simplex.Messaging.Agent.Store.SQLite.Migrations
-6
View File
@@ -189,12 +189,6 @@ logClient :: MonadUnliftIO m => AgentClient -> ByteString -> ATransmission a ->
logClient AgentClient {clientId} dir (corrId, connId, cmd) = do
logInfo . decodeUtf8 $ B.unwords [bshow clientId, dir, "A :", corrId, connId, B.takeWhile (/= ' ') $ serializeCommand cmd]
withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a
withAgentLock AgentClient {lock} =
E.bracket_
(void . atomically $ takeTMVar lock)
(atomically $ putTMVar lock ())
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
client c@AgentClient {rcvQ, subQ} = forever $ do
(corrId, connId, cmd) <- atomically $ readTBQueue rcvQ
+65 -38
View File
@@ -11,7 +11,7 @@ module Simplex.Messaging.Agent.Client
( AgentClient (..),
newAgentClient,
AgentMonad,
getSMPServerClient,
withAgentLock,
closeAgentClient,
newRcvQueue,
subscribeQueue,
@@ -35,7 +35,7 @@ module Simplex.Messaging.Agent.Client
)
where
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.Async (Async, async, uninterruptibleCancel)
import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple
import Control.Monad.Except
@@ -47,18 +47,19 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isNothing)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time.Clock
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgBody, QueueId, SenderPublicKey)
import Simplex.Messaging.Util (bshow, liftEitherError, liftError)
import UnliftIO.Concurrent
import UnliftIO.Exception (IOException)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
@@ -68,9 +69,10 @@ data AgentClient = AgentClient
subQ :: TBQueue (ATransmission 'Agent),
msgQ :: TBQueue SMPServerTransmission,
smpClients :: TVar (Map SMPServer SMPClient),
subscrSrvrs :: TVar (Map SMPServer (Set ConnId)),
subscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)),
subscrConns :: TVar (Map ConnId SMPServer),
activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress
reconnections :: TVar [Async ()],
clientId :: Int,
agentEnv :: Env,
smpSubscriber :: Async (),
@@ -87,9 +89,10 @@ newAgentClient agentEnv = do
subscrSrvrs <- newTVar M.empty
subscrConns <- newTVar M.empty
activations <- newTVar M.empty
reconnections <- newTVar []
clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1)
lock <- newTMVar ()
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, clientId, agentEnv, smpSubscriber = undefined, lock}
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock}
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)
@@ -109,41 +112,74 @@ getSMPServerClient c@AgentClient {smpClients, msgQ} srv =
connectClient :: m SMPClient
connectClient = do
cfg <- asks $ smpCfg . config
liftEitherError smpClientError (getSMPClient srv cfg msgQ clientDisconnected)
u <- askUnliftIO
liftEitherError smpClientError (getSMPClient srv cfg msgQ $ clientDisconnected u)
`E.catch` internalError
where
internalError :: IOException -> m SMPClient
internalError = throwError . INTERNAL . show
clientDisconnected :: IO ()
clientDisconnected = do
removeSubs >>= mapM_ (mapM_ notifySub)
clientDisconnected :: UnliftIO m -> IO ()
clientDisconnected u = do
removeClientSubs >>= (`forM_` serverDown u)
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
removeSubs :: IO (Maybe (Set ConnId))
removeSubs = atomically $ do
removeClientSubs :: IO (Maybe (Map ConnId RcvQueue))
removeClientSubs = atomically $ do
modifyTVar smpClients $ M.delete srv
cs <- M.lookup srv <$> readTVar (subscrSrvrs c)
modifyTVar (subscrSrvrs c) $ M.delete srv
modifyTVar (subscrConns c) $ maybe id deleteKeys cs
modifyTVar (subscrConns c) $ maybe id (deleteKeys . M.keysSet) cs
return cs
where
deleteKeys :: Ord k => Set k -> Map k a -> Map k a
deleteKeys ks m = S.foldr' M.delete m ks
notifySub :: ConnId -> IO ()
notifySub connId = atomically $ writeTBQueue (subQ c) ("", connId, END)
serverDown :: UnliftIO m -> Map ConnId RcvQueue -> IO ()
serverDown u cs = unless (M.null cs) $ do
mapM_ (notifySub DOWN) $ M.keysSet cs
a <- async . unliftIO u $ tryReconnectClient cs
atomically $ modifyTVar (reconnections c) (a :)
tryReconnectClient :: Map ConnId RcvQueue -> m ()
tryReconnectClient cs = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
reconnectClient cs `catchError` const loop
reconnectClient :: Map ConnId RcvQueue -> m ()
reconnectClient cs = do
withAgentLock c . withSMP c srv $ \smp -> do
subs <- readTVarIO $ subscrConns c
forM_ (M.toList cs) $ \(connId, rq@RcvQueue {rcvPrivateKey, rcvId}) ->
when (isNothing $ M.lookup connId subs) $ do
subscribeSMPQueue smp rcvPrivateKey rcvId
`catchError` \case
SMPServerError e -> liftIO $ notifySub (ERR $ SMP e) connId
e -> throwError e
addSubscription c rq connId
liftIO $ notifySub UP connId
notifySub :: ACommand 'Agent -> ConnId -> IO ()
notifySub cmd connId = atomically $ writeTBQueue (subQ c) ("", connId, cmd)
closeAgentClient :: MonadUnliftIO m => AgentClient -> m ()
closeAgentClient c = liftIO $ do
closeSMPServerClients c
cancelActivations c
cancelActions $ activations c
cancelActions $ reconnections c
closeSMPServerClients :: AgentClient -> IO ()
closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient
cancelActivations :: AgentClient -> IO ()
cancelActivations c = readTVarIO (activations c) >>= mapM_ uninterruptibleCancel
cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel
withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a
withAgentLock AgentClient {lock} =
E.bracket_
(void . atomically $ takeTMVar lock)
(atomically $ putTMVar lock ())
withSMP_ :: forall a m. AgentMonad m => AgentClient -> SMPServer -> (SMPClient -> m a) -> m a
withSMP_ c srv action =
@@ -207,13 +243,13 @@ subscribeQueue c rq@RcvQueue {server, rcvPrivateKey, rcvId} connId = do
addSubscription c rq connId
addSubscription :: MonadUnliftIO m => AgentClient -> RcvQueue -> ConnId -> m ()
addSubscription c RcvQueue {server} connId = atomically $ do
addSubscription c rq@RcvQueue {server} connId = atomically $ do
modifyTVar (subscrConns c) $ M.insert connId server
modifyTVar (subscrSrvrs c) $ M.alter (Just . addSub) server
where
addSub :: Maybe (Set ConnId) -> Set ConnId
addSub (Just cs) = S.insert connId cs
addSub _ = S.singleton connId
addSub :: Maybe (Map ConnId RcvQueue) -> Map ConnId RcvQueue
addSub (Just cs) = M.insert connId rq cs
addSub _ = M.singleton connId rq
removeSubscription :: AgentMonad m => AgentClient -> ConnId -> m ()
removeSubscription AgentClient {subscrConns, subscrSrvrs} connId = atomically $ do
@@ -223,10 +259,10 @@ removeSubscription AgentClient {subscrConns, subscrSrvrs} connId = atomically $
(modifyTVar subscrSrvrs . M.alter (>>= delSub))
(M.lookup connId cs)
where
delSub :: Set ConnId -> Maybe (Set ConnId)
delSub :: Map ConnId RcvQueue -> Maybe (Map ConnId RcvQueue)
delSub cs =
let cs' = S.delete connId cs
in if S.null cs' then Nothing else Just cs'
let cs' = M.delete connId cs
in if M.null cs' then Nothing else Just cs'
addActivation :: MonadUnliftIO m => AgentClient -> ConnId -> Async () -> m ()
addActivation c connId a = atomically . modifyTVar (activations c) $ M.insert connId a
@@ -257,10 +293,13 @@ sendConfirmation c sq@SndQueue {server, sndId} senderKey cInfo =
mkConfirmation smp = encryptAndSign smp sq . serializeSMPMessage $ SMPConfirmation senderKey cInfo
sendHello :: forall m. AgentMonad m => AgentClient -> SndQueue -> VerificationKey -> RetryInterval -> m ()
sendHello c sq@SndQueue {server, sndId, sndPrivateKey} verifyKey RetryInterval {initialInterval, increaseAfter, maxInterval} =
sendHello c sq@SndQueue {server, sndId, sndPrivateKey} verifyKey ri =
withLogSMP_ c server sndId "SEND <HELLO> (retrying)" $ \smp -> do
msg <- mkHello smp $ AckMode On
liftSMP $ send 0 initialInterval msg smp
liftSMP . withRetryInterval ri $ \loop ->
sendSMPMessage smp (Just sndPrivateKey) sndId msg `catchE` \case
SMPServerError AUTH -> loop
e -> throwE e
where
mkHello :: SMPClient -> AckMode -> m ByteString
mkHello smp ackMode = do
@@ -273,18 +312,6 @@ sendHello c sq@SndQueue {server, sndId, sndPrivateKey} verifyKey RetryInterval {
agentMessage = HELLO verifyKey ackMode
}
send :: Int -> Int -> ByteString -> SMPClient -> ExceptT SMPClientError IO ()
send elapsedTime delay msg smp =
sendSMPMessage smp (Just sndPrivateKey) sndId msg `catchE` \case
SMPServerError AUTH -> do
threadDelay delay
let newDelay =
if elapsedTime < increaseAfter || delay == maxInterval
then delay
else min (delay * 3 `div` 2) maxInterval
send (elapsedTime + delay) newDelay msg smp
e -> throwE e
secureQueue :: AgentMonad m => AgentClient -> RcvQueue -> SenderPublicKey -> m ()
secureQueue c RcvQueue {server, rcvId, rcvPrivateKey} senderKey =
withLogSMP c server rcvId "KEY <key>" $ \smp ->
+9 -7
View File
@@ -12,6 +12,7 @@ import Data.List.NonEmpty (NonEmpty)
import Network.Socket
import Numeric.Natural
import Simplex.Messaging.Agent.Protocol (SMPServer)
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client
@@ -27,18 +28,13 @@ data AgentConfig = AgentConfig
dbFile :: FilePath,
dbPoolSize :: Int,
smpCfg :: SMPClientConfig,
retryInterval :: RetryInterval
retryInterval :: RetryInterval,
reconnectInterval :: RetryInterval
}
minute :: Int
minute = 60_000_000
data RetryInterval = RetryInterval
{ initialInterval :: Int,
increaseAfter :: Int,
maxInterval :: Int
}
defaultAgentConfig :: AgentConfig
defaultAgentConfig =
AgentConfig
@@ -55,6 +51,12 @@ defaultAgentConfig =
{ initialInterval = 1_000_000,
increaseAfter = minute,
maxInterval = 10 * minute
},
reconnectInterval =
RetryInterval
{ initialInterval = 1_000_000,
increaseAfter = 10_000_000,
maxInterval = 10_000_000
}
}
+10
View File
@@ -162,9 +162,12 @@ data ACommand (p :: AParty) where
CON :: ACommand Agent -- notification that connection is established
SUB :: ACommand Client
END :: ACommand Agent
DOWN :: ACommand Agent
UP :: ACommand Agent
-- QST :: QueueDirection -> ACommand Client
-- STAT :: QueueDirection -> Maybe QueueStatus -> Maybe SubMode -> ACommand Agent
SEND :: MsgBody -> ACommand Client
MID :: AgentMsgId -> ACommand Agent
SENT :: AgentMsgId -> ACommand Agent
MSG :: MsgMeta -> MsgBody -> ACommand Agent
-- ACK :: AgentMsgId -> ACommand Client
@@ -455,7 +458,10 @@ commandP =
<|> "INFO " *> infoCmd
<|> "SUB" $> ACmd SClient SUB
<|> "END" $> ACmd SAgent END
<|> "DOWN" $> ACmd SAgent DOWN
<|> "UP" $> ACmd SAgent UP
<|> "SEND " *> sendCmd
<|> "MID " *> msgIdResp
<|> "SENT " *> sentResp
<|> "MSG " *> message
<|> "OFF" $> ACmd SClient OFF
@@ -470,6 +476,7 @@ commandP =
acptCmd = ACmd SClient <$> (ACPT <$> A.takeTill (== ' ') <* A.space <*> A.takeByteString)
infoCmd = ACmd SAgent . INFO <$> A.takeByteString
sendCmd = ACmd SClient . SEND <$> A.takeByteString
msgIdResp = ACmd SAgent . MID <$> A.decimal
sentResp = ACmd SAgent . SENT <$> A.decimal
message = ACmd SAgent <$> (MSG <$> msgMetaP <* A.space <*> A.takeByteString)
msgMetaP = do
@@ -505,7 +512,10 @@ serializeCommand = \case
INFO cInfo -> "INFO " <> serializeBinary cInfo
SUB -> "SUB"
END -> "END"
DOWN -> "DOWN"
UP -> "UP"
SEND msgBody -> "SEND " <> serializeBinary msgBody
MID mId -> "MID " <> bshow mId
SENT mId -> "SENT " <> bshow mId
MSG msgMeta msgBody ->
"MSG " <> serializeMsgMeta msgMeta <> " " <> serializeBinary msgBody
@@ -0,0 +1,28 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.Messaging.Agent.RetryInterval where
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
data RetryInterval = RetryInterval
{ initialInterval :: Int,
increaseAfter :: Int,
maxInterval :: Int
}
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (m () -> m ()) -> m ()
withRetryInterval RetryInterval {initialInterval, increaseAfter, maxInterval} action =
callAction 0 initialInterval
where
callAction :: Int -> Int -> m ()
callAction elapsedTime delay = action loop
where
loop = do
let newDelay =
if elapsedTime < increaseAfter || delay == maxInterval
then delay
else min (delay * 3 `div` 2) maxInterval
liftIO $ threadDelay delay
callAction (elapsedTime + delay) newDelay
+3 -3
View File
@@ -264,9 +264,9 @@ data SndMsg = SndMsg
newtype InternalSndId = InternalSndId {unSndId :: Int64} deriving (Eq, Show)
data SndMsgStatus
= Created
| Sent
| Delivered
= SndMsgCreated
| SndMsgSent
| SndMsgDelivered
deriving (Eq, Show)
type SentTs = UTCTime
+1 -1
View File
@@ -795,7 +795,7 @@ insertSndMsgDetails_ dbConn connId SndMsgData {..} =
[ ":conn_alias" := connId,
":internal_snd_id" := internalSndId,
":internal_id" := internalId,
":snd_status" := Created,
":snd_status" := SndMsgCreated,
":internal_hash" := internalHash
]
+5 -2
View File
@@ -16,6 +16,7 @@ import Control.Concurrent
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import SMPAgentClient
import SMPClient (withSmpServer)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Protocol (ErrorType (..), MsgBody)
import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..))
@@ -150,11 +151,13 @@ testSubscription _ alice1 alice2 bob = do
alice1 #:# "nothing else should be delivered to alice1"
testSubscrNotification :: Transport c => TProxy c -> (ThreadId, ThreadId) -> c -> IO ()
testSubscrNotification _ (server, _) client = do
testSubscrNotification t (server, _) client = do
client #: ("1", "conn1", "NEW") =#> \case ("1", "conn1", INV {}) -> True; _ -> False
client #:# "nothing should be delivered to client before the server is killed"
killThread server
client <# ("", "conn1", END)
client <# ("", "conn1", DOWN)
withSmpServer (ATransport t) $
client <# ("", "conn1", ERR (SMP AUTH)) -- this new server does not have the queue
connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO ()
connect (h1, name1) (h2, name2) = do
+1
View File
@@ -24,6 +24,7 @@ import SMPClient
import Simplex.Messaging.Agent (runSMPAgentBlocking)
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Client (SMPClientConfig (..), smpDefaultConfig)
import Simplex.Messaging.Transport
import Test.Hspec