From 5894f9192715632f9bd50c481f3660e949224422 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 30 Aug 2022 08:26:42 +0100 Subject: [PATCH 1/4] fix connections passed to ntf supervisor to include pending, not only active (#506) * fix connections passed to ntf supervisor to include pending, not only active * fix * fix 2 --- src/Simplex/Messaging/Agent/Client.hs | 38 ++++++++++++++++----------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index ff0d5cb31..329ebbb34 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -88,6 +88,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (listToMaybe) import Data.Set (Set) +import qualified Data.Set as S import Data.Text.Encoding import Data.Tuple (swap) import Data.Word (Word16) @@ -156,7 +157,8 @@ data AgentClient = AgentClient useNetworkConfig :: TVar NetworkConfig, subscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue), pendingSubscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue), - subscrConns :: TMap ConnId SMPServer, + subscrConns :: TVar (Set ConnId), + activeSubscrConns :: TMap ConnId SMPServer, connMsgsQueued :: TMap ConnId Bool, smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId), smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()), @@ -207,7 +209,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do useNetworkConfig <- newTVar netCfg subscrSrvrs <- TM.empty pendingSubscrSrvrs <- TM.empty - subscrConns <- TM.empty + subscrConns <- newTVar S.empty + activeSubscrConns <- TM.empty connMsgsQueued <- TM.empty smpQueueMsgQueues <- TM.empty smpQueueMsgDeliveries <- TM.empty @@ -222,7 +225,7 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do asyncClients <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') lock <- newTMVar () - return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock} + return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrSrvrs, pendingSubscrSrvrs, subscrConns, activeSubscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock} agentDbPath :: AgentClient -> FilePath agentDbPath AgentClient {agentEnv = Env {store = SQLiteStore {dbFilePath}}} = dbFilePath @@ -265,7 +268,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do where updateSubs cVar = do cs <- readTVar cVar - modifyTVar' (subscrConns c) (`M.withoutKeys` M.keysSet cs) + modifyTVar' (activeSubscrConns c) (`M.withoutKeys` M.keysSet cs) addPendingSubs cVar cs pure cs @@ -407,12 +410,13 @@ closeAgentClient c = liftIO $ do clear subscrSrvrs clear pendingSubscrSrvrs clear subscrConns + clear activeSubscrConns clear connMsgsQueued clear smpQueueMsgQueues clear getMsgLocks where - clear :: (AgentClient -> TMap k a) -> IO () - clear sel = atomically $ writeTVar (sel c) M.empty + clear :: Monoid m => (AgentClient -> TVar m) -> IO () + clear sel = atomically $ writeTVar (sel c) mempty closeProtocolServerClients :: AgentClient -> (AgentClient -> TMap (ProtoServer msg) (ClientVar msg)) -> IO () closeProtocolServerClients c clientsSel = @@ -508,7 +512,9 @@ newRcvQueue_ a c srv vRange = do subscribeQueue :: AgentMonad m => AgentClient -> RcvQueue -> ConnId -> m () subscribeQueue c rq@RcvQueue {server, rcvPrivateKey, rcvId} connId = do whenM (atomically . TM.member (server, rcvId) $ getMsgLocks c) . throwError $ CMD PROHIBITED - atomically $ addPendingSubscription c rq connId + atomically $ do + modifyTVar (subscrConns c) $ S.insert connId + addPendingSubscription c rq connId withLogClient c server rcvId "SUB" $ \smp -> liftIO (runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq connId) >>= either throwError pure @@ -538,7 +544,9 @@ temporaryAgentError = \case subscribeQueues :: AgentMonad m => AgentClient -> SMPServer -> Map ConnId RcvQueue -> m (Maybe SMPClient, Map ConnId (Either AgentErrorType ())) subscribeQueues c srv qs = do (errs, qs_) <- partitionEithers <$> mapM checkQueue (M.assocs qs) - forM_ qs_ $ atomically . uncurry (addPendingSubscription c) . swap + forM_ qs_ $ \q -> atomically $ do + modifyTVar (subscrConns c) . S.insert $ fst q + uncurry (addPendingSubscription c) $ swap q case L.nonEmpty qs_ of Just qs' -> do smp_ <- tryError (getSMPServerClient c srv) @@ -560,12 +568,13 @@ subscribeQueues c srv qs = do addSubscription :: MonadIO m => AgentClient -> RcvQueue -> ConnId -> m () addSubscription c rq@RcvQueue {server} connId = atomically $ do - TM.insert connId server $ subscrConns c + TM.insert connId server $ activeSubscrConns c + modifyTVar (subscrConns c) $ S.insert connId addSubs_ (subscrSrvrs c) rq connId removePendingSubscription c server connId hasActiveSubscription :: AgentClient -> ConnId -> STM Bool -hasActiveSubscription c connId = TM.member connId (subscrConns c) +hasActiveSubscription c connId = TM.member connId (activeSubscrConns c) addPendingSubscription :: AgentClient -> RcvQueue -> ConnId -> STM () addPendingSubscription = addSubs_ . pendingSubscrSrvrs @@ -577,8 +586,9 @@ addSubs_ ss rq@RcvQueue {server} connId = _ -> TM.singleton connId rq >>= \m -> TM.insert server m ss removeSubscription :: AgentClient -> ConnId -> STM () -removeSubscription c@AgentClient {subscrConns} connId = do - server_ <- TM.lookupDelete connId subscrConns +removeSubscription c connId = do + modifyTVar (subscrConns c) $ S.delete connId + server_ <- TM.lookupDelete connId $ activeSubscrConns c mapM_ (\server -> removeSubs_ (subscrSrvrs c) server connId) server_ removePendingSubscription :: AgentClient -> SMPServer -> ConnId -> STM () @@ -589,9 +599,7 @@ removeSubs_ ss server connId = TM.lookup server ss >>= mapM_ (TM.delete connId) getSubscriptions :: AgentClient -> STM (Set ConnId) -getSubscriptions AgentClient {subscrConns} = do - m <- readTVar subscrConns - pure $ M.keysSet m +getSubscriptions = readTVar . subscrConns logServer :: MonadIO m => ByteString -> AgentClient -> ProtocolServer s -> QueueId -> ByteString -> m () logServer dir AgentClient {clientId} srv qId cmdStr = From f2c1455a2755e1275983dc154321fc0a5c0d7b17 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 30 Aug 2022 15:45:15 +0100 Subject: [PATCH 2/4] fix network-transport at 0.5.4 --- package.yaml | 2 +- simplexmq.cabal | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/package.yaml b/package.yaml index ac686a874..f6999668c 100644 --- a/package.yaml +++ b/package.yaml @@ -49,7 +49,7 @@ dependencies: - memory == 0.15.* - mtl == 2.2.* - network >= 3.1.2.7 && < 3.2 - - network-transport == 0.5.* + - network-transport == 0.5.4 - optparse-applicative >= 0.15 && < 0.17 - QuickCheck == 2.14.* - process == 1.6.* diff --git a/simplexmq.cabal b/simplexmq.cabal index c185f10cc..be71cadb4 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -125,7 +125,7 @@ library , memory ==0.15.* , mtl ==2.2.* , network >=3.1.2.7 && <3.2 - , network-transport ==0.5.* + , network-transport ==0.5.4 , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , random >=1.1 && <1.3 @@ -186,7 +186,7 @@ executable ntf-server , memory ==0.15.* , mtl ==2.2.* , network >=3.1.2.7 && <3.2 - , network-transport ==0.5.* + , network-transport ==0.5.4 , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , random >=1.1 && <1.3 @@ -248,7 +248,7 @@ executable smp-agent , memory ==0.15.* , mtl ==2.2.* , network >=3.1.2.7 && <3.2 - , network-transport ==0.5.* + , network-transport ==0.5.4 , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , random >=1.1 && <1.3 @@ -310,7 +310,7 @@ executable smp-server , memory ==0.15.* , mtl ==2.2.* , network >=3.1.2.7 && <3.2 - , network-transport ==0.5.* + , network-transport ==0.5.4 , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , random >=1.1 && <1.3 @@ -391,7 +391,7 @@ test-suite smp-server-test , memory ==0.15.* , mtl ==2.2.* , network >=3.1.2.7 && <3.2 - , network-transport ==0.5.* + , network-transport ==0.5.4 , optparse-applicative >=0.15 && <0.17 , process ==1.6.* , random >=1.1 && <1.3 From b215bd954dd3f30481db94d0dc398e5c206ab6c3 Mon Sep 17 00:00:00 2001 From: JRoberts <8711996+jr-simplex@users.noreply.github.com> Date: Wed, 31 Aug 2022 21:12:44 +0400 Subject: [PATCH 3/4] remove token if token replace fails with permanent error (#511) --- src/Simplex/Messaging/Agent.hs | 32 +++++++++++++------ .../Messaging/Notifications/Server/Store.hs | 2 ++ tests/AgentTests/NotificationTests.hs | 6 ---- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 4ad44ef4a..1407c3862 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -765,11 +765,11 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode = (Just tknId, Nothing) | savedDeviceToken == suppliedDeviceToken -> when (ntfTknStatus == NTRegistered) (registerToken tkn) $> NTRegistered - | otherwise -> replaceToken tknId $> NTRegistered + | otherwise -> replaceToken tknId (Just tknId, Just (NTAVerify code)) | savedDeviceToken == suppliedDeviceToken -> t tkn (NTActive, Just NTACheck) $ agentNtfVerifyToken c tknId tkn code - | otherwise -> replaceToken tknId $> NTRegistered + | otherwise -> replaceToken tknId (Just tknId, Just NTACheck) | savedDeviceToken == suppliedDeviceToken -> do ns <- asks ntfSupervisor @@ -781,7 +781,7 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode = when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCDelete pure ntfTknStatus -- TODO -- agentNtfCheckToken c tknId tkn >>= \case - | otherwise -> replaceToken tknId $> NTRegistered + | otherwise -> replaceToken tknId (Just tknId, Just NTADelete) -> do agentNtfDeleteToken c tknId tkn withStore' c (`removeNtfToken` tkn) @@ -792,13 +792,27 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode = withStore' c $ \db -> updateNtfMode db tkn suppliedNtfMode pure status where - replaceToken :: NtfTokenId -> m () + replaceToken :: NtfTokenId -> m NtfTknStatus replaceToken tknId = do - agentNtfReplaceToken c tknId tkn suppliedDeviceToken - withStore' c $ \db -> updateDeviceToken db tkn suppliedDeviceToken ns <- asks ntfSupervisor - atomically $ nsUpdateToken ns tkn {deviceToken = suppliedDeviceToken, ntfTknStatus = NTRegistered, ntfMode = suppliedNtfMode} - _ -> + tryReplace ns `catchError` \e -> + if temporaryAgentError e || e == BROKER HOST + then throwError e + else do + withStore' c $ \db -> removeNtfToken db tkn + atomically $ nsRemoveNtfToken ns + createToken + where + tryReplace ns = do + agentNtfReplaceToken c tknId tkn suppliedDeviceToken + withStore' c $ \db -> updateDeviceToken db tkn suppliedDeviceToken + atomically $ nsUpdateToken ns tkn {deviceToken = suppliedDeviceToken, ntfTknStatus = NTRegistered, ntfMode = suppliedNtfMode} + pure NTRegistered + _ -> createToken + where + t tkn = withToken c tkn Nothing + createToken :: m NtfTknStatus + createToken = getNtfServer c >>= \case Just ntfServer -> asks (cmdSignAlg . config) >>= \case @@ -810,8 +824,6 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode = registerToken tkn pure NTRegistered _ -> throwError $ CMD PROHIBITED - where - t tkn = withToken c tkn Nothing registerToken :: NtfToken -> m () registerToken tkn@NtfToken {ntfPubKey, ntfDhKeys = (pubDhKey, privDhKey)} = do (tknId, srvPubDhKey) <- agentNtfRegisterToken c tkn ntfPubKey pubDhKey diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 38ef6cc72..d9af0fb29 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -106,6 +106,7 @@ removeInactiveTokenRegistrations st NtfTknData {ntfTknId = tId, token} = forM_ tIds $ \(regKey, tId') -> do TM.delete regKey tknRegs TM.delete tId' $ tokens st + -- TODO remove token subscriptions as in deleteNtfToken pure $ map snd tIds removeTokenRegistration :: NtfStore -> NtfTknData -> STM () @@ -130,6 +131,7 @@ deleteNtfToken st tknId = do ) ) + -- TODO refactor qs <- TM.lookupDelete tknId (tokenSubscriptions st) >>= mapM diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index f3a0cc111..a8dbd1a17 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -276,31 +276,25 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do liftIO $ threadDelay 1000000 aliceId <- joinConnection bob True qInfo "bob's connInfo" liftIO $ threadDelay 750000 - liftIO $ print 0 void $ messageNotification apnsQ ("", _, CONF confId _ "bob's connInfo") <- get alice liftIO $ threadDelay 500000 allowConnection alice bobId confId "alice's connInfo" - liftIO $ print 1 void $ messageNotification apnsQ get bob ##> ("", aliceId, INFO "alice's connInfo") - liftIO $ print 2 void $ messageNotification apnsQ get alice ##> ("", bobId, CON) - liftIO $ print 3 void $ messageNotification apnsQ get bob ##> ("", aliceId, CON) -- bob sends message 1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello" get bob ##> ("", aliceId, SENT $ baseId + 1) - liftIO $ print 4 void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False ackMessage alice bobId $ baseId + 1 -- alice sends message 2 <- msgId <$> sendMessage alice bobId (SMP.MsgFlags True) "hey there" get alice ##> ("", bobId, SENT $ baseId + 2) - liftIO $ print 5 void $ messageNotification apnsQ get bob =##> \case ("", c, Msg "hey there") -> c == aliceId; _ -> False ackMessage bob aliceId $ baseId + 2 From da5058a0c470d0c3563922eb435464f6d1516281 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 6 Sep 2022 14:39:50 +0100 Subject: [PATCH 4/4] remove connId from message delivery queue keys (#515) --- src/Simplex/Messaging/Agent.hs | 22 +++++++++++----------- src/Simplex/Messaging/Agent/Client.hs | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 1407c3862..dfeb54ab3 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -545,7 +545,7 @@ enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue enqueueMessage c cData@ConnData {connId, connAgentVersion} sq msgFlags aMessage = do resumeMsgDelivery c cData sq msgId <- storeSentMsg - queuePendingMsgs c connId sq [msgId] + queuePendingMsgs c sq [msgId] pure $ unId msgId where storeSentMsg :: m InternalId @@ -565,28 +565,28 @@ enqueueMessage c cData@ConnData {connId, connAgentVersion} sq msgFlags aMessage resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> m () resumeMsgDelivery c cData@ConnData {connId} sq@SndQueue {server, sndId} = do - let qKey = (connId, server, sndId) + let qKey = (server, sndId) unlessM (queueDelivering qKey) $ async (runSmpQueueMsgDelivery c cData sq) >>= \a -> atomically (TM.insert qKey a $ smpQueueMsgDeliveries c) unlessM connQueued $ withStore' c (`getPendingMsgs` connId) - >>= queuePendingMsgs c connId sq + >>= queuePendingMsgs c sq where queueDelivering qKey = atomically $ TM.member qKey (smpQueueMsgDeliveries c) connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connMsgsQueued c) -queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m () -queuePendingMsgs c connId sq msgIds = atomically $ do +queuePendingMsgs :: AgentMonad m => AgentClient -> SndQueue -> [InternalId] -> m () +queuePendingMsgs c sq msgIds = atomically $ do modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + length msgIds} -- s <- readTVar (msgDeliveryOp c) -- unsafeIOToSTM $ putStrLn $ "msgDeliveryOp: " <> show (opsInProgress s) - q <- getPendingMsgQ c connId sq + q <- getPendingMsgQ c sq mapM_ (writeTQueue q) msgIds -getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId) -getPendingMsgQ c connId SndQueue {server, sndId} = do - let qKey = (connId, server, sndId) +getPendingMsgQ :: AgentClient -> SndQueue -> STM (TQueue InternalId) +getPendingMsgQ c SndQueue {server, sndId} = do + let qKey = (server, sndId) maybe (newMsgQueue qKey) pure =<< TM.lookup qKey (smpQueueMsgQueues c) where newMsgQueue qKey = do @@ -596,7 +596,7 @@ getPendingMsgQ c connId SndQueue {server, sndId} = do runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> m () runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandshake} sq = do - mq <- atomically $ getPendingMsgQ c connId sq + mq <- atomically $ getPendingMsgQ c sq ri <- asks $ messageRetryInterval . config forever $ do atomically $ endAgentOperation c AOSndNetwork @@ -1251,7 +1251,7 @@ enqueueConfirmation :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQ enqueueConfirmation c cData@ConnData {connId, connAgentVersion} sq connInfo e2eEncryption = do resumeMsgDelivery c cData sq msgId <- storeConfirmation - queuePendingMsgs c connId sq [msgId] + queuePendingMsgs c sq [msgId] where storeConfirmation :: m InternalId storeConfirmation = withStore c $ \db -> runExceptT $ do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 329ebbb34..dbfcf589f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -160,8 +160,8 @@ data AgentClient = AgentClient subscrConns :: TVar (Set ConnId), activeSubscrConns :: TMap ConnId SMPServer, connMsgsQueued :: TMap ConnId Bool, - smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId), - smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()), + smpQueueMsgQueues :: TMap (SMPServer, SMP.SenderId) (TQueue InternalId), + smpQueueMsgDeliveries :: TMap (SMPServer, SMP.SenderId) (Async ()), ntfNetworkOp :: TVar AgentOpState, rcvNetworkOp :: TVar AgentOpState, msgDeliveryOp :: TVar AgentOpState,