debug: track pending ACKs

This commit is contained in:
Alexander Bondarenko
2024-04-02 12:54:53 +03:00
parent be9a84905f
commit fd586dee99
5 changed files with 95 additions and 27 deletions
+1
View File
@@ -59,6 +59,7 @@ dependencies:
- network-udp >= 0.0 && < 0.1
- optparse-applicative >= 0.15 && < 0.17
- process == 1.6.*
- psqueues
- random >= 1.1 && < 1.3
- simple-logger == 0.1.*
- socks == 0.6.*
+7
View File
@@ -219,6 +219,7 @@ library
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, socks ==0.6.*
@@ -293,6 +294,7 @@ executable ntf-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
@@ -368,6 +370,7 @@ executable smp-agent
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
@@ -443,6 +446,7 @@ executable smp-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
@@ -518,6 +522,7 @@ executable xftp
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
@@ -593,6 +598,7 @@ executable xftp-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
@@ -707,6 +713,7 @@ test-suite simplexmq-test
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues
, random >=1.1 && <1.3
, silently ==1.2.*
, simple-logger ==0.1.*
+27 -13
View File
@@ -136,19 +136,21 @@ import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
import qualified Data.OrdPSQ as OP
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.System (systemToUTCTime)
import Data.Time.Clock.System (getSystemTime, systemToUTCTime)
import Data.Traversable (mapAccumL)
import Data.Word (Word16)
import Debug.Trace
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFileRemote, deleteSndFilesInternal, deleteSndFilesRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpDeleteRcvFiles', xftpReceiveFile', xftpSendDescription', xftpSendFile')
import Simplex.FileTransfer.Description (ValidFileDescription)
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Util (removePath)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Lock (withLock', withLock)
import Simplex.Messaging.Agent.Lock (withLock, withLock')
import Simplex.Messaging.Agent.NtfSubSupervisor
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
@@ -159,9 +161,10 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOn, pattern PQEncOff, pattern PQSupportOn, pattern PQSupportOff)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding
import qualified Simplex.Messaging.Encoding.Base64 as B64
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..), NtfTokenId)
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
@@ -197,7 +200,7 @@ getSMPAgentClient_ clientId cfg initServers store backgroundMode =
liftIO $ newSMPAgentEnv cfg store >>= runReaderT runAgent
where
runAgent = do
c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask
c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask
t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c)
atomically . writeTVar acThread . Just =<< mkWeakThreadId t
pure c
@@ -238,7 +241,7 @@ createUser c = withAgentEnv c .: createUser' c
{-# INLINE createUser #-}
-- | Delete user record optionally deleting all user's connections on SMP servers
deleteUser :: AgentClient -> UserId -> Bool -> AE ()
deleteUser :: AgentClient -> UserId -> Bool -> AE ()
deleteUser c = withAgentEnv c .: deleteUser' c
{-# INLINE deleteUser #-}
@@ -769,7 +772,7 @@ compatibleContactUri (CRContactUri ConnReqUriData {crAgentVRange, crSmpQueues =
AgentConfig {smpClientVRange, smpAgentVRange} <- asks config
pure $
(,)
<$> (qUri `compatibleVersion` smpClientVRange)
<$> (qUri `compatibleVersion` smpClientVRange)
<*> (crAgentVRange `compatibleVersion` smpAgentVRange pqSup)
versionPQSupport_ :: VersionSMPA -> Maybe CR.VersionE2E -> PQSupport
@@ -1190,7 +1193,7 @@ enqueueMessage c cData sq msgFlags aMessage =
{-# INLINE enqueueMessage #-}
-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
enqueueMessageB :: forall t. (Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB c reqs = do
cfg <- asks config
reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db cfg) reqs
@@ -1223,7 +1226,7 @@ enqueueSavedMessage :: AgentClient -> ConnData -> AgentMsgId -> SndQueue -> AM'
enqueueSavedMessage c cData msgId sq = enqueueSavedMessageB c $ Identity (cData, [sq], msgId)
{-# INLINE enqueueSavedMessage #-}
enqueueSavedMessageB :: (Foldable t) => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' ()
enqueueSavedMessageB :: Foldable t => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' ()
enqueueSavedMessageB c reqs = do
-- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved
void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs
@@ -2021,7 +2024,18 @@ cleanupManager c@AgentClient {subQ} = do
notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> AM ()
notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd)
data ACKd = ACKd | ACKPending
data ACKd = ACKd | ACKPending String SMP.RecipientId SMP.EntityId
trackAcks :: AgentClient -> AM ACKd -> AM ()
trackAcks AgentClient {agentEnv = Env {acks}} a =
a >>= \case
ACKd -> pure ()
ACKPending label rId msgId -> liftIO $ do
now <- liftIO getSystemTime
atomically $ modifyTVar' acks $ OP.insert (rId, msgId) now label
traceEventIO $ mconcat ["ACK-PENDING ", logEntity rId, "/", logEntity msgId, " ", label]
where
logEntity bs = B.unpack . B64.encode $ B.take 3 bs
-- | make sure to ACK or throw in each message processing branch
-- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL
@@ -2037,7 +2051,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} =
withConnLock c connId "processSMP" $ case cmd of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} ->
void . handleNotifyAck $ do
trackAcks c . handleNotifyAck $ do
msg' <- decryptSMPMessage rq msg
ack' <- handleNotifyAck $ case msg' of
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody
@@ -2094,7 +2108,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
A_MSG body -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
pure $ ACKPending "R/A_MSG" rId srvMsgId
A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta
QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending srvMsgId addr
QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg srvMsgId qs
@@ -2126,7 +2140,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
pure $ ACKPending "L-DUPLICATE/A_MSG" rId srvMsgId
_ -> ack
_ -> checkDuplicateHash e encryptedMsgHash >> ack
Left (AGENT (A_CRYPTO e)) -> do
@@ -2317,7 +2331,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
logServer "<--" c srv rId $ "MSG <RCPT>:" <> logSecret srvMsgId
rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing
case L.nonEmpty . catMaybes $ L.toList rs of
Just rs' -> notify (RCVD msgMeta rs') $> ACKPending
Just rs' -> notify (RCVD msgMeta rs') $> ACKPending "RCVD" rId srvMsgId
Nothing -> ack
where
ack :: AM ACKd
+29 -10
View File
@@ -153,6 +153,7 @@ import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, isNothing, listToMaybe)
import qualified Data.OrdPSQ as OP
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
@@ -160,12 +161,14 @@ import Data.Text.Encoding
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import Debug.Trace
import GHC.Conc (unsafeIOToSTM)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
import Simplex.FileTransfer.Description (ChunkReplicaId (..), FileDigest (..), kb)
import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse)
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..), XFTPErrorType (DIGEST), XFTPVersion)
import Simplex.FileTransfer.Transport (XFTPErrorType (DIGEST), XFTPRcvChunkSpec (..), XFTPVersion)
import Simplex.FileTransfer.Types (DeletedSndChunkReplica (..), NewSndChunkReplica (..), RcvFileChunkReplica (..), SndFileChunk (..), SndFileChunkReplica (..))
import Simplex.FileTransfer.Util (uniqueCombine)
import Simplex.Messaging.Agent.Env.SQLite
@@ -195,6 +198,7 @@ import Simplex.Messaging.Protocol
ErrorType,
MsgFlags (..),
MsgId,
NtfPublicAuthKey,
NtfServer,
NtfServerWithAuth,
ProtoServer,
@@ -206,22 +210,21 @@ import Simplex.Messaging.Protocol
QueueIdsKeys (..),
RcvMessage (..),
RcvNtfPublicDhKey,
NtfPublicAuthKey,
SMPMsgMeta (..),
SProtocolType (..),
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
VersionRangeSMPC,
VersionSMPC,
XFTPServer,
XFTPServerWithAuth,
VersionSMPC,
VersionRangeSMPC,
sameSrvAddr',
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport (SMPVersion)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util
import Simplex.Messaging.Version
@@ -531,7 +534,8 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
g <- asks random
env <- ask
liftError' (protocolClientError SMP $ B.unpack $ strEncode srv) $
getProtocolClient g tSess cfg (Just msgQ) $ clientDisconnected env v
getProtocolClient g tSess cfg (Just msgQ) $
clientDisconnected env v
clientDisconnected :: Env -> SMPClientVar -> SMPClient -> IO ()
clientDisconnected env v client = do
@@ -634,7 +638,8 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
cfg <- lift $ getClientConfig c ntfCfg
g <- asks random
liftError' (protocolClientError NTF $ B.unpack $ strEncode srv) $
getProtocolClient g tSess cfg Nothing $ clientDisconnected v
getProtocolClient g tSess cfg Nothing $
clientDisconnected v
clientDisconnected :: NtfClientVar -> NtfClient -> IO ()
clientDisconnected v client = do
@@ -656,7 +661,8 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@
cfg <- asks $ xftpCfg . config
xftpNetworkConfig <- readTVarIO useNetworkConfig
liftError' (protocolClientError XFTP $ B.unpack $ strEncode srv) $
X.getXFTPClient tSess cfg {xftpNetworkConfig} $ clientDisconnected v
X.getXFTPClient tSess cfg {xftpNetworkConfig} $
clientDisconnected v
clientDisconnected :: XFTPClientVar -> XFTPClient -> IO ()
clientDisconnected v client = do
@@ -1258,9 +1264,22 @@ disableQueuesNtfs :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentEr
disableQueuesNtfs = sendTSessionBatches "NDEL" 90 id $ sendBatch disableSMPQueuesNtfs
sendAck :: AgentClient -> RcvQueue -> MsgId -> AM ()
sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do
withSMPClient c rq ("ACK:" <> logSecret msgId) $ \smp ->
sendAck c@AgentClient {agentEnv = Env {acks, acksLate}} rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do
withSMPClient c rq ("ACK:" <> logSecret msgId) $ \smp -> do
ackSMPMessage smp rcvPrivateKey rcvId msgId
let k = (rcvId, msgId)
atomically $
OP.deleteView k <$> readTVar acks >>= \case
Just (_time, label, op') -> do
writeTVar acks op'
unsafeIOToSTM . traceEventIO $ mconcat ["ACK-MET ", B.unpack $ logSecret rcvId, "/", B.unpack $ logSecret msgId, " ", label]
Nothing ->
OP.deleteView k <$> readTVar acksLate >>= \case
Just (_time, label, op') -> do
writeTVar acksLate op'
unsafeIOToSTM . traceEventIO $ mconcat ["ACK-LATE ", B.unpack $ logSecret rcvId, "/", B.unpack $ logSecret msgId, " ", label]
Nothing ->
unsafeIOToSTM . traceEventIO $ mconcat ["ACK-DUPL ", B.unpack $ logSecret rcvId, "/", B.unpack $ logSecret msgId]
atomically $ releaseGetLock c rq
hasGetLock :: AgentClient -> RcvQueue -> STM Bool
+31 -4
View File
@@ -35,17 +35,23 @@ module Simplex.Messaging.Agent.Env.SQLite
)
where
import Control.Concurrent (threadDelay)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.ByteArray (ScrubbedBytes)
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import Data.Map (Map)
import Data.OrdPSQ (OrdPSQ)
import qualified Data.OrdPSQ as OP
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Data.Time.Clock.System (SystemTime (..))
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Word (Word16)
import Debug.Trace
import Network.Socket
import Numeric.Natural
import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig)
@@ -57,10 +63,11 @@ import Simplex.Messaging.Client
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (PQSupport, VersionRangeE2E, supportedE2EEncryptVRange)
import qualified Simplex.Messaging.Encoding.Base64 as B64
import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig)
import Simplex.Messaging.Notifications.Transport (NTFVersion)
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, VersionRangeSMPC, XFTPServer, XFTPServerWithAuth, supportedSMPClientVRange)
import Simplex.Messaging.Protocol (EntityId, NtfServer, RecipientId, VersionRangeSMPC, XFTPServer, XFTPServerWithAuth, supportedSMPClientVRange)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion, TLS, Transport (..))
@@ -68,6 +75,7 @@ import Simplex.Messaging.Transport.Client (defaultSMPPort)
import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors')
import System.Random (StdGen, newStdGen)
import UnliftIO (Async, SomeException)
import UnliftIO.Async (async, cancel)
import UnliftIO.STM
type AM' a = ReaderT Env IO a
@@ -201,7 +209,9 @@ data Env = Env
randomServer :: TVar StdGen,
ntfSupervisor :: NtfSupervisor,
xftpAgent :: XFTPAgent,
multicastSubscribers :: TMVar Int
multicastSubscribers :: TMVar Int,
acks :: TVar (OrdPSQ (RecipientId, EntityId) SystemTime String),
acksLate :: TVar (OrdPSQ (RecipientId, EntityId) SystemTime String)
}
newSMPAgentEnv :: AgentConfig -> SQLiteStore -> IO Env
@@ -211,7 +221,24 @@ newSMPAgentEnv config store = do
ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config
xftpAgent <- atomically newXFTPAgent
multicastSubscribers <- newTMVarIO 0
pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers}
acks <- newTVarIO OP.empty
acksLate <- newTVarIO OP.empty
ackMonPid <- async $ ackMonitor acks acksLate
void $! mkWeakTVar acks (cancel ackMonPid >> putStrLn "monitor begone!")
pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers, acks, acksLate}
ackMonitor :: TVar (OrdPSQ (RecipientId, EntityId) SystemTime String) -> TVar (OrdPSQ (RecipientId, EntityId) SystemTime String) -> IO a
ackMonitor acks acksLate = forever $ do
MkSystemTime now _ns <- liftIO getSystemTime
late <- atomically $ do
(late, later) <- OP.atMostView (MkSystemTime (now - 30) 0) <$> readTVar acks
late <$ writeTVar acks later
forM_ late $ \(k@(rId, msgId), time, label) -> do
traceEventIO $ concat ["ACK-MISS ", logEntity rId, "/", logEntity msgId, " ", label]
atomically $ modifyTVar' acksLate $ OP.insert k time label
threadDelay 1000000
where
logEntity bs = B.unpack . B64.encode $ B.take 3 bs
createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey keepKey Migrations.app