From fd586dee990e15b9d9e48ffa007246d2c639bc2c Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Tue, 2 Apr 2024 12:54:53 +0300 Subject: [PATCH] debug: track pending ACKs --- package.yaml | 1 + simplexmq.cabal | 7 ++++ src/Simplex/Messaging/Agent.hs | 40 +++++++++++++++-------- src/Simplex/Messaging/Agent/Client.hs | 39 ++++++++++++++++------ src/Simplex/Messaging/Agent/Env/SQLite.hs | 35 +++++++++++++++++--- 5 files changed, 95 insertions(+), 27 deletions(-) diff --git a/package.yaml b/package.yaml index 00461d890..e3784d7a1 100644 --- a/package.yaml +++ b/package.yaml @@ -59,6 +59,7 @@ dependencies: - network-udp >= 0.0 && < 0.1 - optparse-applicative >= 0.15 && < 0.17 - process == 1.6.* + - psqueues - random >= 1.1 && < 1.3 - simple-logger == 0.1.* - socks == 0.6.* diff --git a/simplexmq.cabal b/simplexmq.cabal index 50e672a1a..6919c7d82 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -219,6 +219,7 @@ library , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , simple-logger ==0.1.* , socks ==0.6.* @@ -293,6 +294,7 @@ executable ntf-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -368,6 +370,7 @@ executable smp-agent , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -443,6 +446,7 @@ executable smp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -518,6 +522,7 @@ executable xftp , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -593,6 +598,7 @@ executable xftp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -707,6 +713,7 @@ test-suite simplexmq-test , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues , random >=1.1 && <1.3 , silently ==1.2.* , simple-logger ==0.1.* diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7330e823f..fd11e1c40 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -136,19 +136,21 @@ import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe) +import qualified Data.OrdPSQ as OP import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock -import Data.Time.Clock.System (systemToUTCTime) +import Data.Time.Clock.System (getSystemTime, systemToUTCTime) import Data.Traversable (mapAccumL) import Data.Word (Word16) +import Debug.Trace import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFileRemote, deleteSndFilesInternal, deleteSndFilesRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpDeleteRcvFiles', xftpReceiveFile', xftpSendDescription', xftpSendFile') import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Util (removePath) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite -import Simplex.Messaging.Agent.Lock (withLock', withLock) +import Simplex.Messaging.Agent.Lock (withLock, withLock') import Simplex.Messaging.Agent.NtfSubSupervisor import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval @@ -159,9 +161,10 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs) -import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOn, pattern PQEncOff, pattern PQSupportOn, pattern PQSupportOff) +import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding +import qualified Simplex.Messaging.Encoding.Base64 as B64 import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..), NtfTokenId) import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) @@ -197,7 +200,7 @@ getSMPAgentClient_ clientId cfg initServers store backgroundMode = liftIO $ newSMPAgentEnv cfg store >>= runReaderT runAgent where runAgent = do - c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask + c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c) atomically . writeTVar acThread . Just =<< mkWeakThreadId t pure c @@ -238,7 +241,7 @@ createUser c = withAgentEnv c .: createUser' c {-# INLINE createUser #-} -- | Delete user record optionally deleting all user's connections on SMP servers -deleteUser :: AgentClient -> UserId -> Bool -> AE () +deleteUser :: AgentClient -> UserId -> Bool -> AE () deleteUser c = withAgentEnv c .: deleteUser' c {-# INLINE deleteUser #-} @@ -769,7 +772,7 @@ compatibleContactUri (CRContactUri ConnReqUriData {crAgentVRange, crSmpQueues = AgentConfig {smpClientVRange, smpAgentVRange} <- asks config pure $ (,) - <$> (qUri `compatibleVersion` smpClientVRange) + <$> (qUri `compatibleVersion` smpClientVRange) <*> (crAgentVRange `compatibleVersion` smpAgentVRange pqSup) versionPQSupport_ :: VersionSMPA -> Maybe CR.VersionE2E -> PQSupport @@ -1190,7 +1193,7 @@ enqueueMessage c cData sq msgFlags aMessage = {-# INLINE enqueueMessage #-} -- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries -enqueueMessageB :: forall t. (Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) +enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) enqueueMessageB c reqs = do cfg <- asks config reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db cfg) reqs @@ -1223,7 +1226,7 @@ enqueueSavedMessage :: AgentClient -> ConnData -> AgentMsgId -> SndQueue -> AM' enqueueSavedMessage c cData msgId sq = enqueueSavedMessageB c $ Identity (cData, [sq], msgId) {-# INLINE enqueueSavedMessage #-} -enqueueSavedMessageB :: (Foldable t) => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' () +enqueueSavedMessageB :: Foldable t => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' () enqueueSavedMessageB c reqs = do -- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs @@ -2021,7 +2024,18 @@ cleanupManager c@AgentClient {subQ} = do notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> AM () notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd) -data ACKd = ACKd | ACKPending +data ACKd = ACKd | ACKPending String SMP.RecipientId SMP.EntityId + +trackAcks :: AgentClient -> AM ACKd -> AM () +trackAcks AgentClient {agentEnv = Env {acks}} a = + a >>= \case + ACKd -> pure () + ACKPending label rId msgId -> liftIO $ do + now <- liftIO getSystemTime + atomically $ modifyTVar' acks $ OP.insert (rId, msgId) now label + traceEventIO $ mconcat ["ACK-PENDING ", logEntity rId, "/", logEntity msgId, " ", label] + where + logEntity bs = B.unpack . B64.encode $ B.take 3 bs -- | make sure to ACK or throw in each message processing branch -- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL @@ -2037,7 +2051,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} = withConnLock c connId "processSMP" $ case cmd of SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> - void . handleNotifyAck $ do + trackAcks c . handleNotifyAck $ do msg' <- decryptSMPMessage rq msg ack' <- handleNotifyAck $ case msg' of SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody @@ -2094,7 +2108,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, A_MSG body -> do logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId notify $ MSG msgMeta msgFlags body - pure ACKPending + pure $ ACKPending "R/A_MSG" rId srvMsgId A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending srvMsgId addr QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg srvMsgId qs @@ -2126,7 +2140,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, AgentMessage _ (A_MSG body) -> do logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId notify $ MSG msgMeta msgFlags body - pure ACKPending + pure $ ACKPending "L-DUPLICATE/A_MSG" rId srvMsgId _ -> ack _ -> checkDuplicateHash e encryptedMsgHash >> ack Left (AGENT (A_CRYPTO e)) -> do @@ -2317,7 +2331,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing case L.nonEmpty . catMaybes $ L.toList rs of - Just rs' -> notify (RCVD msgMeta rs') $> ACKPending + Just rs' -> notify (RCVD msgMeta rs') $> ACKPending "RCVD" rId srvMsgId Nothing -> ack where ack :: AM ACKd diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 7798fcadd..9b7eed80d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -153,6 +153,7 @@ import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isJust, isNothing, listToMaybe) +import qualified Data.OrdPSQ as OP import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -160,12 +161,14 @@ import Data.Text.Encoding import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) +import Debug.Trace +import GHC.Conc (unsafeIOToSTM) import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X import Simplex.FileTransfer.Description (ChunkReplicaId (..), FileDigest (..), kb) import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse) -import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..), XFTPErrorType (DIGEST), XFTPVersion) +import Simplex.FileTransfer.Transport (XFTPErrorType (DIGEST), XFTPRcvChunkSpec (..), XFTPVersion) import Simplex.FileTransfer.Types (DeletedSndChunkReplica (..), NewSndChunkReplica (..), RcvFileChunkReplica (..), SndFileChunk (..), SndFileChunkReplica (..)) import Simplex.FileTransfer.Util (uniqueCombine) import Simplex.Messaging.Agent.Env.SQLite @@ -195,6 +198,7 @@ import Simplex.Messaging.Protocol ErrorType, MsgFlags (..), MsgId, + NtfPublicAuthKey, NtfServer, NtfServerWithAuth, ProtoServer, @@ -206,22 +210,21 @@ import Simplex.Messaging.Protocol QueueIdsKeys (..), RcvMessage (..), RcvNtfPublicDhKey, - NtfPublicAuthKey, SMPMsgMeta (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, + VersionRangeSMPC, + VersionSMPC, XFTPServer, XFTPServerWithAuth, - VersionSMPC, - VersionRangeSMPC, sameSrvAddr', ) import qualified Simplex.Messaging.Protocol as SMP -import Simplex.Messaging.Transport (SMPVersion) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Transport (SMPVersion) import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version @@ -531,7 +534,8 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, g <- asks random env <- ask liftError' (protocolClientError SMP $ B.unpack $ strEncode srv) $ - getProtocolClient g tSess cfg (Just msgQ) $ clientDisconnected env v + getProtocolClient g tSess cfg (Just msgQ) $ + clientDisconnected env v clientDisconnected :: Env -> SMPClientVar -> SMPClient -> IO () clientDisconnected env v client = do @@ -634,7 +638,8 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d cfg <- lift $ getClientConfig c ntfCfg g <- asks random liftError' (protocolClientError NTF $ B.unpack $ strEncode srv) $ - getProtocolClient g tSess cfg Nothing $ clientDisconnected v + getProtocolClient g tSess cfg Nothing $ + clientDisconnected v clientDisconnected :: NtfClientVar -> NtfClient -> IO () clientDisconnected v client = do @@ -656,7 +661,8 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@ cfg <- asks $ xftpCfg . config xftpNetworkConfig <- readTVarIO useNetworkConfig liftError' (protocolClientError XFTP $ B.unpack $ strEncode srv) $ - X.getXFTPClient tSess cfg {xftpNetworkConfig} $ clientDisconnected v + X.getXFTPClient tSess cfg {xftpNetworkConfig} $ + clientDisconnected v clientDisconnected :: XFTPClientVar -> XFTPClient -> IO () clientDisconnected v client = do @@ -1258,9 +1264,22 @@ disableQueuesNtfs :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentEr disableQueuesNtfs = sendTSessionBatches "NDEL" 90 id $ sendBatch disableSMPQueuesNtfs sendAck :: AgentClient -> RcvQueue -> MsgId -> AM () -sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do - withSMPClient c rq ("ACK:" <> logSecret msgId) $ \smp -> +sendAck c@AgentClient {agentEnv = Env {acks, acksLate}} rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do + withSMPClient c rq ("ACK:" <> logSecret msgId) $ \smp -> do ackSMPMessage smp rcvPrivateKey rcvId msgId + let k = (rcvId, msgId) + atomically $ + OP.deleteView k <$> readTVar acks >>= \case + Just (_time, label, op') -> do + writeTVar acks op' + unsafeIOToSTM . traceEventIO $ mconcat ["ACK-MET ", B.unpack $ logSecret rcvId, "/", B.unpack $ logSecret msgId, " ", label] + Nothing -> + OP.deleteView k <$> readTVar acksLate >>= \case + Just (_time, label, op') -> do + writeTVar acksLate op' + unsafeIOToSTM . traceEventIO $ mconcat ["ACK-LATE ", B.unpack $ logSecret rcvId, "/", B.unpack $ logSecret msgId, " ", label] + Nothing -> + unsafeIOToSTM . traceEventIO $ mconcat ["ACK-DUPL ", B.unpack $ logSecret rcvId, "/", B.unpack $ logSecret msgId] atomically $ releaseGetLock c rq hasGetLock :: AgentClient -> RcvQueue -> STM Bool diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index a1d060586..e813aa7ee 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -35,17 +35,23 @@ module Simplex.Messaging.Agent.Env.SQLite ) where +import Control.Concurrent (threadDelay) +import Control.Monad import Control.Monad.Except import Control.Monad.IO.Unlift import Control.Monad.Reader import Crypto.Random import Data.ByteArray (ScrubbedBytes) +import qualified Data.ByteString.Char8 as B import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) import Data.Map (Map) +import Data.OrdPSQ (OrdPSQ) +import qualified Data.OrdPSQ as OP import Data.Time.Clock (NominalDiffTime, nominalDay) -import Data.Time.Clock.System (SystemTime (..)) +import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Word (Word16) +import Debug.Trace import Network.Socket import Numeric.Natural import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig) @@ -57,10 +63,11 @@ import Simplex.Messaging.Client import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (PQSupport, VersionRangeE2E, supportedE2EEncryptVRange) +import qualified Simplex.Messaging.Encoding.Base64 as B64 import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig) import Simplex.Messaging.Notifications.Transport (NTFVersion) import Simplex.Messaging.Notifications.Types -import Simplex.Messaging.Protocol (NtfServer, VersionRangeSMPC, XFTPServer, XFTPServerWithAuth, supportedSMPClientVRange) +import Simplex.Messaging.Protocol (EntityId, NtfServer, RecipientId, VersionRangeSMPC, XFTPServer, XFTPServerWithAuth, supportedSMPClientVRange) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion, TLS, Transport (..)) @@ -68,6 +75,7 @@ import Simplex.Messaging.Transport.Client (defaultSMPPort) import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors') import System.Random (StdGen, newStdGen) import UnliftIO (Async, SomeException) +import UnliftIO.Async (async, cancel) import UnliftIO.STM type AM' a = ReaderT Env IO a @@ -201,7 +209,9 @@ data Env = Env randomServer :: TVar StdGen, ntfSupervisor :: NtfSupervisor, xftpAgent :: XFTPAgent, - multicastSubscribers :: TMVar Int + multicastSubscribers :: TMVar Int, + acks :: TVar (OrdPSQ (RecipientId, EntityId) SystemTime String), + acksLate :: TVar (OrdPSQ (RecipientId, EntityId) SystemTime String) } newSMPAgentEnv :: AgentConfig -> SQLiteStore -> IO Env @@ -211,7 +221,24 @@ newSMPAgentEnv config store = do ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config xftpAgent <- atomically newXFTPAgent multicastSubscribers <- newTMVarIO 0 - pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers} + acks <- newTVarIO OP.empty + acksLate <- newTVarIO OP.empty + ackMonPid <- async $ ackMonitor acks acksLate + void $! mkWeakTVar acks (cancel ackMonPid >> putStrLn "monitor begone!") + pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers, acks, acksLate} + +ackMonitor :: TVar (OrdPSQ (RecipientId, EntityId) SystemTime String) -> TVar (OrdPSQ (RecipientId, EntityId) SystemTime String) -> IO a +ackMonitor acks acksLate = forever $ do + MkSystemTime now _ns <- liftIO getSystemTime + late <- atomically $ do + (late, later) <- OP.atMostView (MkSystemTime (now - 30) 0) <$> readTVar acks + late <$ writeTVar acks later + forM_ late $ \(k@(rId, msgId), time, label) -> do + traceEventIO $ concat ["ACK-MISS ", logEntity rId, "/", logEntity msgId, " ", label] + atomically $ modifyTVar' acksLate $ OP.insert k time label + threadDelay 1000000 + where + logEntity bs = B.unpack . B64.encode $ B.take 3 bs createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore) createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey keepKey Migrations.app