diff --git a/package.yaml b/package.yaml index a9cdabe9b..434f6cbdd 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.8.0.0 +version: 5.8.0.2 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index c6a26e49f..ffa92dce7 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.8.0.0 +version: 5.8.0.2 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and @@ -105,6 +105,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240124_file_redirect Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client Simplex.Messaging.Client.Agent diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 415ead6c0..c8030b206 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -112,8 +112,8 @@ closeXFTPAgent a = do where stopWorkers workers = atomically (swapTVar workers M.empty) >>= mapM_ (liftIO . cancelWorker) -xftpReceiveFile' :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> AM RcvFileId -xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redirect}) cfArgs = do +xftpReceiveFile' :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> Bool -> AM RcvFileId +xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redirect}) cfArgs approvedRelays = do g <- asks random prefixPath <- lift $ getPrefixPath "rcv.xftp" createDirectory prefixPath @@ -124,7 +124,7 @@ xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redi lift $ createEmptyFile =<< toFSFilePath relSavePath let saveFile = CryptoFile relSavePath cfArgs fId <- case redirect of - Nothing -> withStore c $ \db -> createRcvFile db g userId fd relPrefixPath relTmpPath saveFile + Nothing -> withStore c $ \db -> createRcvFile db g userId fd relPrefixPath relTmpPath saveFile approvedRelays Just _ -> do -- prepare description paths let relTmpPathRedirect = relPrefixPath "xftp.redirect-encrypted" @@ -134,7 +134,7 @@ xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redi cfArgsRedirect <- atomically $ CF.randomArgs g let saveFileRedirect = CryptoFile relSavePathRedirect $ Just cfArgsRedirect -- create download tasks - withStore c $ \db -> createRcvFileRedirect db g userId fd relPrefixPath relTmpPathRedirect saveFileRedirect relTmpPath saveFile + withStore c $ \db -> createRcvFileRedirect db g userId fd relPrefixPath relTmpPathRedirect saveFileRedirect relTmpPath saveFile approvedRelays forM_ chunks (downloadChunk c) pure fId @@ -176,12 +176,12 @@ runXFTPRcvWorker c srv Worker {doWork} = do runXFTPOperation :: AgentConfig -> AM () runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case - RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" - fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do + (RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas") + (fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - lift $ waitForUserNetwork c - downloadFileChunk fc replica + liftIO $ waitForUserNetwork c + downloadFileChunk fc replica approvedRelays `catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e where retryLoop loop e replicaDelay = do @@ -191,9 +191,10 @@ runXFTPRcvWorker c srv Worker {doWork} = do withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay atomically $ assertAgentForeground c loop - retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e) - downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> AM () - downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do + retryDone = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) + downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM () + downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do + unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwError $ XFTP "" XFTP.NOT_APPROVED fsFileTmpPath <- lift $ toFSFilePath fileTmpPath chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest) @@ -214,6 +215,10 @@ runXFTPRcvWorker c srv Worker {doWork} = do when complete . lift . void $ getXFTPRcvWorker True c Nothing where + ipAddressProtected' :: AM Bool + ipAddressProtected' = do + cfg <- liftIO $ getNetworkConfig' c + pure $ ipAddressProtected cfg srv receivedSize :: [RcvFileChunk] -> Int64 receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0 receivedChunkSize ch@RcvFileChunk {chunkSize = s} @@ -234,11 +239,11 @@ retryOnError name loop done e = do then loop else done -rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> AM () -rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do +rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> AgentErrorType -> AM () +rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath err = do lift $ forM_ tmpPath (removePath <=< toFSFilePath) - withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr - notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr + withStore' c $ \db -> updateRcvFileError db rcvFileId (show err) + notify c rcvFileEntityId $ RFERR err runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM () runXFTPRcvLocalWorker c Worker {doWork} = do @@ -252,7 +257,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do runXFTPOperation AgentConfig {rcvFilesTTL} = withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $ \f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> - decryptFile f `catchAgentError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show) + decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath decryptFile :: RcvFile -> AM () decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do let CryptoFile savePath cfArgs = saveFile @@ -425,7 +430,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do tryCreate = do usedSrvs <- newTVarIO ([] :: [XFTPServer]) withRetryInterval (riFast ri) $ \_ loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c createWithNextSrv usedSrvs `catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e where @@ -458,7 +463,7 @@ runXFTPSndWorker c srv Worker {doWork} = do fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c uploadFileChunk cfg fc replica `catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e where @@ -625,7 +630,7 @@ runXFTPDelWorker c srv Worker {doWork} = do processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c deleteChunkReplica `catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e where diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 7875542a6..4fb18c27a 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -14,6 +14,7 @@ module Simplex.FileTransfer.Client where import Control.Logger.Simple import Control.Monad import Control.Monad.Except +import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import Data.Bifunctor (first) import Data.ByteString.Builder (Builder, byteString) @@ -38,6 +39,7 @@ import Simplex.Messaging.Client defaultNetworkConfig, proxyUsername, transportClientConfig, + unexpectedResponse, ) import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C @@ -56,7 +58,7 @@ import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost, import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Client import Simplex.Messaging.Transport.HTTP2.File -import Simplex.Messaging.Util (bshow, liftEitherWith, liftError', tshow, whenM) +import Simplex.Messaging.Util (liftEitherWith, liftError', tshow, whenM) import Simplex.Messaging.Version import UnliftIO import UnliftIO.Directory @@ -228,13 +230,13 @@ createXFTPChunk :: createXFTPChunk c spKey file rcps auth_ = sendXFTPCommand c spKey "" (FNEW file rcps auth_) Nothing >>= \case (FRSndIds sId rIds, body) -> noFile body (sId, rIds) - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r addXFTPRecipients :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> NonEmpty C.APublicAuthKey -> ExceptT XFTPClientError IO (NonEmpty RecipientId) addXFTPRecipients c spKey fId rcps = sendXFTPCommand c spKey fId (FADD rcps) Nothing >>= \case (FRRcvIds rIds, body) -> noFile body rIds - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r uploadXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> XFTPChunkSpec -> ExceptT XFTPClientError IO () uploadXFTPChunk c spKey fId chunkSpec = @@ -262,7 +264,7 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec { receiveEncFile chunkPart cbState chunkSpec `catchError` \e -> whenM (doesFileExist filePath) (removeFile filePath) >> throwError e _ -> throwError $ PCEResponseError NO_FILE - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r xftpReqTimeout :: XFTPClientConfig -> Maybe Word32 -> Int xftpReqTimeout cfg@XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout}} chunkSize_ = @@ -286,12 +288,12 @@ pingXFTP c@XFTPClient {thParams} = do (r, _) <- sendXFTPTransmission c t Nothing case r of FRPong -> pure () - _ -> throwError $ PCEUnexpectedResponse $ bshow r + _ -> throwE $ unexpectedResponse r okResponse :: (FileResponse, HTTP2Body) -> ExceptT XFTPClientError IO () okResponse = \case (FROk, body) -> noFile body () - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r -- TODO this currently does not check anything because response size is not set and bodyPart is always Just noFile :: HTTP2Body -> a -> ExceptT XFTPClientError IO a diff --git a/src/Simplex/FileTransfer/Transport.hs b/src/Simplex/FileTransfer/Transport.hs index 244e00972..2f0a5de4f 100644 --- a/src/Simplex/FileTransfer/Transport.hs +++ b/src/Simplex/FileTransfer/Transport.hs @@ -217,6 +217,8 @@ data XFTPErrorType TIMEOUT | -- | bad redirect data REDIRECT {redirectError :: String} + | -- | cannot proceed with download from not approved relays without proxy + NOT_APPROVED | -- | internal server error INTERNAL | -- | used internally, never returned by the server (to be removed) @@ -249,6 +251,7 @@ instance Encoding XFTPErrorType where FILE_IO -> "FILE_IO" TIMEOUT -> "TIMEOUT" REDIRECT err -> "REDIRECT " <> smpEncode err + NOT_APPROVED -> "NOT_APPROVED" INTERNAL -> "INTERNAL" DUPLICATE_ -> "DUPLICATE_" @@ -268,6 +271,7 @@ instance Encoding XFTPErrorType where "FILE_IO" -> pure FILE_IO "TIMEOUT" -> pure TIMEOUT "REDIRECT" -> REDIRECT <$> _smpP + "NOT_APPROVED" -> pure NOT_APPROVED "INTERNAL" -> pure INTERNAL "DUPLICATE_" -> pure DUPLICATE_ _ -> fail "bad error type" diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index d4fb11dd5..a08c5c99e 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -83,6 +83,7 @@ module Simplex.Messaging.Agent setNtfServers, setNetworkConfig, getNetworkConfig, + getNetworkConfig', setUserNetworkInfo, reconnectAllServers, registerNtfToken, @@ -159,7 +160,7 @@ import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations -import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission, TransmissionType (..)) +import Simplex.Messaging.Client (ProtocolClient (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs) import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) @@ -170,7 +171,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth) +import Simplex.Messaging.Protocol (BrokerMsg, Cmd (..), EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SParty (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) import qualified Simplex.Messaging.TMap as TM @@ -426,26 +427,25 @@ setNetworkConfig c@AgentClient {useNetworkConfig} cfg' = do -- returns fast network config getNetworkConfig :: AgentClient -> IO NetworkConfig -getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig +getNetworkConfig = getNetworkConfig' {-# INLINE getNetworkConfig #-} setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO () -setUserNetworkInfo c@AgentClient {userNetworkState} UserNetworkInfo {networkType = nt', online} = withAgentEnv' c $ do - d <- asks $ initialInterval . userNetworkInterval . config - ts <- liftIO getCurrentTime - atomically $ do - ns@UserNetworkState {networkType = nt, offline} <- readTVar userNetworkState - when (nt' /= nt || online /= isNothing offline) $ - writeTVar userNetworkState $! - let offline' - | nt' /= UNNone && online = Nothing - | isJust offline = offline - | otherwise = Just UNSOffline {offlineDelay = d, offlineFrom = ts} - in ns {networkType = nt', offline = offline'} +setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkUpdated} ni = withAgentEnv' c $ do + ts' <- liftIO getCurrentTime + i <- asks $ userOfflineDelay . config + -- if network offline event happens in less than `userOfflineDelay` after the previous event, it is ignored + atomically . whenM ((isOnline ni ||) <$> notRecentlyChanged ts' i) $ do + writeTVar userNetworkInfo ni + writeTVar userNetworkUpdated $ Just ts' + where + notRecentlyChanged ts' i = + maybe True (\ts -> diffUTCTime ts' ts > i) <$> readTVar userNetworkUpdated reconnectAllServers :: AgentClient -> IO () reconnectAllServers c = do reconnectServerClients c smpClients + reconnectServerClients c xftpClients reconnectServerClients c ntfClients -- | Register device notifications token @@ -484,8 +484,8 @@ xftpStartWorkers c = withAgentEnv c . startXFTPWorkers c {-# INLINE xftpStartWorkers #-} -- | Receive XFTP file -xftpReceiveFile :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> AE RcvFileId -xftpReceiveFile c = withAgentEnv c .:. xftpReceiveFile' c +xftpReceiveFile :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> Bool -> AE RcvFileId +xftpReceiveFile c = withAgentEnv c .:: xftpReceiveFile' c {-# INLINE xftpReceiveFile #-} -- | Delete XFTP rcv file (deletes work files from file system and db records) @@ -1317,7 +1317,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork let mId = unId msgId ri' = maybe id updateRetryInterval2 msgRetryState ri withRetryLock2 ri' qLock $ \riState loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody @@ -1999,14 +1999,10 @@ getSMPServer c userId = withUserServers c userId pickServer {-# INLINE getSMPServer #-} subscriber :: AgentClient -> AM' () -subscriber c@AgentClient {subQ, msgQ} = forever $ do +subscriber c@AgentClient {msgQ} = forever $ do t <- atomically $ readTBQueue msgQ agentOperationBracket c AORcvNetwork waitUntilActive $ - tryAgentError' (processSMPTransmission c t) >>= \case - Left e -> do - logError $ tshow e - atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e) - Right _ -> return () + processSMPTransmissions c t cleanupManager :: AgentClient -> AM' () cleanupManager c@AgentClient {subQ} = do @@ -2080,28 +2076,72 @@ cleanupManager c@AgentClient {subQ} = do data ACKd = ACKd | ACKPending --- | make sure to ACK or throw in each message processing branch --- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL -processSMPTransmission :: AgentClient -> ServerTransmission SMPVersion ErrorType BrokerMsg -> AM () -processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, sessId, tType, rId, cmd) = do - (rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId) - processSMP rq conn $ toConnData conn +-- | Make sure to ACK or throw in each message processing branch +-- It cannot be finally, as sometimes it needs to be ACK+DEL, +-- and sometimes ACK has to be sent from the consumer. +processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' () +processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) = do + upConnIds <- newTVarIO [] + forM_ ts $ \(entId, t) -> case t of + STEvent msgOrErr -> + withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of + Right msg -> processSMP rq conn (toConnData conn) msg + Left e -> lift $ notifyErr connId e + STResponse (Cmd SRecipient cmd) respOrErr -> + withRcvConn entId $ \rq conn -> case cmd of + -- TODO process expired responses to ACK and DEL + SMP.SUB -> case respOrErr of + Right SMP.OK -> processSubOk rq upConnIds + Right msg@SMP.MSG {} -> do + processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails + processSMP rq conn (toConnData conn) msg + Right r -> processSubErr rq $ unexpectedResponse r + Left e -> unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported + _ -> pure () + STResponse {} -> pure () -- TODO process expired responses to sent messages + STUnexpectedError e -> do + logServer "<--" c srv entId $ "error: " <> bshow e + notifyErr "" e + connIds <- readTVarIO upConnIds + unless (null connIds) $ notify' "" $ UP srv connIds where - processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> AM () + withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' () + withRcvConn rId a = do + tryAgentError' (withStore c $ \db -> getRcvConn db srv rId) >>= \case + Left e -> notify' "" (ERR e) + Right (rq@RcvQueue {connId}, SomeConn _ conn) -> + tryAgentError' (a rq conn) >>= \case + Left e -> notify' connId (ERR e) + Right () -> pure () + processSubOk :: RcvQueue -> TVar [ConnId] -> AM () + processSubOk rq@RcvQueue {connId} upConnIds = + atomically . whenM (isPendingSub connId) $ do + addSubscription c rq + modifyTVar' upConnIds (connId :) + processSubErr :: RcvQueue -> SMPClientError -> AM () + processSubErr rq@RcvQueue {connId} e = do + atomically . whenM (isPendingSub connId) $ failSubscription c rq e + lift $ notifyErr connId e + isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId + notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> ACommand 'Agent e -> m () + notify' connId msg = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) msg) + notifyErr :: ConnId -> SMPClientError -> AM' () + notifyErr connId = notify' connId . ERR . protocolClientError SMP (B.unpack $ strEncode srv) + processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> BrokerMsg -> AM () processSMP - rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} + rq@RcvQueue {rcvId = rId, e2ePrivKey, e2eDhSecret, status} conn - cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} = - withConnLock c connId "processSMP" $ case cmd of - Right r@(SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId}) -> + cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} + smpMsg = + withConnLock c connId "processSMP" $ case smpMsg of + SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> void . handleNotifyAck $ do - isGET <- atomically $ hasGetLock c rq - unless isGET $ checkExpiredResponse r msg' <- decryptSMPMessage rq msg ack' <- handleNotifyAck $ case msg' of SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody SMP.ClientRcvMsgQuota {} -> queueDrained >> ack - when isGET $ notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg') + whenM (atomically $ hasGetLock c rq) $ + notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg') pure ack' where queueDrained = case conn of @@ -2260,29 +2300,23 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd handleNotifyAck :: AM ACKd -> AM ACKd handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack - Right SMP.END -> - atomically (TM.lookup tSess smpClients $>>= (tryReadTMVar . sessionVar) >>= processEND) - >>= logServer "<--" c srv rId + SMP.END -> + atomically (TM.lookup tSess (smpClients c) $>>= (tryReadTMVar . sessionVar) >>= processEND) + >>= notifyEnd where processEND = \case Just (Right clnt) - | sessId == sessionId (thParams $ connectedClient clnt) -> do - removeSubscription c connId - notify' END - pure "END" - | otherwise -> ignored - _ -> ignored - ignored = pure "END from disconnected client - ignored" - Right (SMP.ERR e) -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e - Right r@SMP.OK -> checkExpiredResponse r - Right r -> unexpected r - Left e -> notify $ ERR $ protocolClientError SMP (B.unpack $ strEncode srv) e + | sessId == sessionId (thParams $ connectedClient clnt) -> + removeSubscription c connId $> True + _ -> pure False + notifyEnd removed + | removed = notify END >> logServer "<--" c srv rId "END" + | otherwise = logServer "<--" c srv rId "END from disconnected client - ignored" + SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e + r -> unexpected r where - notify :: forall e m. MonadIO m => AEntityI e => ACommand 'Agent e -> m () - notify = atomically . notify' - - notify' :: forall e. AEntityI e => ACommand 'Agent e -> STM () - notify' msg = writeTBQueue subQ ("", connId, APC (sAEntity @e) msg) + notify :: forall e m. (AEntityI e, MonadIO m) => ACommand 'Agent e -> m () + notify = notify' connId prohibited :: AM () prohibited = notify . ERR $ AGENT A_PROHIBITED @@ -2292,25 +2326,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, unexpected :: BrokerMsg -> AM () unexpected r = do - logServer "<--" c srv rId $ "unexpected: " <> bshow cmd + logServer "<--" c srv rId $ "unexpected: " <> bshow r -- TODO add extended information about transmission type once UNEXPECTED has string notify . ERR $ BROKER (B.unpack $ strEncode srv) $ UNEXPECTED (take 32 $ show r) - checkExpiredResponse :: BrokerMsg -> AM () - checkExpiredResponse r = case tType of - TTEvent -> pure () - TTUncorrelatedResponse -> unexpected r - TTExpiredResponse (SMP.Cmd _ cmd') -> case cmd' of - SMP.SUB -> do - added <- - atomically $ - ifM - ((&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId) - (True <$ addSubscription c rq) - (pure False) - when added $ notify $ UP srv [connId] - _ -> pure () - decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> AM (SMP.PrivHeader, AgentMsgEnvelope) decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 13adb9fdc..addd889a8 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -28,6 +28,7 @@ module Simplex.Messaging.Agent.Client withConnLocks, withInvLock, withLockMap, + ipAddressProtected, closeAgentClient, closeProtocolServerClients, reconnectServerClients, @@ -41,6 +42,7 @@ module Simplex.Messaging.Agent.Client getQueueMessage, decryptSMPMessage, addSubscription, + failSubscription, addNewQueueSubscription, getSubscriptions, sendConfirmation, @@ -108,9 +110,10 @@ module Simplex.Messaging.Agent.Client waitUntilActive, UserNetworkInfo (..), UserNetworkType (..), - UserNetworkState (..), - UNSOffline (..), + getNetworkConfig', waitForUserNetwork, + isNetworkOnline, + isOnline, throwWhenInactive, throwWhenNoDelivery, beginAgentOperation, @@ -162,7 +165,6 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (partitionEithers) import Data.Functor (($>)) -import Data.Int (Int64) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L @@ -173,7 +175,7 @@ import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) import Data.Text.Encoding -import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) +import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) import qualified Database.SQLite.Simple as SQL @@ -269,7 +271,7 @@ data AgentClient = AgentClient active :: TVar Bool, rcvQ :: TBQueue (ATransmission 'Client), subQ :: TBQueue (ATransmission 'Agent), - msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg), + msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), smpServers :: TMap UserId (NonEmpty SMPServerWithAuth), smpClients :: TMap SMPTransportSession SMPClientVar, -- smpProxiedRelays: @@ -281,7 +283,8 @@ data AgentClient = AgentClient xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth), xftpClients :: TMap XFTPTransportSession XFTPClientVar, useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks - userNetworkState :: TVar UserNetworkState, + userNetworkInfo :: TVar UserNetworkInfo, + userNetworkUpdated :: TVar (Maybe UTCTime), subscrConns :: TVar (Set ConnId), activeSubs :: TRcvQueues, pendingSubs :: TRcvQueues, @@ -426,22 +429,20 @@ data UserNetworkInfo = UserNetworkInfo } deriving (Show) +isNetworkOnline :: AgentClient -> STM Bool +isNetworkOnline c = isOnline <$> readTVar (userNetworkInfo c) + +isOnline :: UserNetworkInfo -> Bool +isOnline UserNetworkInfo {networkType, online} = networkType /= UNNone && online + data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther deriving (Eq, Show) -data UserNetworkState = UserNetworkState - { networkType :: UserNetworkType, - offline :: Maybe UNSOffline - } - deriving (Show) - -data UNSOffline = UNSOffline {offlineDelay :: Int64, offlineFrom :: UTCTime} - deriving (Show) - -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. newAgentClient :: Int -> InitialAgentServers -> Env -> STM AgentClient newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do - let qSize = tbqSize $ config agentEnv + let cfg = config agentEnv + qSize = tbqSize cfg acThread <- newTVar Nothing active <- newTVar True rcvQ <- newTBQueue qSize @@ -455,7 +456,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpServers <- newTVar xftp xftpClients <- TM.empty useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg) - userNetworkState <- newTVar $ UserNetworkState UNOther Nothing + userNetworkInfo <- newTVar $ UserNetworkInfo UNOther True + userNetworkUpdated <- newTVar Nothing subscrConns <- newTVar S.empty activeSubs <- RQ.empty pendingSubs <- RQ.empty @@ -492,7 +494,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = xftpServers, xftpClients, useNetworkConfig, - userNetworkState, + userNetworkInfo, + userNetworkUpdated, subscrConns, activeSubs, pendingSubs, @@ -703,7 +706,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = withRetryInterval ri $ \_ loop -> do pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do - waitForUserNetwork c + lift $ waitForUserNetwork c void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs loop getPending = RQ.getSessQueues tSess $ pendingSubs c @@ -720,18 +723,17 @@ reconnectSMPClient tc c tSess@(_, srv, _) qs = do -- this allows 3x of timeout per batch of subscription (90 queues per batch empirically) let t = (length qs `div` 90 + 1) * tcpTimeout * 3 ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case - Just _ -> atomically $ writeTVar tc 0 - Nothing -> - (offline <$> readTVarIO (userNetworkState c)) >>= \case - -- reset and do not report consequitive timeouts while offline - Just _ -> atomically $ writeTVar tc 0 - Nothing -> do - tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) - maxTC <- asks $ maxSubscriptionTimeouts . config - when (tc' >= maxTC) $ do - let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess - atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg) + Just _ -> resetTimeouts + -- reset and do not report consecutive timeouts while offline + Nothing -> ifM (atomically $ isNetworkOnline c) notifyTimeout resetTimeouts where + resetTimeouts = atomically $ writeTVar tc 0 + notifyTimeout = do + tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1) + maxTC <- asks $ maxSubscriptionTimeouts . config + when (tc' >= maxTC) $ do + let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess + atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg) resubscribe :: AM () resubscribe = do cs <- readTVarIO $ RQ.getConnections $ activeSubs c @@ -848,43 +850,22 @@ getClientConfig c cfgSel = do getNetworkConfig :: AgentClient -> STM NetworkConfig getNetworkConfig c = do (slowCfg, fastCfg) <- readTVar (useNetworkConfig c) - UserNetworkState {networkType} <- readTVar (userNetworkState c) + UserNetworkInfo {networkType} <- readTVar $ userNetworkInfo c pure $ case networkType of UNCellular -> slowCfg UNNone -> slowCfg _ -> fastCfg -waitForUserNetwork :: AgentClient -> AM' () -waitForUserNetwork AgentClient {userNetworkState} = - readTVarIO userNetworkState >>= mapM_ waitWhileOffline . offline - where - waitWhileOffline UNSOffline {offlineDelay = d} = - unlessM (liftIO $ waitOnline d False) $ do - -- network delay reached, increase delay - ts' <- liftIO getCurrentTime - ni <- asks $ userNetworkInterval . config - atomically $ do - ns@UserNetworkState {offline} <- readTVar userNetworkState - forM_ offline $ \UNSOffline {offlineDelay = d', offlineFrom = ts} -> - -- Using `min` to avoid multiple updates in a short period of time - -- and to reset `offlineDelay` if network went `on` and `off` again. - writeTVar userNetworkState $! - let d'' = nextRetryDelay (diffToMicroseconds $ diffUTCTime ts' ts) (min d d') ni - in ns {offline = Just UNSOffline {offlineDelay = d'', offlineFrom = ts}} - waitOnline :: Int64 -> Bool -> IO Bool - waitOnline t online' - | t <= 0 = pure online' - | otherwise = - registerDelay (fromIntegral maxWait) - >>= atomically . onlineOrDelay - >>= waitOnline (t - maxWait) - where - maxWait = min t $ fromIntegral (maxBound :: Int) - onlineOrDelay delay = do - online <- isNothing . offline <$> readTVar userNetworkState - expired <- readTVar delay - unless (online || expired) retry - pure online +-- returns fast network config +getNetworkConfig' :: AgentClient -> IO NetworkConfig +getNetworkConfig' = fmap snd . readTVarIO . useNetworkConfig +{-# INLINE getNetworkConfig' #-} + +waitForUserNetwork :: AgentClient -> IO () +waitForUserNetwork c = + unlessM (atomically $ isNetworkOnline c) $ do + delay <- registerDelay $ userNetworkInterval $ config $ agentEnv c + atomically $ unlessM (isNetworkOnline c) $ unlessM (readTVar delay) retry closeAgentClient :: AgentClient -> IO () closeAgentClient c = do @@ -1106,7 +1087,7 @@ protocolClientError :: (Show err, Encoding err) => (HostName -> err -> AgentErro protocolClientError protocolError_ host = \case PCEProtocolError e -> protocolError_ host e PCEResponseError e -> BROKER host $ RESPONSE $ B.unpack $ smpEncode e - PCEUnexpectedResponse r -> BROKER host $ UNEXPECTED $ take 32 $ show r + PCEUnexpectedResponse e -> BROKER host $ UNEXPECTED $ B.unpack e PCEResponseTimeout -> BROKER host TIMEOUT PCENetworkError -> BROKER host NETWORK PCEIncompatibleHost -> BROKER host HOST @@ -1296,9 +1277,8 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM () processSubResult c rq@RcvQueue {connId} = \case Left e -> - unless (temporaryClientError e) $ do - RQ.deleteQueue rq (pendingSubs c) - TM.insert (RQ.qKey rq) e (removedSubs c) + unless (temporaryClientError e) $ + failSubscription c rq e Right () -> whenM (hasPendingSubscription c connId) $ addSubscription c rq @@ -1418,6 +1398,11 @@ addSubscription c rq@RcvQueue {connId} = do RQ.addQueue rq $ activeSubs c RQ.deleteQueue rq $ pendingSubs c +failSubscription :: AgentClient -> RcvQueue -> SMPClientError -> STM () +failSubscription c rq e = do + RQ.deleteQueue rq (pendingSubs c) + TM.insert (RQ.qKey rq) e (removedSubs c) + addPendingSubscription :: AgentClient -> RcvQueue -> STM () addPendingSubscription c rq@RcvQueue {connId} = do modifyTVar' (subscrConns c) $ S.insert connId diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 07d3f29a8..9613adf3c 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -92,7 +92,8 @@ data AgentConfig = AgentConfig xftpCfg :: XFTPClientConfig, reconnectInterval :: RetryInterval, messageRetryInterval :: RetryInterval2, - userNetworkInterval :: RetryInterval, + userNetworkInterval :: Int, + userOfflineDelay :: NominalDiffTime, messageTimeout :: NominalDiffTime, connDeleteDeliveryTimeout :: NominalDiffTime, helloTimeout :: NominalDiffTime, @@ -147,14 +148,6 @@ defaultMessageRetryInterval = } } -defaultUserNetworkInterval :: RetryInterval -defaultUserNetworkInterval = - RetryInterval - { initialInterval = 1200_000000, -- 20 minutes - increaseAfter = 0, - maxInterval = 7200_000000 -- 2 hours - } - defaultAgentConfig :: AgentConfig defaultAgentConfig = AgentConfig @@ -170,7 +163,8 @@ defaultAgentConfig = xftpCfg = defaultXFTPClientConfig, reconnectInterval = defaultReconnectInterval, messageRetryInterval = defaultMessageRetryInterval, - userNetworkInterval = defaultUserNetworkInterval, + userNetworkInterval = 1800_000000, -- 30 minutes, should be less than Int32 max value + userOfflineDelay = 2, -- if network offline event happens in less than 2 seconds after it was set online, it is ignored messageTimeout = 2 * nominalDay, connDeleteDeliveryTimeout = 2 * nominalDay, helloTimeout = 2 * nominalDay, @@ -179,7 +173,7 @@ defaultAgentConfig = cleanupInterval = 30 * 60 * 1000000, -- 30 minutes cleanupStepInterval = 200000, -- 200ms maxWorkerRestartsPerMin = 5, - -- 3 consecutive subscription timeouts will result in alert to the user + -- 5 consecutive subscription timeouts will result in alert to the user -- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking. maxSubscriptionTimeouts = 5, storedMsgDataTTL = 21 * nominalDay, diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index ae0066328..4aaa5f278 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -161,7 +161,7 @@ runNtfWorker c srv Worker {doWork} = do logInfo $ "runNtfWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show) processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> AM () @@ -245,7 +245,7 @@ runNtfSMPWorker c srv Worker {doWork} = do logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> do - lift $ waitForUserNetwork c + liftIO $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show) processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM () diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index e47d2a15c..04dd826a6 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -2275,20 +2275,20 @@ getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do firstRow fromOnly SEXFTPServerNotFound $ DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash) -createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> IO (Either StoreError RcvFileId) -createRcvFile db gVar userId fd@FileDescription {chunks} prefixPath tmpPath file = runExceptT $ do - (rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile db gVar userId fd prefixPath tmpPath file Nothing Nothing +createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> Bool -> IO (Either StoreError RcvFileId) +createRcvFile db gVar userId fd@FileDescription {chunks} prefixPath tmpPath file approvedRelays = runExceptT $ do + (rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile db gVar userId fd prefixPath tmpPath file Nothing Nothing approvedRelays liftIO $ forM_ chunks $ \fc@FileChunk {replicas} -> do chunkId <- insertRcvFileChunk db fc rcvFileId forM_ (zip [1 ..] replicas) $ \(rno, replica) -> insertRcvFileChunkReplica db rno replica chunkId pure rcvFileEntityId -createRcvFileRedirect :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription FRecipient -> FilePath -> FilePath -> CryptoFile -> FilePath -> CryptoFile -> IO (Either StoreError RcvFileId) -createRcvFileRedirect _ _ _ FileDescription {redirect = Nothing} _ _ _ _ _ = pure $ Left $ SEInternal "createRcvFileRedirect called without redirect" -createRcvFileRedirect db gVar userId redirectFd@FileDescription {chunks = redirectChunks, redirect = Just RedirectFileInfo {size, digest}} prefixPath redirectPath redirectFile dstPath dstFile = runExceptT $ do - (dstEntityId, dstId) <- ExceptT $ insertRcvFile db gVar userId dummyDst prefixPath dstPath dstFile Nothing Nothing - (_, redirectId) <- ExceptT $ insertRcvFile db gVar userId redirectFd prefixPath redirectPath redirectFile (Just dstId) (Just dstEntityId) +createRcvFileRedirect :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription FRecipient -> FilePath -> FilePath -> CryptoFile -> FilePath -> CryptoFile -> Bool -> IO (Either StoreError RcvFileId) +createRcvFileRedirect _ _ _ FileDescription {redirect = Nothing} _ _ _ _ _ _ = pure $ Left $ SEInternal "createRcvFileRedirect called without redirect" +createRcvFileRedirect db gVar userId redirectFd@FileDescription {chunks = redirectChunks, redirect = Just RedirectFileInfo {size, digest}} prefixPath redirectPath redirectFile dstPath dstFile approvedRelays = runExceptT $ do + (dstEntityId, dstId) <- ExceptT $ insertRcvFile db gVar userId dummyDst prefixPath dstPath dstFile Nothing Nothing approvedRelays + (_, redirectId) <- ExceptT $ insertRcvFile db gVar userId redirectFd prefixPath redirectPath redirectFile (Just dstId) (Just dstEntityId) approvedRelays liftIO $ forM_ redirectChunks $ \fc@FileChunk {replicas} -> do chunkId <- insertRcvFileChunk db fc redirectId @@ -2308,8 +2308,8 @@ createRcvFileRedirect db gVar userId redirectFd@FileDescription {chunks = redire chunks = [] } -insertRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> Maybe DBRcvFileId -> Maybe RcvFileId -> IO (Either StoreError (RcvFileId, DBRcvFileId)) -insertRcvFile db gVar userId FileDescription {size, digest, key, nonce, chunkSize, redirect} prefixPath tmpPath (CryptoFile savePath cfArgs) redirectId_ redirectEntityId_ = runExceptT $ do +insertRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> Maybe DBRcvFileId -> Maybe RcvFileId -> Bool -> IO (Either StoreError (RcvFileId, DBRcvFileId)) +insertRcvFile db gVar userId FileDescription {size, digest, key, nonce, chunkSize, redirect} prefixPath tmpPath (CryptoFile savePath cfArgs) redirectId_ redirectEntityId_ approvedRelays = runExceptT $ do let (redirectDigest_, redirectSize_) = case redirect of Just RedirectFileInfo {digest = d, size = s} -> (Just d, Just s) Nothing -> (Nothing, Nothing) @@ -2317,8 +2317,8 @@ insertRcvFile db gVar userId FileDescription {size, digest, key, nonce, chunkSiz createWithRandomId gVar $ \rcvFileEntityId -> DB.execute db - "INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, save_file_key, save_file_nonce, status, redirect_id, redirect_entity_id, redirect_digest, redirect_size) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" - ((rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, prefixPath, tmpPath) :. (savePath, fileKey <$> cfArgs, fileNonce <$> cfArgs, RFSReceiving, redirectId_, redirectEntityId_, redirectDigest_, redirectSize_)) + "INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, save_file_key, save_file_nonce, status, redirect_id, redirect_entity_id, redirect_digest, redirect_size, approved_relays) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + ((rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, prefixPath, tmpPath) :. (savePath, fileKey <$> cfArgs, fileNonce <$> cfArgs, RFSReceiving, redirectId_, redirectEntityId_, redirectDigest_, redirectSize_, approvedRelays)) rcvFileId <- liftIO $ insertedRowId db pure (rcvFileEntityId, rcvFileId) @@ -2468,7 +2468,7 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO () deleteRcvFile' db rcvFileId = DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId) -getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFileChunk)) +getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool))) getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd) where @@ -2492,7 +2492,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d LIMIT 1 |] (host, port, keyHash, RFSReceiving, cutoffTs) - getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError RcvFileChunk) + getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool)) getChunkData (rcvFileChunkReplicaId, _fileId) = firstRow toChunk SEFileNotFound $ DB.query @@ -2500,7 +2500,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d [sql| SELECT f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, - r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries, + f.approved_relays FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id @@ -2509,20 +2510,22 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d |] (Only rcvFileChunkReplicaId) where - toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int)) -> RcvFileChunk - toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) = - RcvFileChunk - { rcvFileId, - rcvFileEntityId, - userId, - rcvChunkId, - chunkNo, - chunkSize, - digest, - fileTmpPath, - chunkTmpPath, - replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] - } + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. Only Bool) -> (RcvFileChunk, Bool) + toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (Only approvedRelays)) = + ( RcvFileChunk + { rcvFileId, + rcvFileEntityId, + userId, + rcvChunkId, + chunkNo, + chunkSize, + digest, + fileTmpPath, + chunkTmpPath, + replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] + }, + approvedRelays + ) getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile)) getNextRcvFileToDecrypt db ttl = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 344a3f9ce..5a5ed5b5b 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -71,6 +71,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_deliver import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240124_file_redirect import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -110,7 +111,8 @@ schemaMigrations = ("m20240121_message_delivery_indexes", m20240121_message_delivery_indexes, Just down_m20240121_message_delivery_indexes), ("m20240124_file_redirect", m20240124_file_redirect, Just down_m20240124_file_redirect), ("m20240223_connections_wait_delivery", m20240223_connections_wait_delivery, Just down_m20240223_connections_wait_delivery), - ("m20240225_ratchet_kem", m20240225_ratchet_kem, Just down_m20240225_ratchet_kem) + ("m20240225_ratchet_kem", m20240225_ratchet_kem, Just down_m20240225_ratchet_kem), + ("m20240417_rcv_files_approved_relays", m20240417_rcv_files_approved_relays, Just down_m20240417_rcv_files_approved_relays) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240417_rcv_files_approved_relays.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240417_rcv_files_approved_relays.hs new file mode 100644 index 000000000..9eb10c27a --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240417_rcv_files_approved_relays.hs @@ -0,0 +1,18 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20240417_rcv_files_approved_relays :: Query +m20240417_rcv_files_approved_relays = + [sql| +ALTER TABLE rcv_files ADD COLUMN approved_relays INTEGER NOT NULL DEFAULT 0; +|] + +down_m20240417_rcv_files_approved_relays :: Query +down_m20240417_rcv_files_approved_relays = + [sql| +ALTER TABLE rcv_files DROP COLUMN approved_relays; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 0818be904..caf94418a 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -287,6 +287,7 @@ CREATE TABLE rcv_files( redirect_entity_id BLOB, redirect_size INTEGER, redirect_digest BLOB, + approved_relays INTEGER NOT NULL DEFAULT 0, UNIQUE(rcv_file_entity_id) ); CREATE TABLE rcv_file_chunks( diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 97a2867ca..ecf4ee766 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -65,6 +65,7 @@ module Simplex.Messaging.Client ProtocolClientError (..), SMPClientError, ProxyClientError (..), + unexpectedResponse, ProtocolClientConfig (..), NetworkConfig (..), TransportSessionMode (..), @@ -80,8 +81,8 @@ module Simplex.Messaging.Client proxyUsername, temporaryClientError, smpProxyError, - ServerTransmission, - TransmissionType (..), + ServerTransmissionBatch, + ServerTransmission (..), ClientCommand, -- * For testing @@ -111,7 +112,7 @@ import Data.Int (Int64) import Data.List (find) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L -import Data.Maybe (fromMaybe) +import Data.Maybe (catMaybes, fromMaybe) import Data.Time.Clock (UTCTime (..), diffUTCTime, getCurrentTime) import qualified Data.X509 as X import qualified Data.X509.Validation as XV @@ -155,7 +156,7 @@ data PClient v err msg = PClient sentCommands :: TMap CorrId (Request err msg), sndQ :: TBQueue ByteString, rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)), - msgQ :: Maybe (TBQueue (ServerTransmission v err msg)) + msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg)) } smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> STM SMPClient @@ -206,10 +207,14 @@ type SMPClient = ProtocolClient SMPVersion ErrorType BrokerMsg -- | Type for client command data type ClientCommand msg = (Maybe C.APrivateAuthKey, EntityId, ProtoCommand msg) --- | Type synonym for transmission from some SPM server queue. -type ServerTransmission v err msg = (TransportSession msg, Version v, SessionId, TransmissionType msg, EntityId, Either (ProtocolClientError err) msg) +-- | Type synonym for transmission from SPM servers. +-- Batch response is presented as a single `ServerTransmissionBatch` tuple. +type ServerTransmissionBatch v err msg = (TransportSession msg, Version v, SessionId, NonEmpty (EntityId, ServerTransmission err msg)) -data TransmissionType msg = TTEvent | TTUncorrelatedResponse | TTExpiredResponse (ProtoCommand msg) +data ServerTransmission err msg + = STEvent (Either (ProtocolClientError err) msg) + | STResponse (ProtoCommand msg) (Either (ProtocolClientError err) msg) + | STUnexpectedError (ProtocolClientError err) data HostMode = -- | prefer (or require) onion hosts when connecting via SOCKS proxy @@ -396,7 +401,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId) -- -- A single queue can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. -getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmission v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) +getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret} msgQ disconnected = do case chooseTransportHost networkConfig (host srv) of Right useHost -> @@ -498,38 +503,48 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize maxCnt = smpPingCount networkConfig process :: ProtocolClient v err msg -> IO () - process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= mapM_ (processMsg c) + process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= processMsgs c - processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO () - processMsg c@ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr)) - | not $ B.null $ bs corrId = + processMsgs :: ProtocolClient v err msg -> NonEmpty (SignedTransmission err msg) -> IO () + processMsgs c ts = do + ts' <- catMaybes <$> mapM (processMsg c) (L.toList ts) + forM_ msgQ $ \q -> + mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts') + + processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg)) + processMsg ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr)) + | B.null $ bs corrId = sendMsg $ STEvent clientResp + | otherwise = atomically (TM.lookup corrId sentCommands) >>= \case - Nothing -> sendMsg TTUncorrelatedResponse + Nothing -> sendMsg $ STUnexpectedError unexpected Just Request {entityId, command, pending, responseVar} -> do wasPending <- atomically $ do TM.delete corrId sentCommands ifM (swapTVar pending False) - (True <$ tryPutTMVar responseVar (response entityId)) + (True <$ tryPutTMVar responseVar (if entityId == entId then clientResp else Left unexpected)) (pure False) - unless wasPending $ sendMsg $ if entityId == entId then TTExpiredResponse command else TTUncorrelatedResponse - | otherwise = sendMsg TTEvent + if wasPending + then pure Nothing + else sendMsg $ if entityId == entId then STResponse command clientResp else STUnexpectedError unexpected where - response entityId - | entityId == entId = clientResp - | otherwise = Left . PCEUnexpectedResponse $ bshow respOrErr + unexpected = unexpectedResponse respOrErr clientResp = case respOrErr of Left e -> Left $ PCEResponseError e Right r -> case protocolError r of Just e -> Left $ PCEProtocolError e _ -> Right r - sendMsg :: TransmissionType msg -> IO () - sendMsg tType = case msgQ of - Just q -> atomically $ writeTBQueue q $ serverTransmission c tType entId clientResp - Nothing -> case clientResp of - Left e -> logError $ "SMP client error: " <> tshow e - Right _ -> logWarn $ "SMP client unprocessed event" + sendMsg :: ServerTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg)) + sendMsg t = case msgQ of + Just _ -> pure $ Just (entId, t) + Nothing -> + Nothing <$ case clientResp of + Left e -> logError $ "SMP client error: " <> tshow e + Right _ -> logWarn "SMP client unprocessed event" + +unexpectedResponse :: Show r => r -> ProtocolClientError err +unexpectedResponse = PCEUnexpectedResponse . B.pack . take 32 . show proxyUsername :: TransportSession msg -> ByteString proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_ @@ -585,7 +600,7 @@ smpProxyError :: SMPClientError -> ErrorType smpProxyError = \case PCEProtocolError e -> PROXY $ PROTOCOL e PCEResponseError e -> PROXY $ BROKER $ RESPONSE $ B.unpack $ strEncode e - PCEUnexpectedResponse s -> PROXY $ BROKER $ UNEXPECTED $ B.unpack $ B.take 32 s + PCEUnexpectedResponse e -> PROXY $ BROKER $ UNEXPECTED $ B.unpack e PCEResponseTimeout -> PROXY $ BROKER TIMEOUT PCENetworkError -> PROXY $ BROKER NETWORK PCEIncompatibleHost -> PROXY $ BROKER HOST @@ -606,7 +621,7 @@ createSMPQueue :: createSMPQueue c (rKey, rpKey) dhKey auth subMode = sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey auth subMode) >>= \case IDS qik -> pure qik - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Subscribe to the SMP queue. -- @@ -617,7 +632,7 @@ subscribeSMPQueue c@ProtocolClient {client_ = PClient {sendPings}} rpKey rId = d sendSMPCommand c (Just rpKey) rId SUB >>= \case OK -> pure () cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Subscribe to multiple SMP queues batching commands if supported. subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ())) @@ -637,15 +652,15 @@ processSUBResponse :: SMPClient -> Response ErrorType BrokerMsg -> IO (Either SM processSUBResponse c (Response rId r) = case r of Right OK -> pure $ Right () Right cmd@MSG {} -> writeSMPMessage c rId cmd $> Right () - Right r' -> pure . Left . PCEUnexpectedResponse $ bshow r' + Right r' -> pure . Left $ unexpectedResponse r' Left e -> pure $ Left e writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO () -writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c TTEvent rId (Right msg)) (msgQ $ client_ c) +writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c) -serverTransmission :: ProtocolClient v err msg -> TransmissionType msg -> RecipientId -> Either (ProtocolClientError err) msg -> ServerTransmission v err msg -serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} tType entityId msgOrErr = - (transportSession, thVersion, sessionId, tType, entityId, msgOrErr) +serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg +serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} ts = + (transportSession, thVersion, sessionId, ts) -- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue -- @@ -655,7 +670,7 @@ getSMPMessage c rpKey rId = sendSMPCommand c (Just rpKey) rId GET >>= \case OK -> pure Nothing cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Subscribe to the SMP queue notifications. -- @@ -683,7 +698,7 @@ enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey = sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey) - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Enable notifications for the multiple queues for push notifications server. enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey))) @@ -692,7 +707,7 @@ enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs process (Response _ r) = case r of Right (NID nId rcvNtfSrvPublicDhKey) -> Right (nId, rcvNtfSrvPublicDhKey) - Right r' -> Left . PCEUnexpectedResponse $ bshow r' + Right r' -> Left $ unexpectedResponse r' Left e -> Left e -- | Disable notifications for the queue for push notifications server. @@ -714,7 +729,7 @@ sendSMPMessage :: SMPClient -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags - sendSMPMessage c spKey sId flags msg = sendSMPCommand c spKey sId (SEND flags msg) >>= \case OK -> pure () - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Acknowledge message delivery (server deletes the message). -- @@ -724,7 +739,7 @@ ackSMPMessage c rpKey rId msgId = sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case OK -> return () cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Irreversibly suspend SMP queue. -- The existing messages from the queue will still be delivered. @@ -756,7 +771,7 @@ connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, t case supportedClientSMPRelayVRange `compatibleVersion` vr of Nothing -> throwE $ transportErr TEVersion Just (Compatible v) -> liftEitherWith (const $ transportErr $ TEHandshake IDENTITY) $ ProxiedRelay sId v <$> validateRelay chain key - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r | otherwise = throwE $ PCETransportError TEVersion where tOut = Just $ tcpConnectTimeout + tcpTimeout @@ -862,14 +877,14 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c (_auth, _signed, (_c, _e, cmd)) -> case cmd of Right OK -> pure $ Right () Right (ERR e) -> throwE $ PCEProtocolError e -- this is the error from the destination relay - Right e -> throwE $ PCEUnexpectedResponse $ B.take 32 $ bshow e + Right r' -> throwE $ unexpectedResponse r' Left e -> throwE $ PCEResponseError e _ -> throwE $ PCETransportError TEBadBlock ERR e -> pure . Left $ ProxyProtocolError e -- this will not happen, this error is returned via Left _ -> pure . Left $ ProxyUnexpectedResponse $ take 32 $ show r Left e -> case e of PCEProtocolError e' -> pure . Left $ ProxyProtocolError e' - PCEUnexpectedResponse r -> pure . Left $ ProxyUnexpectedResponse $ B.unpack r + PCEUnexpectedResponse e' -> pure . Left $ ProxyUnexpectedResponse $ B.unpack e' PCEResponseError e' -> pure . Left $ ProxyResponseError e' _ -> throwE e @@ -894,13 +909,13 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId = r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr FwdResponse {fwdCorrId = _, fwdResponse} <- liftEitherWith (const $ PCEResponseError BLOCK) $ smpDecode r' pure fwdResponse - r -> throwE . PCEUnexpectedResponse $ B.take 32 $ bshow r + r -> throwE $ unexpectedResponse r okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO () okSMPCommand cmd c pKey qId = sendSMPCommand c (Just pKey) qId cmd >>= \case OK -> return () - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ())) okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs @@ -909,7 +924,7 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs process (Response _ r) = case r of Right OK -> Right () - Right r' -> Left . PCEUnexpectedResponse $ bshow r' + Right r' -> Left $ unexpectedResponse r' Left e -> Left e -- | Send SMP command diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index e54c6e5ff..a7732b4d4 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -15,6 +15,7 @@ module Simplex.Messaging.Client.Agent where import Control.Concurrent (forkIO) import Control.Concurrent.Async (Async, uninterruptibleCancel) +import Control.Concurrent.STM (retry) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -46,19 +47,18 @@ import Simplex.Messaging.Session import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport -import Simplex.Messaging.Util (catchAll_, ifM, toChunks, ($>>=)) +import Simplex.Messaging.Util (catchAll_, ifM, toChunks, whenM, ($>>=)) import System.Timeout (timeout) import UnliftIO (async) import UnliftIO.Exception (Exception) import qualified UnliftIO.Exception as E import UnliftIO.STM -type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) SMPClient) +type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient)) data SMPClientAgentEvent = CAConnected SMPServer | CADisconnected SMPServer (Set SMPSub) - | CAReconnected SMPServer | CAResubscribed SMPServer (NonEmpty SMPSub) | CASubError SMPServer (NonEmpty (SMPSub, SMPClientError)) @@ -75,7 +75,8 @@ data SMPClientAgentConfig = SMPClientAgentConfig persistErrorInterval :: NominalDiffTime, msgQSize :: Natural, agentQSize :: Natural, - agentSubsBatchSize :: Int + agentSubsBatchSize :: Int, + ownServerDomains :: [ByteString] } defaultSMPClientAgentConfig :: SMPClientAgentConfig @@ -91,25 +92,28 @@ defaultSMPClientAgentConfig = persistErrorInterval = 0, msgQSize = 256, agentQSize = 256, - agentSubsBatchSize = 900 + agentSubsBatchSize = 900, + ownServerDomains = [] } where second = 1000000 data SMPClientAgent = SMPClientAgent { agentCfg :: SMPClientAgentConfig, - msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg), + active :: TVar Bool, + msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), agentQ :: TBQueue SMPClientAgentEvent, randomDrg :: TVar ChaChaDRG, smpClients :: TMap SMPServer SMPClientVar, - smpSessions :: TMap SessionId SMPClient, + smpSessions :: TMap SessionId (OwnServer, SMPClient), srvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey), pendingSrvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey), - reconnections :: TVar [Async ()], - asyncClients :: TVar [Async ()], + smpSubWorkers :: TMap SMPServer (SessionVar (Async ())), workerSeq :: TVar Int } +type OwnServer = Bool + newtype InternalException e = InternalException {unInternalException :: e} deriving (Eq, Show) @@ -137,18 +141,19 @@ instance Exception e => MonadUnliftIO (ExceptT e (ReaderT r IO)) where newSMPClientAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> STM SMPClientAgent newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg = do + active <- newTVar True msgQ <- newTBQueue msgQSize agentQ <- newTBQueue agentQSize smpClients <- TM.empty smpSessions <- TM.empty srvSubs <- TM.empty pendingSrvSubs <- TM.empty - reconnections <- newTVar [] - asyncClients <- newTVar [] + smpSubWorkers <- TM.empty workerSeq <- newTVar 0 pure SMPClientAgent { agentCfg, + active, msgQ, agentQ, randomDrg, @@ -156,19 +161,23 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg smpSessions, srvSubs, pendingSrvSubs, - reconnections, - asyncClients, + smpSubWorkers, workerSeq } +-- | Get or create SMP client for SMPServer getSMPServerClient' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO SMPClient -getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, randomDrg, workerSeq} srv = - atomically getClientVar >>= either newSMPClient waitForSMPClient +getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv +{-# INLINE getSMPServerClient' #-} + +getSMPServerClient'' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient) +getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv = + atomically getClientVar >>= either (ExceptT . newSMPClient) waitForSMPClient where getClientVar :: STM (Either SMPClientVar SMPClientVar) getClientVar = getSessVar workerSeq srv smpClients - waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient + waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient) waitForSMPClient v = do let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) @@ -178,52 +187,50 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, Just (Left (e, Just ts)) -> ifM ((ts <) <$> liftIO getCurrentTime) - (atomically (removeSessVar v srv smpClients) >> getSMPServerClient' ca srv) + (atomically (removeSessVar v srv smpClients) >> getSMPServerClient'' ca srv) (throwE e) Nothing -> throwE PCEResponseTimeout - newSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient - newSMPClient v = tryConnectClient pure (liftIO tryConnectAsync) - where - tryConnectClient :: (SMPClient -> ExceptT SMPClientError IO a) -> ExceptT SMPClientError IO () -> ExceptT SMPClientError IO a - tryConnectClient successAction retryAction = - tryE (connectClient v) >>= \r -> case r of - Right smp -> do - logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv - atomically $ do - putTMVar (sessionVar v) (Right smp) - TM.insert (sessionId $ thParams smp) smp smpSessions - successAction smp - Left e -> do - if - | e == PCENetworkError || e == PCEResponseTimeout -> retryAction - | persistErrorInterval agentCfg == 0 -> do - atomically $ do - putTMVar (sessionVar v) (Left (e, Nothing)) - removeSessVar v srv smpClients - | otherwise -> do - ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime - atomically $ putTMVar (sessionVar v) (Left (e, Just ts)) - throwE e - tryConnectAsync :: IO () - tryConnectAsync = do - a <- async $ void $ runExceptT connectAsync - atomically $ modifyTVar' (asyncClients ca) (a :) - connectAsync :: ExceptT SMPClientError IO () - connectAsync = - withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> - void $ tryConnectClient (const reconnectClient) loop + newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient)) + newSMPClient v = do + r <- connectClient ca srv v `E.catch` (pure . Left . PCEIOError) + case r of + Right smp -> do + logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv + let c = (isOwnServer ca srv, smp) + atomically $ do + putTMVar (sessionVar v) (Right c) + TM.insert (sessionId $ thParams smp) c smpSessions + notify ca $ CAConnected srv + pure $ Right c + Left e -> do + if persistErrorInterval agentCfg == 0 || e == PCENetworkError || e == PCEResponseTimeout + then atomically $ do + putTMVar (sessionVar v) (Left (e, Nothing)) + removeSessVar v srv smpClients + else do + ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime + atomically $ putTMVar (sessionVar v) (Left (e, Just ts)) + reconnectClient ca srv + pure $ Left e - connectClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient - connectClient v = ExceptT $ getProtocolClient randomDrg (1, srv, Nothing) (smpCfg agentCfg) (Just msgQ) (clientDisconnected v) +isOwnServer :: SMPClientAgent -> SMPServer -> OwnServer +isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} = + let srv = strEncode $ L.head host + in any (\s -> s == srv || (B.cons '.' s) `B.isSuffixOf` srv) (ownServerDomains agentCfg) - clientDisconnected :: SMPClientVar -> SMPClient -> IO () - clientDisconnected v smp = do - removeClientAndSubs v smp >>= (`forM_` serverDown) +-- | Run an SMP client for SMPClientVar +connectClient :: SMPClientAgent -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient) +connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, randomDrg} srv v = + getProtocolClient randomDrg (1, srv, Nothing) (smpCfg agentCfg) (Just msgQ) clientDisconnected + where + clientDisconnected :: SMPClient -> IO () + clientDisconnected smp = do + removeClientAndSubs smp >>= (`forM_` serverDown) logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv - removeClientAndSubs :: SMPClientVar -> SMPClient -> IO (Maybe (Map SMPSub C.APrivateAuthKey)) - removeClientAndSubs v smp = atomically $ do + removeClientAndSubs :: SMPClient -> IO (Maybe (Map SMPSub C.APrivateAuthKey)) + removeClientAndSubs smp = atomically $ do removeSessVar v srv smpClients TM.delete (sessionId $ thParams smp) smpSessions TM.lookupDelete srv (srvSubs ca) >>= mapM updateSubs @@ -241,70 +248,89 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, serverDown :: Map SMPSub C.APrivateAuthKey -> IO () serverDown ss = unless (M.null ss) $ do - notify . CADisconnected srv $ M.keysSet ss - reconnectServer + notify ca . CADisconnected srv $ M.keysSet ss + reconnectClient ca srv - reconnectServer :: IO () - reconnectServer = do - a <- async $ void $ runExceptT tryReconnectClient - atomically $ modifyTVar' (reconnections ca) (a :) +-- | Spawn reconnect worker if needed +reconnectClient :: SMPClientAgent -> SMPServer -> IO () +reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} srv = + whenM (readTVarIO active) $ atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ())) + where + getWorkerVar = + ifM + (null <$> getPending) + (pure Nothing) -- prevent race with cleanup and adding pending queues in another call + (Just <$> getSessVar workerSeq srv smpSubWorkers) + newSubWorker :: SessionVar (Async ()) -> IO () + newSubWorker v = do + a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v) + atomically $ putTMVar (sessionVar v) a + runSubWorker = + withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> do + pending <- atomically getPending + forM_ pending $ \cs -> whenM (readTVarIO active) $ do + void $ tcpConnectTimeout `timeout` runExceptT (reconnectSMPClient ca srv cs) + loop + ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg + getPending = mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca) + cleanup :: SessionVar (Async ()) -> STM () + cleanup v = do + -- Here we wait until TMVar is not empty to prevent worker cleanup happening before worker is added to TMVar. + -- Not waiting may result in terminated worker remaining in the map. + whenM (isEmptyTMVar $ sessionVar v) retry + removeSessVar v srv smpSubWorkers - tryReconnectClient :: ExceptT SMPClientError IO () - tryReconnectClient = do - withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> - reconnectClient `catchE` const loop - - reconnectClient :: ExceptT SMPClientError IO () - reconnectClient = do - withSMP ca srv $ \smp -> do - liftIO $ notify $ CAReconnected srv - cs_ <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca) - forM_ cs_ $ \cs -> do - subs' <- filterM (fmap not . atomically . hasSub (srvSubs ca) srv . fst) $ M.assocs cs - let (nSubs, rSubs) = partition (isNotifier . fst . fst) subs' - subscribe_ smp SPNotifier nSubs - subscribe_ smp SPRecipient rSubs +reconnectSMPClient :: SMPClientAgent -> SMPServer -> Map SMPSub C.APrivateAuthKey -> ExceptT SMPClientError IO () +reconnectSMPClient ca@SMPClientAgent {agentCfg} srv cs = + withSMP ca srv $ \smp -> do + subs' <- filterM (fmap not . atomically . hasSub (srvSubs ca) srv . fst) $ M.assocs cs + let (nSubs, rSubs) = partition (isNotifier . fst . fst) subs' + subscribe_ smp SPNotifier nSubs + subscribe_ smp SPRecipient rSubs + where + isNotifier = \case + SPNotifier -> True + SPRecipient -> False + subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateAuthKey)] -> ExceptT SMPClientError IO () + subscribe_ smp party = mapM_ subscribeBatch . toChunks (agentSubsBatchSize agentCfg) where - isNotifier = \case - SPNotifier -> True - SPRecipient -> False + subscribeBatch subs' = do + let subs'' :: (NonEmpty (QueueId, C.APrivateAuthKey)) = L.map (first snd) subs' + rs <- liftIO $ smpSubscribeQueues party ca smp srv subs'' + let rs' :: (NonEmpty ((SMPSub, C.APrivateAuthKey), Either SMPClientError ())) = + L.zipWith (first . const) subs' rs + rs'' :: [Either (SMPSub, SMPClientError) (SMPSub, C.APrivateAuthKey)] = + map (\(sub, r) -> bimap (fst sub,) (const sub) r) $ L.toList rs' + (errs, oks) = partitionEithers rs'' + (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs + mapM_ (atomically . addSubscription ca srv) oks + mapM_ (notify ca . CAResubscribed srv) $ L.nonEmpty $ map fst oks + mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs + mapM_ (notify ca . CASubError srv) $ L.nonEmpty finalErrs + mapM_ (throwE . snd) $ listToMaybe tempErrs - subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateAuthKey)] -> ExceptT SMPClientError IO () - subscribe_ smp party = mapM_ subscribeBatch . toChunks (agentSubsBatchSize agentCfg) - where - subscribeBatch subs' = do - let subs'' :: (NonEmpty (QueueId, C.APrivateAuthKey)) = L.map (first snd) subs' - rs <- liftIO $ smpSubscribeQueues party ca smp srv subs'' - let rs' :: (NonEmpty ((SMPSub, C.APrivateAuthKey), Either SMPClientError ())) = - L.zipWith (first . const) subs' rs - rs'' :: [Either (SMPSub, SMPClientError) (SMPSub, C.APrivateAuthKey)] = - map (\(sub, r) -> bimap (fst sub,) (const sub) r) $ L.toList rs' - (errs, oks) = partitionEithers rs'' - (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs - mapM_ (atomically . addSubscription ca srv) oks - mapM_ (liftIO . notify . CAResubscribed srv) $ L.nonEmpty $ map fst oks - mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs - mapM_ (liftIO . notify . CASubError srv) $ L.nonEmpty finalErrs - mapM_ (throwE . snd) $ listToMaybe tempErrs +notify :: MonadIO m => SMPClientAgent -> SMPClientAgentEvent -> m () +notify ca evt = atomically $ writeTBQueue (agentQ ca) evt +{-# INLINE notify #-} - notify :: SMPClientAgentEvent -> IO () - notify evt = atomically $ writeTBQueue (agentQ ca) evt - -lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe SMPClient) +lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe (OwnServer, SMPClient)) lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookup sessId smpSessions closeSMPClientAgent :: SMPClientAgent -> IO () closeSMPClientAgent c = do + atomically $ writeTVar (active c) False closeSMPServerClients c - cancelActions $ reconnections c - cancelActions $ asyncClients c + atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect + where + cancelReconnect :: SessionVar (Async ()) -> IO () + cancelReconnect v = void . forkIO $ atomically (readTMVar $ sessionVar v) >>= uninterruptibleCancel closeSMPServerClients :: SMPClientAgent -> IO () closeSMPServerClients c = atomically (smpClients c `swapTVar` M.empty) >>= mapM_ (forkIO . closeClient) where closeClient v = atomically (readTMVar $ sessionVar v) >>= \case - Right smp -> closeProtocolClient smp `catchAll_` pure () + Right (_, smp) -> closeProtocolClient smp `catchAll_` pure () _ -> pure () cancelActions :: Foldable f => TVar (f (Async ())) -> IO () diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index cc698b344..32d92faf3 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -12,7 +12,6 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Transport (NTFVersion, supportedClientNTFVRange, supportedNTFHandshakes) import Simplex.Messaging.Protocol (ErrorType) -import Simplex.Messaging.Util (bshow) type NtfClient = ProtocolClient NTFVersion ErrorType NtfResponse @@ -25,7 +24,7 @@ ntfRegisterToken :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Token -> Exc ntfRegisterToken c pKey newTkn = sendNtfCommand c (Just pKey) "" (TNEW newTkn) >>= \case NRTknId tknId dhKey -> pure (tknId, dhKey) - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfVerifyToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> NtfRegCode -> ExceptT NtfClientError IO () ntfVerifyToken c pKey tknId code = okNtfCommand (TVFY code) c pKey tknId @@ -34,7 +33,7 @@ ntfCheckToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> ExceptT NtfClie ntfCheckToken c pKey tknId = sendNtfCommand c (Just pKey) tknId TCHK >>= \case NRTkn stat -> pure stat - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfReplaceToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> DeviceToken -> ExceptT NtfClientError IO () ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId @@ -49,13 +48,13 @@ ntfCreateSubscription :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Subscri ntfCreateSubscription c pKey newSub = sendNtfCommand c (Just pKey) "" (SNEW newSub) >>= \case NRSubId subId -> pure subId - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfCheckSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO NtfSubStatus ntfCheckSubscription c pKey subId = sendNtfCommand c (Just pKey) subId SCHK >>= \case NRSub stat -> pure stat - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfDeleteSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO () ntfDeleteSubscription = okNtfCommand SDEL @@ -68,4 +67,4 @@ okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateAuthKey - okNtfCommand cmd c pKey entId = sendNtfCommand c (Just pKey) entId cmd >>= \case NROk -> return () - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 37eff1e94..892560660 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -31,7 +31,7 @@ import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import Network.Socket (ServiceName) -import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError) +import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..)) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -98,7 +98,9 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do stopServer = do withNtfLog closeStoreLog saveServerStats - asks (smpSubscribers . subscriber) >>= readTVarIO >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (liftIO . deRefWeak >=> mapM_ killThread)) + NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber + liftIO $ readTVarIO smpSubscribers >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (deRefWeak >=> mapM_ killThread)) + liftIO $ closeSMPClientAgent smpAgent serverStatsThread_ :: NtfServerConfig -> [M ()] serverStatsThread_ NtfServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = @@ -218,35 +220,38 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge receiveSMP :: M () receiveSMP = forever $ do - ((_, srv, _), _, _, _tType, ntfId, msgOrErr) <- atomically $ readTBQueue msgQ - let smpQueue = SMPQueueNtf srv ntfId - case msgOrErr of - Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- liftIO getSystemTime - st <- asks store - NtfPushServer {pushQ} <- asks pushServer - stats <- asks serverStats - atomically $ updatePeriodStats (activeSubs stats) ntfId - atomically $ - findNtfSubscriptionToken st smpQueue - >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) - incNtfStat ntfReceived - Right SMP.END -> updateSubStatus smpQueue NSEnd - Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e - Right _ -> logError $ "SMP server unexpected response" - Left e -> logError $ "SMP client error: " <> tshow e + ((_, srv, _), _, _, ts) <- atomically $ readTBQueue msgQ + forM ts $ \(ntfId, t) -> case t of + STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen + STResponse {} -> pure () -- it was already reported as timeout error + STEvent msgOrErr -> do + let smpQueue = SMPQueueNtf srv ntfId + case msgOrErr of + Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do + ntfTs <- liftIO getSystemTime + st <- asks store + NtfPushServer {pushQ} <- asks pushServer + stats <- asks serverStats + atomically $ updatePeriodStats (activeSubs stats) ntfId + atomically $ + findNtfSubscriptionToken st smpQueue + >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) + incNtfStat ntfReceived + Right SMP.END -> updateSubStatus smpQueue NSEnd + Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e + Right _ -> logError $ "SMP server unexpected response" + Left e -> logError $ "SMP client error: " <> tshow e receiveAgent = forever $ atomically (readTBQueue agentQ) >>= \case - CAConnected _ -> pure () + CAConnected srv -> + logInfo $ "SMP server reconnected " <> showServer' srv CADisconnected srv subs -> do logSubStatus srv "disconnected" $ length subs forM_ subs $ \(_, ntfId) -> do let smpQueue = SMPQueueNtf srv ntfId updateSubStatus smpQueue NSInactive - CAReconnected srv -> - logInfo $ "SMP server reconnected " <> showServer' srv CAResubscribed srv subs -> do forM_ subs $ \(_, ntfId) -> updateSubStatus (SMPQueueNtf srv ntfId) NSActive logSubStatus srv "resubscribed" $ length subs diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index cf0ffd4bc..868ca3c80 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -6,6 +6,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedLists #-} @@ -72,8 +73,8 @@ import GHC.Stats (getRTSStats) import GHC.TypeLits (KnownNat) import Network.Socket (ServiceName, Socket, socketToHandle) import Simplex.Messaging.Agent.Lock -import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError) -import Simplex.Messaging.Client.Agent (SMPClientAgent (..), SMPClientAgentEvent (..), getSMPServerClient', lookupSMPServerClient) +import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError, temporaryClientError) +import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String @@ -142,7 +143,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do : receiveFromProxyAgent pa : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> rateStatsThread_ cfg <> controlPortThread_ cfg ) - `finally` withLock' (savingLock s) "final" (saveServer False) + `finally` withLock' (savingLock s) "final" (saveServer False >> closeServer) where runServer :: (ServiceName, ATransport) -> M () runServer (tcpPort, ATransport t) = do @@ -156,6 +157,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do saveServer :: Bool -> M () saveServer keepMsgs = withLog closeStoreLog >> saveServerMessages keepMsgs >> saveServerStats + closeServer :: M () + closeServer = asks (smpAgent . proxyAgent) >>= liftIO . closeSMPClientAgent + serverThread :: forall s. Server -> @@ -199,7 +203,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do CAConnected srv -> logInfo $ "SMP server connected " <> showServer' srv CADisconnected srv [] -> logInfo $ "SMP server disconnected " <> showServer' srv CADisconnected srv subs -> logError $ "SMP server disconnected " <> showServer' srv <> " / subscriptions: " <> tshow (length subs) - CAReconnected srv -> logInfo $ "SMP server reconnected " <> showServer' srv CAResubscribed srv subs -> logError $ "SMP server resubscribed " <> showServer' srv <> " / subscriptions: " <> tshow (length subs) CASubError srv errs -> logError $ "SMP server subscription errors " <> showServer' srv <> " / errors: " <> tshow (length errs) where @@ -243,7 +246,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -262,32 +265,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0 psNtf <- atomically $ periodStatCounts activeQueuesNtf ts + pRelays' <- atomically $ getResetProxyStatsData pRelays + pRelaysOwn' <- atomically $ getResetProxyStatsData pRelaysOwn + pMsgFwds' <- atomically $ getResetProxyStatsData pMsgFwds + pMsgFwdsOwn' <- atomically $ getResetProxyStatsData pMsgFwdsOwn + pMsgFwdsRecv' <- atomically $ swapTVar pMsgFwdsRecv 0 qCount' <- readTVarIO qCount msgCount' <- readTVarIO msgCount hPutStrLn h $ intercalate "," - [ iso8601Show $ utctDay fromTime', - show qCreated', - show qSecured', - show qDeletedAll', - show msgSent', - show msgRecv', - dayCount ps, - weekCount ps, - monthCount ps, - show msgSentNtf', - show msgRecvNtf', - dayCount psNtf, - weekCount psNtf, - monthCount psNtf, - show qCount', - show msgCount', - show msgExpired', - show qDeletedNew', - show qDeletedSecured' - ] + ( [ iso8601Show $ utctDay fromTime', + show qCreated', + show qSecured', + show qDeletedAll', + show msgSent', + show msgRecv', + dayCount ps, + weekCount ps, + monthCount ps, + show msgSentNtf', + show msgRecvNtf', + dayCount psNtf, + weekCount psNtf, + monthCount psNtf, + show qCount', + show msgCount', + show msgExpired', + show qDeletedNew', + show qDeletedSecured' + ] + <> showProxyStats pRelays' + <> showProxyStats pRelaysOwn' + <> showProxyStats pMsgFwds' + <> showProxyStats pMsgFwdsOwn' + <> [show pMsgFwdsRecv'] + ) liftIO $ threadDelay' interval + where + showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} = + [show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther] monitorServerRates :: Int -> Int64 -> M () monitorServerRates nBuckets bucketWidth = do @@ -416,7 +433,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do subscriptions' <- bshow . M.size <$> readTVarIO subscriptions hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions'] CPStats -> withAdminRole $ do - ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats + ss <- unliftIO u $ asks serverStats + let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO () + putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n" + putProxyStat :: ByteString -> (ServerStats -> ProxyStats) -> IO () + putProxyStat label var = do + ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} <- atomically $ getProxyStatsData $ var ss + B.hPutStr h $ label <> ": requests=" <> bshow _pRequests <> ", successes=" <> bshow _pSuccesses <> ", errorsConnect=" <> bshow _pErrorsConnect <> ", errorsCompat=" <> bshow _pErrorsCompat <> ", errorsOther=" <> bshow _pErrorsOther <> "\n" putStat "fromTime" fromTime putStat "qCreated" qCreated putStat "qSecured" qSecured @@ -429,9 +452,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do putStat "msgRecvNtf" msgRecvNtf putStat "qCount" qCount putStat "msgCount" msgCount - where - putStat :: Show a => String -> TVar a -> IO () - putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v + putProxyStat "pRelays" pRelays + putProxyStat "pRelaysOwn" pRelaysOwn + putProxyStat "pMsgFwds" pMsgFwds + putProxyStat "pMsgFwdsOwn" pMsgFwdsOwn + putStat "pMsgFwdsRecv" pMsgFwdsRecv CPStatsClients -> withAdminRole $ do stats' <- unliftIO u (asks clientStats) >>= readTVarIO B.hPutStr h "peerAddresses,socketCount,createdAt,updatedAt,qCreated,qSentSigned,msgSentSigned,msgSentUnsigned,msgDeliveredSigned,proxyRelaysRequested,proxyRelaysConnected,msgSentViaProxy\n" @@ -729,42 +754,60 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth getRelay = do - withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysRequested cs) (+ 1) - ProxyAgent {smpAgent} <- asks proxyAgent - r <- liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError) - case r of - PKEY {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysConnected cs) (+ 1) - _ -> pure () - pure r + ServerStats {pRelays, pRelaysOwn} <- asks serverStats + let inc = mkIncProxyStats pRelays pRelaysOwn + ProxyAgent {smpAgent = a} <- asks proxyAgent + liftIO (runExceptT (getSMPServerClient'' a srv) `catch` (pure . Left . PCEIOError)) >>= \case + Right (own, smp) -> do + inc own pRequests + case proxyResp smp of + r@PKEY {} -> do + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysConnected cs) (+ 1) + r <$ inc own pSuccesses + r -> r <$ inc own pErrorsCompat + Left e -> do + let own = isOwnServer a srv + inc own pRequests + inc own $ if temporaryClientError e then pErrorsConnect else pErrorsOther + pure . ERR $ smpProxyError e where - proxyResp = \case - Left err -> ERR $ smpProxyError err - Right smp -> - let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp - in case compatibleVRange thServerVRange proxiedSMPRelayVRange of - -- Cap the destination relay version range to prevent client version fingerprinting. - -- See comment for proxiedSMPRelayVersion. - Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of - Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey - Nothing -> ERR $ transportErr TENoServerAuth - _ -> ERR $ transportErr TEVersion + proxyResp smp = + let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp + in case compatibleVRange thServerVRange proxiedSMPRelayVRange of + -- Cap the destination relay version range to prevent client version fingerprinting. + -- See comment for proxiedSMPRelayVersion. + Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of + Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey + Nothing -> ERR $ transportErr TENoServerAuth + _ -> ERR $ transportErr TEVersion PFWD fwdV pubKey encBlock -> do - ProxyAgent {smpAgent} <- asks proxyAgent - atomically (lookupSMPServerClient smpAgent sessId) >>= \case - Just smp - | v >= sendingProxySMPVersion -> do - r <- liftIO $ either (ERR . smpProxyError) PRES <$> - runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError) - case r of - PRES {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentViaProxy cs) (+ 1) - _ -> pure () - pure r - | otherwise -> pure . ERR $ transportErr TEVersion + ProxyAgent {smpAgent = a} <- asks proxyAgent + ServerStats {pMsgFwds, pMsgFwdsOwn} <- asks serverStats + let inc = mkIncProxyStats pMsgFwds pMsgFwdsOwn + atomically (lookupSMPServerClient a sessId) >>= \case + Just (own, smp) -> do + inc own pRequests + if + | v >= sendingProxySMPVersion -> + liftIO (runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catch` (pure . Left . PCEIOError)) >>= \case + Right r -> do + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentViaProxy cs) (+ 1) + PRES r <$ inc own pSuccesses + Left e -> case e of + PCEProtocolError {} -> ERR err <$ inc own pSuccesses + _ -> ERR err <$ inc own pErrorsOther + where + err = smpProxyError e + | otherwise -> ERR (transportErr TEVersion) <$ inc own pErrorsCompat where THandleParams {thVersion = v} = thParams smp - Nothing -> pure $ ERR $ PROXY NO_SESSION + Nothing -> inc False pRequests >> inc False pErrorsConnect $> ERR (PROXY NO_SESSION) transportErr :: TransportError -> ErrorType transportErr = PROXY . BROKER . TRANSPORT + mkIncProxyStats :: MonadIO m => ProxyStats -> ProxyStats -> OwnServer -> (ProxyStats -> TVar Int) -> m () + mkIncProxyStats ps psOwn = \own sel -> do + atomically $ modifyTVar' (sel ps) (+ 1) + when own $ atomically $ modifyTVar' (sel psOwn) (+ 1) processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Either (Transmission (Command 'ProxiedClient)) (Transmission BrokerMsg)) processCommand (qr_, (corrId, queueId, cmd)) = do st <- asks queueStore @@ -1104,6 +1147,8 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, -- encrypt to proxy let fr = FwdResponse {fwdCorrId, fwdResponse = r2} r3 = EncFwdResponse $ C.cbEncryptNoPad sessSecret (C.reverseNonce proxyNonce) (smpEncode fr) + stats <- asks serverStats + atomically $ modifyTVar' (pMsgFwdsRecv stats) (+ 1) pure $ RRES r3 where rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index c33b3abd8..4406e9ac0 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -141,7 +141,7 @@ data Server = Server savingLock :: Lock } -data ProxyAgent = ProxyAgent +newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent } diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 2e4aba88a..d09d39764 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -10,6 +10,7 @@ module Simplex.Messaging.Server.Main where import Control.Concurrent.STM import Control.Monad (void) +import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Ini (lookupValue, readIniFile) @@ -147,6 +148,8 @@ smpServerCLI cfgPath logPath = \# It defines prefferred hostname for destination servers with multiple hostnames.\n\ \# host_mode: public\n\ \# required_host_mode: off\n\n\ + \# The domain suffixes of the relays you operate (space-separated) to count as separate proxy statistics.\n\ + \# own_server_domains: \n\n\ \# SOCKS proxy port for forwarding messages to destination servers.\n\ \# You may need a separate instance of SOCKS proxy for incoming single-hop requests.\n\ \# socks_proxy: localhost:9050\n\n\ @@ -249,6 +252,7 @@ smpServerCLI cfgPath logPath = requiredHostMode = fromMaybe False $ iniOnOff "PROXY" "required_host_mode" ini } }, + ownServerDomains = either (const []) textToOwnServers $ lookupValue "PROXY" "own_server_domains" ini, persistErrorInterval = 30 -- seconds }, allowSMPProxy = True @@ -263,6 +267,8 @@ smpServerCLI cfgPath logPath = "public" -> HMPublic "onion" -> HMOnionViaSocks s -> error . T.unpack $ "Invalid host_mode: " <> s + textToOwnServers :: Text -> [ByteString] + textToOwnServers = map encodeUtf8 . T.words data CliCommand = Init InitOptions diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index d6db73188..d92c58693 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -41,6 +41,11 @@ data ServerStats = ServerStats msgSentNtf :: TVar Int, msgRecvNtf :: TVar Int, activeQueuesNtf :: PeriodStats RecipientId, + pRelays :: ProxyStats, + pRelaysOwn :: ProxyStats, + pMsgFwds :: ProxyStats, + pMsgFwdsOwn :: ProxyStats, + pMsgFwdsRecv :: TVar Int, qCount :: TVar Int, msgCount :: TVar Int } @@ -59,6 +64,11 @@ data ServerStatsData = ServerStatsData _msgSentNtf :: Int, _msgRecvNtf :: Int, _activeQueuesNtf :: PeriodStatsData RecipientId, + _pRelays :: ProxyStatsData, + _pRelaysOwn :: ProxyStatsData, + _pMsgFwds :: ProxyStatsData, + _pMsgFwdsOwn :: ProxyStatsData, + _pMsgFwdsRecv :: Int, _qCount :: Int, _msgCount :: Int } @@ -79,9 +89,14 @@ newServerStats ts = do msgSentNtf <- newTVar 0 msgRecvNtf <- newTVar 0 activeQueuesNtf <- newPeriodStats + pRelays <- newProxyStats + pRelaysOwn <- newProxyStats + pMsgFwds <- newProxyStats + pMsgFwdsOwn <- newProxyStats + pMsgFwdsRecv <- newTVar 0 qCount <- newTVar 0 msgCount <- newTVar 0 - pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} + pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -98,9 +113,14 @@ getServerStatsData s = do _msgSentNtf <- readTVar $ msgSentNtf s _msgRecvNtf <- readTVar $ msgRecvNtf s _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s + _pRelays <- getProxyStatsData $ pRelays s + _pRelaysOwn <- getProxyStatsData $ pRelaysOwn s + _pMsgFwds <- getProxyStatsData $ pMsgFwds s + _pMsgFwdsOwn <- getProxyStatsData $ pMsgFwdsOwn s + _pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s _qCount <- readTVar $ qCount s _msgCount <- readTVar $ msgCount s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -117,28 +137,42 @@ setServerStats s d = do writeTVar (msgSentNtf s) $! _msgSentNtf d writeTVar (msgRecvNtf s) $! _msgRecvNtf d setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + setProxyStats (pRelays s) $! _pRelays d + setProxyStats (pRelaysOwn s) $! _pRelaysOwn d + setProxyStats (pMsgFwds s) $! _pMsgFwds d + setProxyStats (pMsgFwdsOwn s) $! _pMsgFwdsOwn d + writeTVar (pMsgFwdsRecv s) $! _pMsgFwdsRecv d writeTVar (qCount s) $! _qCount d writeTVar (msgCount s) $! _msgCount d instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} = + strEncode d = B.unlines - [ "fromTime=" <> strEncode _fromTime, - "qCreated=" <> strEncode _qCreated, - "qSecured=" <> strEncode _qSecured, - "qDeletedAll=" <> strEncode _qDeletedAll, - "qDeletedNew=" <> strEncode _qDeletedNew, - "qDeletedSecured=" <> strEncode _qDeletedSecured, - "qCount=" <> strEncode _qCount, - "msgSent=" <> strEncode _msgSent, - "msgRecv=" <> strEncode _msgRecv, - "msgExpired=" <> strEncode _msgExpired, - "msgSentNtf=" <> strEncode _msgSentNtf, - "msgRecvNtf=" <> strEncode _msgRecvNtf, + [ "fromTime=" <> strEncode (_fromTime d), + "qCreated=" <> strEncode (_qCreated d), + "qSecured=" <> strEncode (_qSecured d), + "qDeletedAll=" <> strEncode (_qDeletedAll d), + "qDeletedNew=" <> strEncode (_qDeletedNew d), + "qDeletedSecured=" <> strEncode (_qDeletedSecured d), + "qCount=" <> strEncode (_qCount d), + "msgSent=" <> strEncode (_msgSent d), + "msgRecv=" <> strEncode (_msgRecv d), + "msgExpired=" <> strEncode (_msgExpired d), + "msgSentNtf=" <> strEncode (_msgSentNtf d), + "msgRecvNtf=" <> strEncode (_msgRecvNtf d), "activeQueues:", - strEncode _activeQueues, + strEncode (_activeQueues d), "activeQueuesNtf:", - strEncode _activeQueuesNtf + strEncode (_activeQueuesNtf d), + "pRelays:", + strEncode (_pRelays d), + "pRelaysOwn:", + strEncode (_pRelaysOwn d), + "pMsgFwds:", + strEncode (_pMsgFwds d), + "pMsgFwdsOwn:", + strEncode (_pMsgFwdsOwn d), + "pMsgFwdsRecv=" <> strEncode (_pMsgFwdsRecv d) ] strP = do _fromTime <- "fromTime=" *> strP <* A.endOfLine @@ -165,7 +199,17 @@ instance StrEncoding ServerStatsData where optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine _ -> pure newPeriodStatsData - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0} + _pRelays <- proxyStatsP "pRelays:" + _pRelaysOwn <- proxyStatsP "pRelaysOwn:" + _pMsgFwds <- proxyStatsP "pMsgFwds:" + _pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:" + _pMsgFwdsRecv <- "pMsgFwdsRecv=" *> strP <* A.endOfLine <|> pure 0 + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0} + where + proxyStatsP key = + optional (A.string key >> A.endOfLine) >>= \case + Just _ -> strP <* optional A.endOfLine + _ -> pure newProxyStatsData data PeriodStats a = PeriodStats { day :: TVar (Set a), @@ -240,6 +284,81 @@ updatePeriodStats stats pId = do where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) +data ProxyStats = ProxyStats + { pRequests :: TVar Int, + pSuccesses :: TVar Int, -- includes destination server error responses that will be forwarded to the client + pErrorsConnect :: TVar Int, + pErrorsCompat :: TVar Int, + pErrorsOther :: TVar Int + } + +newProxyStats :: STM ProxyStats +newProxyStats = do + pRequests <- newTVar 0 + pSuccesses <- newTVar 0 + pErrorsConnect <- newTVar 0 + pErrorsCompat <- newTVar 0 + pErrorsOther <- newTVar 0 + pure ProxyStats {pRequests, pSuccesses, pErrorsConnect, pErrorsCompat, pErrorsOther} + +data ProxyStatsData = ProxyStatsData + { _pRequests :: Int, + _pSuccesses :: Int, + _pErrorsConnect :: Int, + _pErrorsCompat :: Int, + _pErrorsOther :: Int + } + deriving (Show) + +newProxyStatsData :: ProxyStatsData +newProxyStatsData = ProxyStatsData {_pRequests = 0, _pSuccesses = 0, _pErrorsConnect = 0, _pErrorsCompat = 0, _pErrorsOther = 0} + +getProxyStatsData :: ProxyStats -> STM ProxyStatsData +getProxyStatsData s = do + _pRequests <- readTVar $ pRequests s + _pSuccesses <- readTVar $ pSuccesses s + _pErrorsConnect <- readTVar $ pErrorsConnect s + _pErrorsCompat <- readTVar $ pErrorsCompat s + _pErrorsOther <- readTVar $ pErrorsOther s + pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} + +getResetProxyStatsData :: ProxyStats -> STM ProxyStatsData +getResetProxyStatsData s = do + _pRequests <- swapTVar (pRequests s) 0 + _pSuccesses <- swapTVar (pSuccesses s) 0 + _pErrorsConnect <- swapTVar (pErrorsConnect s) 0 + _pErrorsCompat <- swapTVar (pErrorsCompat s) 0 + _pErrorsOther <- swapTVar (pErrorsOther s) 0 + pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} + +setProxyStats :: ProxyStats -> ProxyStatsData -> STM () +setProxyStats s d = do + writeTVar (pRequests s) $! _pRequests d + writeTVar (pSuccesses s) $! _pSuccesses d + writeTVar (pErrorsConnect s) $! _pErrorsConnect d + writeTVar (pErrorsCompat s) $! _pErrorsCompat d + writeTVar (pErrorsOther s) $! _pErrorsOther d + +instance StrEncoding ProxyStatsData where + strEncode ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} = + "requests=" + <> strEncode _pRequests + <> "\nsuccesses=" + <> strEncode _pSuccesses + <> "\nerrorsConnect=" + <> strEncode _pErrorsConnect + <> "\nerrorsCompat=" + <> strEncode _pErrorsCompat + <> "\nerrorsOther=" + <> strEncode _pErrorsOther + strP = do + _pRequests <- "requests=" *> strP <* A.endOfLine + _pSuccesses <- "successes=" *> strP <* A.endOfLine + _pErrorsConnect <- "errorsConnect=" *> strP <* A.endOfLine + _pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine + _pErrorsOther <- "errorsOther=" *> strP + pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} + -- counter -> occurences newtype Histogram = Histogram (IntMap Int) deriving (Show) diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index a880cfaad..b42d5b378 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -152,12 +152,14 @@ timeoutThrow :: MonadUnliftIO m => e -> Int -> ExceptT e m a -> ExceptT e m a timeoutThrow e ms action = ExceptT (sequence <$> (ms `timeout` runExceptT action)) >>= maybe (throwError e) pure threadDelay' :: Int64 -> IO () -threadDelay' time - | time <= 0 = pure () -threadDelay' time = do - let maxWait = min time $ fromIntegral (maxBound :: Int) - threadDelay $ fromIntegral maxWait - when (maxWait /= time) $ threadDelay' (time - maxWait) +threadDelay' = loop + where + loop time + | time <= 0 = pure () + | otherwise = do + let maxWait = min time $ fromIntegral (maxBound :: Int) + threadDelay $ fromIntegral maxWait + loop $ time - maxWait diffToMicroseconds :: NominalDiffTime -> Int64 diffToMicroseconds diff = fromIntegral ((truncate $ diff * 1000000) :: Integer) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 6ef2e00aa..d7bddd9ec 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -51,7 +51,7 @@ module AgentTests.FunctionalAPITests where import AgentTests.ConnectionRequestTests (connReqData, queueAddr, testE2ERatchetParams12) -import Control.Concurrent (killThread, threadDelay) +import Control.Concurrent (forkIO, killThread, threadDelay) import Control.Monad import Control.Monad.Except import Control.Monad.Reader @@ -78,7 +78,6 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore) import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ, SENT) import qualified Simplex.Messaging.Agent.Protocol as A -import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew)) import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction') import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), SMPProxyFallback (..), SMPProxyMode (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig) @@ -439,7 +438,8 @@ functionalAPITests t = do it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t describe "user network info" $ do it "should wait for user network" testWaitForUserNetwork - it "should not reset offline interval while offline" testDoNotResetOfflineInterval + it "should not reset online to offline if happens too quickly" testDoNotResetOnlineToOffline + it "should resume multiple threads" testResumeMultipleThreads testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do @@ -2710,9 +2710,7 @@ testWaitForUserNetwork = do noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNNone False networkDelay a 100000 - networkDelay a 150000 - networkDelay a 200000 - networkDelay a 200000 + networkDelay a 100000 setUserNetworkInfo a $ UserNetworkInfo UNCellular True noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNCellular False @@ -2722,36 +2720,66 @@ testWaitForUserNetwork = do (networkDelay a 50000) noNetworkDelay a where - aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}} + aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0} -testDoNotResetOfflineInterval :: IO () -testDoNotResetOfflineInterval = do +testDoNotResetOnlineToOffline :: IO () +testDoNotResetOnlineToOffline = do a <- getSMPAgentClient' 1 aCfg initAgentServers testDB noNetworkDelay a setUserNetworkInfo a $ UserNetworkInfo UNWifi False networkDelay a 100000 - networkDelay a 150000 - setUserNetworkInfo a $ UserNetworkInfo UNCellular False - networkDelay a 200000 - setUserNetworkInfo a $ UserNetworkInfo UNNone False - networkDelay a 200000 - setUserNetworkInfo a $ UserNetworkInfo UNCellular True + setUserNetworkInfo a $ UserNetworkInfo UNWifi False + setUserNetworkInfo a $ UserNetworkInfo UNWifi True noNetworkDelay a - setUserNetworkInfo a $ UserNetworkInfo UNCellular False + setUserNetworkInfo a $ UserNetworkInfo UNWifi False -- ingnored + noNetworkDelay a + threadDelay 100000 + setUserNetworkInfo a $ UserNetworkInfo UNWifi False networkDelay a 100000 + setUserNetworkInfo a $ UserNetworkInfo UNNone False + networkDelay a 100000 + setUserNetworkInfo a $ UserNetworkInfo UNWifi True + setUserNetworkInfo a $ UserNetworkInfo UNNone False -- ingnored + noNetworkDelay a where - aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}} + aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0.1} + +testResumeMultipleThreads :: IO () +testResumeMultipleThreads = do + a <- getSMPAgentClient' 1 aCfg initAgentServers testDB + noNetworkDelay a + setUserNetworkInfo a $ UserNetworkInfo UNNone False + vs <- + replicateM 50000 $ do + v <- newEmptyTMVarIO + void . forkIO $ waitNetwork a >>= atomically . putTMVar v + pure v + threadDelay 1000000 + setUserNetworkInfo a $ UserNetworkInfo UNCellular True + ts <- mapM (atomically . readTMVar) vs + -- print $ minimum ts + -- print $ maximum ts + -- print $ sum ts `div` fromIntegral (length ts) + let average = sum ts `div` fromIntegral (length ts) + average < 3000000 `shouldBe` True + maximum ts < 4000000 `shouldBe` True + where + aCfg = agentCfg {userOfflineDelay = 0} noNetworkDelay :: AgentClient -> IO () -noNetworkDelay a = (10000 >) <$> waitNetwork a `shouldReturn` True +noNetworkDelay a = do + d <- waitNetwork a + unless (d < 10000) $ expectationFailure $ "expected no delay, d = " <> show d networkDelay :: AgentClient -> Int64 -> IO () -networkDelay a d' = (\d -> d' < d && d < d' + 15000) <$> waitNetwork a `shouldReturn` True +networkDelay a d' = do + d <- waitNetwork a + unless (d' < d && d < d' + 15000) $ expectationFailure $ "expected delay " <> show d' <> ", d = " <> show d waitNetwork :: AgentClient -> IO Int64 waitNetwork a = do t <- getCurrentTime - waitForUserNetwork a `runReaderT` agentEnv a + waitForUserNetwork a t' <- getCurrentTime pure $ diffToMicroseconds $ diffUTCTime t' t diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 436dd0eca..63466b9d7 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -709,15 +709,15 @@ testGetNextRcvChunkToDownload st = do withTransaction st $ \db -> do Right Nothing <- getNextRcvChunkToDownload db xftpServer1 86400 - Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1" - Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True Left e <- getNextRcvChunkToDownload db xftpServer1 86400 show e `shouldContain` "ConversionFailed" DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] - Right (Just RcvFileChunk {rcvFileEntityId}) <- getNextRcvChunkToDownload db xftpServer1 86400 + Right (Just (RcvFileChunk {rcvFileEntityId}, _)) <- getNextRcvChunkToDownload db xftpServer1 86400 rcvFileEntityId `shouldBe` fId2 testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation @@ -726,10 +726,10 @@ testGetNextRcvFileToDecrypt st = do withTransaction st $ \db -> do Right Nothing <- getNextRcvFileToDecrypt db 86400 - Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 1" DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1" - Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 2" Left e <- getNextRcvFileToDecrypt db 86400 diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 467700784..1c458a062 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -120,7 +120,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do -- send via proxy to unsecured queue Right () <- proxySMPMessage pc sess Nothing sndId noMsgFlags msg -- receive 1 - (_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})) <- atomically $ readTBQueue msgQ + (_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})))]) <- atomically $ readTBQueue msgQ liftIO $ dec msgId encBody `shouldBe` Right msg ackSMPMessage rc rPriv rcvId msgId -- secure queue @@ -129,7 +129,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do -- send via proxy to secured queue Right () <- proxySMPMessage pc sess (Just sPriv) sndId noMsgFlags msg' -- receive 2 - (_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})) <- atomically $ readTBQueue msgQ + (_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})))]) <- atomically $ readTBQueue msgQ liftIO $ dec msgId' encBody' `shouldBe` Right msg' ackSMPMessage rc rPriv rcvId msgId' diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index b0ed67913..b0c90ca96 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 20 + logSize testServerStatsBackupFile `shouldReturn` 45 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 20 + logSize testServerStatsBackupFile `shouldReturn` 45 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 20 + logSize testServerStatsBackupFile `shouldReturn` 45 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5 diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index 0610bf48d..8f42ec16a 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -179,7 +179,7 @@ testXFTPAgentSendReceiveRedirect = withXFTPServer $ do withAgent 2 agentCfg initAgentServers testDB2 $ \rcp -> do FileDescriptionURI {description} <- either fail pure $ strDecode uri - rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing + rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing True rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 65536 totalSize) -- extra RFPROG before switching to real file rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 4194304 totalSize) rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 8388608 totalSize) @@ -223,7 +223,7 @@ testXFTPAgentSendReceiveNoRedirect = withXFTPServer $ do FileDescriptionURI {description} <- either fail pure $ strDecode uri let ValidFileDescription FileDescription {redirect} = description redirect `shouldBe` Nothing - rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing + rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing True -- NO extra "RFPROG 65k 65k" before switching to real file rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 4194304 totalSize) rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 5242880 totalSize) @@ -311,7 +311,7 @@ testReceive' rcp rfd originalFilePath = testReceiveCF' rcp rfd Nothing originalF testReceiveCF' :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> FilePath -> Int64 -> ExceptT AgentErrorType IO RcvFileId testReceiveCF' rcp rfd cfArgs originalFilePath size = do - rfId <- xftpReceiveFile rcp 1 rfd cfArgs + rfId <- xftpReceiveFile rcp 1 rfd cfArgs True rfProgress rcp size ("", rfId', RFDONE path) <- rfGet rcp liftIO $ do @@ -336,7 +336,7 @@ testXFTPAgentReceiveRestore = do -- receive file - should not succeed with server down rfId <- withAgent 2 agentCfg initAgentServers testDB2 $ \rcp -> runRight $ do xftpStartWorkers rcp (Just recipientFiles) - rfId <- xftpReceiveFile rcp 1 rfd Nothing + rfId <- xftpReceiveFile rcp 1 rfd Nothing True liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure rfId @@ -380,7 +380,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do -- receive file - should not succeed with server down rfId <- withAgent 2 agentCfg initAgentServers testDB2 $ \rcp -> runRight $ do xftpStartWorkers rcp (Just recipientFiles) - rfId <- xftpReceiveFile rcp 1 rfd Nothing + rfId <- xftpReceiveFile rcp 1 rfd Nothing True liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure rfId @@ -392,8 +392,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do -- receive file - should fail with AUTH error withAgent 3 agentCfg initAgentServers testDB2 $ \rcp' -> do runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) - ("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <- - rfGet rcp' + ("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <- rfGet rcp' rfId' `shouldBe` rfId -- tmp path should be removed after permanent error @@ -507,8 +506,8 @@ testXFTPAgentDelete = withGlobalLogging logCfgNoLogs $ -- receive file - should fail with AUTH error withAgent 3 agentCfg initAgentServers testDB2 $ \rcp2 -> runRight $ do xftpStartWorkers rcp2 (Just recipientFiles) - rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing - ("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <- + rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing True + ("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <- rfGet rcp2 liftIO $ rfId' `shouldBe` rfId @@ -545,8 +544,8 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do -- receive file - should fail with AUTH error withAgent 5 agentCfg initAgentServers testDB3 $ \rcp2 -> runRight $ do xftpStartWorkers rcp2 (Just recipientFiles) - rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing - ("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <- + rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing True + ("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <- rfGet rcp2 liftIO $ rfId' `shouldBe` rfId @@ -580,8 +579,8 @@ testXFTPAgentDeleteOnServer = withGlobalLogging logCfgNoLogs $ runRight_ . void $ do -- receive file 1 again - rfId1 <- xftpReceiveFile rcp 1 rfd1_2 Nothing - ("", rfId1', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <- + rfId1 <- xftpReceiveFile rcp 1 rfd1_2 Nothing True + ("", rfId1', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <- rfGet rcp liftIO $ rfId1 `shouldBe` rfId1' @@ -613,8 +612,8 @@ testXFTPAgentExpiredOnServer = withGlobalLogging logCfgNoLogs $ do -- receive file 1 again - should fail with AUTH error runRight $ do - rfId <- xftpReceiveFile rcp 1 rfd1_2 Nothing - ("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <- + rfId <- xftpReceiveFile rcp 1 rfd1_2 Nothing True + ("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <- rfGet rcp liftIO $ rfId' `shouldBe` rfId