Merge branch 'master' into ep/log-events

This commit is contained in:
Evgeny Poberezkin
2024-05-21 12:52:55 +01:00
25 changed files with 606 additions and 365 deletions
+1 -1
View File
@@ -1,5 +1,5 @@
name: simplexmq
version: 5.8.0.1
version: 5.8.0.2
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
+2 -1
View File
@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 5.8.0.1
version: 5.8.0.2
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
@@ -105,6 +105,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240124_file_redirect
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
+20 -15
View File
@@ -111,8 +111,8 @@ closeXFTPAgent a = do
where
stopWorkers workers = atomically (swapTVar workers M.empty) >>= mapM_ (liftIO . cancelWorker)
xftpReceiveFile' :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> AM RcvFileId
xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redirect}) cfArgs = do
xftpReceiveFile' :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> Bool -> AM RcvFileId
xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redirect}) cfArgs approvedRelays = do
g <- asks random
prefixPath <- lift $ getPrefixPath "rcv.xftp"
createDirectory prefixPath
@@ -123,7 +123,7 @@ xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redi
lift $ createEmptyFile =<< toFSFilePath relSavePath
let saveFile = CryptoFile relSavePath cfArgs
fId <- case redirect of
Nothing -> withStore c $ \db -> createRcvFile db g userId fd relPrefixPath relTmpPath saveFile
Nothing -> withStore c $ \db -> createRcvFile db g userId fd relPrefixPath relTmpPath saveFile approvedRelays
Just _ -> do
-- prepare description paths
let relTmpPathRedirect = relPrefixPath </> "xftp.redirect-encrypted"
@@ -133,7 +133,7 @@ xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks, redi
cfArgsRedirect <- atomically $ CF.randomArgs g
let saveFileRedirect = CryptoFile relSavePathRedirect $ Just cfArgsRedirect
-- create download tasks
withStore c $ \db -> createRcvFileRedirect db g userId fd relPrefixPath relTmpPathRedirect saveFileRedirect relTmpPath saveFile
withStore c $ \db -> createRcvFileRedirect db g userId fd relPrefixPath relTmpPathRedirect saveFileRedirect relTmpPath saveFile approvedRelays
forM_ chunks (downloadChunk c)
pure fId
@@ -175,12 +175,12 @@ runXFTPRcvWorker c srv Worker {doWork} = do
runXFTPOperation :: AgentConfig -> AM ()
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} =
withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case
RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
(RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas")
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
liftIO $ waitForUserNetwork c
downloadFileChunk fc replica
downloadFileChunk fc replica approvedRelays
`catchAgentError` \e -> retryOnError c "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
where
retryLoop loop e replicaDelay = do
@@ -190,9 +190,10 @@ runXFTPRcvWorker c srv Worker {doWork} = do
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
loop
retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> AM ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do
retryDone = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath)
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do
unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwError $ XFTP "" XFTP.NOT_APPROVED
fsFileTmpPath <- lift $ toFSFilePath fileTmpPath
chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
@@ -213,6 +214,10 @@ runXFTPRcvWorker c srv Worker {doWork} = do
when complete . lift . void $
getXFTPRcvWorker True c Nothing
where
ipAddressProtected' :: AM Bool
ipAddressProtected' = do
cfg <- liftIO $ getNetworkConfig' c
pure $ ipAddressProtected cfg srv
receivedSize :: [RcvFileChunk] -> Int64
receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0
receivedChunkSize ch@RcvFileChunk {chunkSize = s}
@@ -231,11 +236,11 @@ retryOnError c name loop done e = do
logWarn c $ name <> " error: " <> tshow e
if temporaryAgentError e then loop else done
rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> AM ()
rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> AgentErrorType -> AM ()
rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath err = do
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr
notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr
withStore' c $ \db -> updateRcvFileError db rcvFileId (show err)
notify c rcvFileEntityId $ RFERR err
runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM ()
runXFTPRcvLocalWorker c Worker {doWork} = do
@@ -249,7 +254,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
runXFTPOperation AgentConfig {rcvFilesTTL} =
withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
decryptFile f `catchAgentError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath
decryptFile :: RcvFile -> AM ()
decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do
let CryptoFile savePath cfArgs = saveFile
+8 -6
View File
@@ -14,6 +14,7 @@ module Simplex.FileTransfer.Client where
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first)
import Data.ByteString.Builder (Builder, byteString)
@@ -38,6 +39,7 @@ import Simplex.Messaging.Client
defaultNetworkConfig,
proxyUsername,
transportClientConfig,
unexpectedResponse,
)
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
@@ -56,7 +58,7 @@ import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost,
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.HTTP2.Client
import Simplex.Messaging.Transport.HTTP2.File
import Simplex.Messaging.Util (bshow, liftEitherWith, liftError', tshow, whenM)
import Simplex.Messaging.Util (liftEitherWith, liftError', tshow, whenM)
import Simplex.Messaging.Version
import UnliftIO
import UnliftIO.Directory
@@ -228,13 +230,13 @@ createXFTPChunk ::
createXFTPChunk c spKey file rcps auth_ =
sendXFTPCommand c spKey "" (FNEW file rcps auth_) Nothing >>= \case
(FRSndIds sId rIds, body) -> noFile body (sId, rIds)
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
addXFTPRecipients :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> NonEmpty C.APublicAuthKey -> ExceptT XFTPClientError IO (NonEmpty RecipientId)
addXFTPRecipients c spKey fId rcps =
sendXFTPCommand c spKey fId (FADD rcps) Nothing >>= \case
(FRRcvIds rIds, body) -> noFile body rIds
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
uploadXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> XFTPChunkSpec -> ExceptT XFTPClientError IO ()
uploadXFTPChunk c spKey fId chunkSpec =
@@ -262,7 +264,7 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {
receiveEncFile chunkPart cbState chunkSpec `catchError` \e ->
whenM (doesFileExist filePath) (removeFile filePath) >> throwError e
_ -> throwError $ PCEResponseError NO_FILE
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
xftpReqTimeout :: XFTPClientConfig -> Maybe Word32 -> Int
xftpReqTimeout cfg@XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout}} chunkSize_ =
@@ -286,12 +288,12 @@ pingXFTP c@XFTPClient {thParams} = do
(r, _) <- sendXFTPTransmission c t Nothing
case r of
FRPong -> pure ()
_ -> throwError $ PCEUnexpectedResponse $ bshow r
_ -> throwE $ unexpectedResponse r
okResponse :: (FileResponse, HTTP2Body) -> ExceptT XFTPClientError IO ()
okResponse = \case
(FROk, body) -> noFile body ()
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
-- TODO this currently does not check anything because response size is not set and bodyPart is always Just
noFile :: HTTP2Body -> a -> ExceptT XFTPClientError IO a
+4
View File
@@ -217,6 +217,8 @@ data XFTPErrorType
TIMEOUT
| -- | bad redirect data
REDIRECT {redirectError :: String}
| -- | cannot proceed with download from not approved relays without proxy
NOT_APPROVED
| -- | internal server error
INTERNAL
| -- | used internally, never returned by the server (to be removed)
@@ -249,6 +251,7 @@ instance Encoding XFTPErrorType where
FILE_IO -> "FILE_IO"
TIMEOUT -> "TIMEOUT"
REDIRECT err -> "REDIRECT " <> smpEncode err
NOT_APPROVED -> "NOT_APPROVED"
INTERNAL -> "INTERNAL"
DUPLICATE_ -> "DUPLICATE_"
@@ -268,6 +271,7 @@ instance Encoding XFTPErrorType where
"FILE_IO" -> pure FILE_IO
"TIMEOUT" -> pure TIMEOUT
"REDIRECT" -> REDIRECT <$> _smpP
"NOT_APPROVED" -> pure NOT_APPROVED
"INTERNAL" -> pure INTERNAL
"DUPLICATE_" -> pure DUPLICATE_
_ -> fail "bad error type"
+89 -77
View File
@@ -83,6 +83,7 @@ module Simplex.Messaging.Agent
setNtfServers,
setNetworkConfig,
getNetworkConfig,
getNetworkConfig',
setUserNetworkInfo,
reconnectAllServers,
registerNtfToken,
@@ -159,7 +160,7 @@ import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission, TransmissionType (..))
import Simplex.Messaging.Client (ProtocolClient (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
@@ -170,7 +171,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth)
import Simplex.Messaging.Protocol (BrokerMsg, Cmd (..), EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SParty (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import qualified Simplex.Messaging.TMap as TM
@@ -426,28 +427,20 @@ setNetworkConfig c@AgentClient {useNetworkConfig} cfg' = do
-- returns fast network config
getNetworkConfig :: AgentClient -> IO NetworkConfig
getNetworkConfig = fmap snd . readTVarIO . useNetworkConfig
getNetworkConfig = getNetworkConfig'
{-# INLINE getNetworkConfig #-}
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()
setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkDelay} netInfo = withAgentEnv' c $ do
ni <- asks $ userNetworkInterval . config
let d = initialInterval ni
off <- atomically $ do
wasOnline <- isOnline <$> swapTVar userNetworkInfo netInfo
let off = wasOnline && not (isOnline netInfo)
when off $ writeTVar userNetworkDelay d
pure off
liftIO . when off . void . forkIO $
growOfflineDelay 0 d ni
setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkUpdated} ni = withAgentEnv' c $ do
ts' <- liftIO getCurrentTime
i <- asks $ userOfflineDelay . config
-- if network offline event happens in less than `userOfflineDelay` after the previous event, it is ignored
atomically . whenM ((isOnline ni ||) <$> notRecentlyChanged ts' i) $ do
writeTVar userNetworkInfo ni
writeTVar userNetworkUpdated $ Just ts'
where
growOfflineDelay elapsed d ni = do
online <- waitOnlineOrDelay c d
unless online $ do
let elapsed' = elapsed + d
d' = nextRetryDelay elapsed' d ni
atomically $ writeTVar userNetworkDelay d'
growOfflineDelay elapsed' d' ni
notRecentlyChanged ts' i =
maybe True (\ts -> diffUTCTime ts' ts > i) <$> readTVar userNetworkUpdated
reconnectAllServers :: AgentClient -> IO ()
reconnectAllServers c = do
@@ -491,8 +484,8 @@ xftpStartWorkers c = withAgentEnv c . startXFTPWorkers c
{-# INLINE xftpStartWorkers #-}
-- | Receive XFTP file
xftpReceiveFile :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> AE RcvFileId
xftpReceiveFile c = withAgentEnv c .:. xftpReceiveFile' c
xftpReceiveFile :: AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> Bool -> AE RcvFileId
xftpReceiveFile c = withAgentEnv c .:: xftpReceiveFile' c
{-# INLINE xftpReceiveFile #-}
-- | Delete XFTP rcv file (deletes work files from file system and db records)
@@ -2006,14 +1999,10 @@ getSMPServer c userId = withUserServers c userId pickServer
{-# INLINE getSMPServer #-}
subscriber :: AgentClient -> AM' ()
subscriber c@AgentClient {subQ, msgQ} = forever $ do
subscriber c@AgentClient {msgQ} = forever $ do
t <- atomically $ readTBQueue msgQ
agentOperationBracket c AORcvNetwork waitUntilActive $
tryAgentError' (processSMPTransmission c t) >>= \case
Left e -> do
logError c $ tshow e
atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e)
Right _ -> return ()
processSMPTransmissions c t
cleanupManager :: AgentClient -> AM' ()
cleanupManager c@AgentClient {subQ} = do
@@ -2087,28 +2076,72 @@ cleanupManager c@AgentClient {subQ} = do
data ACKd = ACKd | ACKPending
-- | make sure to ACK or throw in each message processing branch
-- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL
processSMPTransmission :: AgentClient -> ServerTransmission SMPVersion ErrorType BrokerMsg -> AM ()
processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, sessId, tType, rId, cmd) = do
(rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId)
processSMP rq conn $ toConnData conn
-- | Make sure to ACK or throw in each message processing branch
-- It cannot be finally, as sometimes it needs to be ACK+DEL,
-- and sometimes ACK has to be sent from the consumer.
processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' ()
processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) = do
upConnIds <- newTVarIO []
forM_ ts $ \(entId, t) -> case t of
STEvent msgOrErr ->
withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of
Right msg -> processSMP rq conn (toConnData conn) msg
Left e -> lift $ notifyErr connId e
STResponse (Cmd SRecipient cmd) respOrErr ->
withRcvConn entId $ \rq conn -> case cmd of
-- TODO process expired responses to ACK and DEL
SMP.SUB -> case respOrErr of
Right SMP.OK -> processSubOk rq upConnIds
Right msg@SMP.MSG {} -> do
processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails
processSMP rq conn (toConnData conn) msg
Right r -> processSubErr rq $ unexpectedResponse r
Left e -> unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported
_ -> pure ()
STResponse {} -> pure () -- TODO process expired responses to sent messages
STUnexpectedError e -> do
logServer "<--" c srv entId $ "error: " <> bshow e
notifyErr "" e
connIds <- readTVarIO upConnIds
unless (null connIds) $ notify' "" $ UP srv connIds
where
processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> AM ()
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
withRcvConn rId a = do
tryAgentError' (withStore c $ \db -> getRcvConn db srv rId) >>= \case
Left e -> notify' "" (ERR e)
Right (rq@RcvQueue {connId}, SomeConn _ conn) ->
tryAgentError' (a rq conn) >>= \case
Left e -> notify' connId (ERR e)
Right () -> pure ()
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
processSubOk rq@RcvQueue {connId} upConnIds =
atomically . whenM (isPendingSub connId) $ do
addSubscription c rq
modifyTVar' upConnIds (connId :)
processSubErr :: RcvQueue -> SMPClientError -> AM ()
processSubErr rq@RcvQueue {connId} e = do
atomically . whenM (isPendingSub connId) $ failSubscription c rq e
lift $ notifyErr connId e
isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId
notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> ACommand 'Agent e -> m ()
notify' connId msg = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) msg)
notifyErr :: ConnId -> SMPClientError -> AM' ()
notifyErr connId = notify' connId . ERR . protocolClientError SMP (B.unpack $ strEncode srv)
processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> BrokerMsg -> AM ()
processSMP
rq@RcvQueue {e2ePrivKey, e2eDhSecret, status}
rq@RcvQueue {rcvId = rId, e2ePrivKey, e2eDhSecret, status}
conn
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} =
withConnLock c connId "processSMP" $ case cmd of
Right r@(SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId}) ->
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss}
smpMsg =
withConnLock c connId "processSMP" $ case smpMsg of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} ->
void . handleNotifyAck $ do
isGET <- atomically $ hasGetLock c rq
unless isGET $ checkExpiredResponse r
msg' <- decryptSMPMessage rq msg
ack' <- handleNotifyAck $ case msg' of
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody
SMP.ClientRcvMsgQuota {} -> queueDrained >> ack
when isGET $ notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg')
whenM (atomically $ hasGetLock c rq) $
notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg')
pure ack'
where
queueDrained = case conn of
@@ -2267,29 +2300,23 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd
handleNotifyAck :: AM ACKd -> AM ACKd
handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack
Right SMP.END ->
atomically (TM.lookup tSess smpClients $>>= (tryReadTMVar . sessionVar) >>= processEND)
>>= logServer "<--" c srv rId
SMP.END ->
atomically (TM.lookup tSess (smpClients c) $>>= (tryReadTMVar . sessionVar) >>= processEND)
>>= notifyEnd
where
processEND = \case
Just (Right clnt)
| sessId == sessionId (thParams $ connectedClient clnt) -> do
removeSubscription c connId
notify' END
pure "END"
| otherwise -> ignored
_ -> ignored
ignored = pure "END from disconnected client - ignored"
Right (SMP.ERR e) -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
Right r@SMP.OK -> checkExpiredResponse r
Right r -> unexpected r
Left e -> notify $ ERR $ protocolClientError SMP (B.unpack $ strEncode srv) e
| sessId == sessionId (thParams $ connectedClient clnt) ->
removeSubscription c connId $> True
_ -> pure False
notifyEnd removed
| removed = notify END >> logServer "<--" c srv rId "END"
| otherwise = logServer "<--" c srv rId "END from disconnected client - ignored"
SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
r -> unexpected r
where
notify :: forall e m. MonadIO m => AEntityI e => ACommand 'Agent e -> m ()
notify = atomically . notify'
notify' :: forall e. AEntityI e => ACommand 'Agent e -> STM ()
notify' msg = writeTBQueue subQ ("", connId, APC (sAEntity @e) msg)
notify :: forall e m. (AEntityI e, MonadIO m) => ACommand 'Agent e -> m ()
notify = notify' connId
prohibited :: AM ()
prohibited = notify . ERR $ AGENT A_PROHIBITED
@@ -2299,25 +2326,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
unexpected :: BrokerMsg -> AM ()
unexpected r = do
logServer "<--" c srv rId $ "unexpected: " <> bshow cmd
logServer "<--" c srv rId $ "unexpected: " <> bshow r
-- TODO add extended information about transmission type once UNEXPECTED has string
notify . ERR $ BROKER (B.unpack $ strEncode srv) $ UNEXPECTED (take 32 $ show r)
checkExpiredResponse :: BrokerMsg -> AM ()
checkExpiredResponse r = case tType of
TTEvent -> pure ()
TTUncorrelatedResponse -> unexpected r
TTExpiredResponse (SMP.Cmd _ cmd') -> case cmd' of
SMP.SUB -> do
added <-
atomically $
ifM
((&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId)
(True <$ addSubscription c rq)
(pure False)
when added $ notify $ UP srv [connId]
_ -> pure ()
decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> AM (SMP.PrivHeader, AgentMsgEnvelope)
decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do
clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody
+23 -27
View File
@@ -28,6 +28,7 @@ module Simplex.Messaging.Agent.Client
withConnLocks,
withInvLock,
withLockMap,
ipAddressProtected,
closeAgentClient,
closeProtocolServerClients,
reconnectServerClients,
@@ -41,6 +42,7 @@ module Simplex.Messaging.Agent.Client
getQueueMessage,
decryptSMPMessage,
addSubscription,
failSubscription,
addNewQueueSubscription,
getSubscriptions,
sendConfirmation,
@@ -108,8 +110,8 @@ module Simplex.Messaging.Agent.Client
waitUntilActive,
UserNetworkInfo (..),
UserNetworkType (..),
getNetworkConfig',
waitForUserNetwork,
waitOnlineOrDelay,
isNetworkOnline,
isOnline,
throwWhenInactive,
@@ -172,7 +174,6 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (deleteFirstsBy, foldl', partition, (\\))
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
@@ -279,7 +280,7 @@ data AgentClient = AgentClient
active :: TVar Bool,
rcvQ :: TBQueue (ATransmission 'Client),
subQ :: TBQueue (ATransmission 'Agent),
msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg),
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
smpServers :: TMap UserId (NonEmpty SMPServerWithAuth),
smpClients :: TMap SMPTransportSession SMPClientVar,
-- smpProxiedRelays:
@@ -292,7 +293,7 @@ data AgentClient = AgentClient
xftpClients :: TMap XFTPTransportSession XFTPClientVar,
useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks
userNetworkInfo :: TVar UserNetworkInfo,
userNetworkDelay :: TVar Int64,
userNetworkUpdated :: TVar (Maybe UTCTime),
subscrConns :: TVar (Set ConnId),
activeSubs :: TRcvQueues,
pendingSubs :: TRcvQueues,
@@ -466,7 +467,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
xftpClients <- TM.empty
useNetworkConfig <- newTVar (slowNetworkConfig netCfg, netCfg)
userNetworkInfo <- newTVar $ UserNetworkInfo UNOther True
userNetworkDelay <- newTVar $ initialInterval $ userNetworkInterval cfg
userNetworkUpdated <- newTVar Nothing
subscrConns <- newTVar S.empty
activeSubs <- RQ.empty
pendingSubs <- RQ.empty
@@ -505,7 +506,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
xftpClients,
useNetworkConfig,
userNetworkInfo,
userNetworkDelay,
userNetworkUpdated,
subscrConns,
activeSubs,
pendingSubs,
@@ -867,25 +868,16 @@ getNetworkConfig c = do
UNNone -> slowCfg
_ -> fastCfg
-- returns fast network config
getNetworkConfig' :: AgentClient -> IO NetworkConfig
getNetworkConfig' = fmap snd . readTVarIO . useNetworkConfig
{-# INLINE getNetworkConfig' #-}
waitForUserNetwork :: AgentClient -> IO ()
waitForUserNetwork c =
unlessM (atomically $ isNetworkOnline c) $
readTVarIO (userNetworkDelay c) >>= void . waitOnlineOrDelay c
waitOnlineOrDelay :: AgentClient -> Int64 -> IO Bool
waitOnlineOrDelay c t = do
let maxWait = min t $ fromIntegral (maxBound :: Int)
t' = t - maxWait
delay <- registerDelay $ fromIntegral maxWait
online <-
atomically $ do
expired <- readTVar delay
online <- isNetworkOnline c
unless (expired || online) retry
pure online
if online || t' <= 0
then pure online
else waitOnlineOrDelay c t'
unlessM (atomically $ isNetworkOnline c) $ do
delay <- registerDelay $ userNetworkInterval $ config $ agentEnv c
atomically $ unlessM (isNetworkOnline c) $ unlessM (readTVar delay) retry
closeAgentClient :: AgentClient -> IO ()
closeAgentClient c = do
@@ -1107,7 +1099,7 @@ protocolClientError :: (Show err, Encoding err) => (HostName -> err -> AgentErro
protocolClientError protocolError_ host = \case
PCEProtocolError e -> protocolError_ host e
PCEResponseError e -> BROKER host $ RESPONSE $ B.unpack $ smpEncode e
PCEUnexpectedResponse r -> BROKER host $ UNEXPECTED $ take 32 $ show r
PCEUnexpectedResponse e -> BROKER host $ UNEXPECTED $ B.unpack e
PCEResponseTimeout -> BROKER host TIMEOUT
PCENetworkError -> BROKER host NETWORK
PCEIncompatibleHost -> BROKER host HOST
@@ -1297,9 +1289,8 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do
processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM ()
processSubResult c rq@RcvQueue {connId} = \case
Left e ->
unless (temporaryClientError e) $ do
RQ.deleteQueue rq (pendingSubs c)
TM.insert (RQ.qKey rq) e (removedSubs c)
unless (temporaryClientError e) $
failSubscription c rq e
Right () ->
whenM (hasPendingSubscription c connId) $
addSubscription c rq
@@ -1419,6 +1410,11 @@ addSubscription c rq@RcvQueue {connId} = do
RQ.addQueue rq $ activeSubs c
RQ.deleteQueue rq $ pendingSubs c
failSubscription :: AgentClient -> RcvQueue -> SMPClientError -> STM ()
failSubscription c rq e = do
RQ.deleteQueue rq (pendingSubs c)
TM.insert (RQ.qKey rq) e (removedSubs c)
addPendingSubscription :: AgentClient -> RcvQueue -> STM ()
addPendingSubscription c rq@RcvQueue {connId} = do
modifyTVar' (subscrConns c) $ S.insert connId
+5 -11
View File
@@ -92,7 +92,8 @@ data AgentConfig = AgentConfig
xftpCfg :: XFTPClientConfig,
reconnectInterval :: RetryInterval,
messageRetryInterval :: RetryInterval2,
userNetworkInterval :: RetryInterval,
userNetworkInterval :: Int,
userOfflineDelay :: NominalDiffTime,
messageTimeout :: NominalDiffTime,
connDeleteDeliveryTimeout :: NominalDiffTime,
helloTimeout :: NominalDiffTime,
@@ -147,14 +148,6 @@ defaultMessageRetryInterval =
}
}
defaultUserNetworkInterval :: RetryInterval
defaultUserNetworkInterval =
RetryInterval
{ initialInterval = 1200_000000, -- 20 minutes
increaseAfter = 0,
maxInterval = 7200_000000 -- 2 hours
}
defaultAgentConfig :: AgentConfig
defaultAgentConfig =
AgentConfig
@@ -170,7 +163,8 @@ defaultAgentConfig =
xftpCfg = defaultXFTPClientConfig,
reconnectInterval = defaultReconnectInterval,
messageRetryInterval = defaultMessageRetryInterval,
userNetworkInterval = defaultUserNetworkInterval,
userNetworkInterval = 1800_000000, -- 30 minutes, should be less than Int32 max value
userOfflineDelay = 2, -- if network offline event happens in less than 2 seconds after it was set online, it is ignored
messageTimeout = 2 * nominalDay,
connDeleteDeliveryTimeout = 2 * nominalDay,
helloTimeout = 2 * nominalDay,
@@ -179,7 +173,7 @@ defaultAgentConfig =
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
cleanupStepInterval = 200000, -- 200ms
maxWorkerRestartsPerMin = 5,
-- 3 consecutive subscription timeouts will result in alert to the user
-- 5 consecutive subscription timeouts will result in alert to the user
-- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking.
maxSubscriptionTimeouts = 5,
storedMsgDataTTL = 21 * nominalDay,
+32 -29
View File
@@ -2275,20 +2275,20 @@ getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do
firstRow fromOnly SEXFTPServerNotFound $
DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash)
createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> IO (Either StoreError RcvFileId)
createRcvFile db gVar userId fd@FileDescription {chunks} prefixPath tmpPath file = runExceptT $ do
(rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile db gVar userId fd prefixPath tmpPath file Nothing Nothing
createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> Bool -> IO (Either StoreError RcvFileId)
createRcvFile db gVar userId fd@FileDescription {chunks} prefixPath tmpPath file approvedRelays = runExceptT $ do
(rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile db gVar userId fd prefixPath tmpPath file Nothing Nothing approvedRelays
liftIO $
forM_ chunks $ \fc@FileChunk {replicas} -> do
chunkId <- insertRcvFileChunk db fc rcvFileId
forM_ (zip [1 ..] replicas) $ \(rno, replica) -> insertRcvFileChunkReplica db rno replica chunkId
pure rcvFileEntityId
createRcvFileRedirect :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription FRecipient -> FilePath -> FilePath -> CryptoFile -> FilePath -> CryptoFile -> IO (Either StoreError RcvFileId)
createRcvFileRedirect _ _ _ FileDescription {redirect = Nothing} _ _ _ _ _ = pure $ Left $ SEInternal "createRcvFileRedirect called without redirect"
createRcvFileRedirect db gVar userId redirectFd@FileDescription {chunks = redirectChunks, redirect = Just RedirectFileInfo {size, digest}} prefixPath redirectPath redirectFile dstPath dstFile = runExceptT $ do
(dstEntityId, dstId) <- ExceptT $ insertRcvFile db gVar userId dummyDst prefixPath dstPath dstFile Nothing Nothing
(_, redirectId) <- ExceptT $ insertRcvFile db gVar userId redirectFd prefixPath redirectPath redirectFile (Just dstId) (Just dstEntityId)
createRcvFileRedirect :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription FRecipient -> FilePath -> FilePath -> CryptoFile -> FilePath -> CryptoFile -> Bool -> IO (Either StoreError RcvFileId)
createRcvFileRedirect _ _ _ FileDescription {redirect = Nothing} _ _ _ _ _ _ = pure $ Left $ SEInternal "createRcvFileRedirect called without redirect"
createRcvFileRedirect db gVar userId redirectFd@FileDescription {chunks = redirectChunks, redirect = Just RedirectFileInfo {size, digest}} prefixPath redirectPath redirectFile dstPath dstFile approvedRelays = runExceptT $ do
(dstEntityId, dstId) <- ExceptT $ insertRcvFile db gVar userId dummyDst prefixPath dstPath dstFile Nothing Nothing approvedRelays
(_, redirectId) <- ExceptT $ insertRcvFile db gVar userId redirectFd prefixPath redirectPath redirectFile (Just dstId) (Just dstEntityId) approvedRelays
liftIO $
forM_ redirectChunks $ \fc@FileChunk {replicas} -> do
chunkId <- insertRcvFileChunk db fc redirectId
@@ -2308,8 +2308,8 @@ createRcvFileRedirect db gVar userId redirectFd@FileDescription {chunks = redire
chunks = []
}
insertRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> Maybe DBRcvFileId -> Maybe RcvFileId -> IO (Either StoreError (RcvFileId, DBRcvFileId))
insertRcvFile db gVar userId FileDescription {size, digest, key, nonce, chunkSize, redirect} prefixPath tmpPath (CryptoFile savePath cfArgs) redirectId_ redirectEntityId_ = runExceptT $ do
insertRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> Maybe DBRcvFileId -> Maybe RcvFileId -> Bool -> IO (Either StoreError (RcvFileId, DBRcvFileId))
insertRcvFile db gVar userId FileDescription {size, digest, key, nonce, chunkSize, redirect} prefixPath tmpPath (CryptoFile savePath cfArgs) redirectId_ redirectEntityId_ approvedRelays = runExceptT $ do
let (redirectDigest_, redirectSize_) = case redirect of
Just RedirectFileInfo {digest = d, size = s} -> (Just d, Just s)
Nothing -> (Nothing, Nothing)
@@ -2317,8 +2317,8 @@ insertRcvFile db gVar userId FileDescription {size, digest, key, nonce, chunkSiz
createWithRandomId gVar $ \rcvFileEntityId ->
DB.execute
db
"INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, save_file_key, save_file_nonce, status, redirect_id, redirect_entity_id, redirect_digest, redirect_size) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
((rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, prefixPath, tmpPath) :. (savePath, fileKey <$> cfArgs, fileNonce <$> cfArgs, RFSReceiving, redirectId_, redirectEntityId_, redirectDigest_, redirectSize_))
"INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, save_file_key, save_file_nonce, status, redirect_id, redirect_entity_id, redirect_digest, redirect_size, approved_relays) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
((rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, prefixPath, tmpPath) :. (savePath, fileKey <$> cfArgs, fileNonce <$> cfArgs, RFSReceiving, redirectId_, redirectEntityId_, redirectDigest_, redirectSize_, approvedRelays))
rcvFileId <- liftIO $ insertedRowId db
pure (rcvFileEntityId, rcvFileId)
@@ -2468,7 +2468,7 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO ()
deleteRcvFile' db rcvFileId =
DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId)
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFileChunk))
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool)))
getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do
getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd)
where
@@ -2492,7 +2492,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
LIMIT 1
|]
(host, port, keyHash, RFSReceiving, cutoffTs)
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError RcvFileChunk)
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool))
getChunkData (rcvFileChunkReplicaId, _fileId) =
firstRow toChunk SEFileNotFound $
DB.query
@@ -2500,7 +2500,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
[sql|
SELECT
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries,
f.approved_relays
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
@@ -2509,20 +2510,22 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|]
(Only rcvFileChunkReplicaId)
where
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int)) -> RcvFileChunk
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) =
RcvFileChunk
{ rcvFileId,
rcvFileEntityId,
userId,
rcvChunkId,
chunkNo,
chunkSize,
digest,
fileTmpPath,
chunkTmpPath,
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
}
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. Only Bool) -> (RcvFileChunk, Bool)
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (Only approvedRelays)) =
( RcvFileChunk
{ rcvFileId,
rcvFileEntityId,
userId,
rcvChunkId,
chunkNo,
chunkSize,
digest,
fileTmpPath,
chunkTmpPath,
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
},
approvedRelays
)
getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile))
getNextRcvFileToDecrypt db ttl =
@@ -71,6 +71,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_deliver
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240124_file_redirect
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -110,7 +111,8 @@ schemaMigrations =
("m20240121_message_delivery_indexes", m20240121_message_delivery_indexes, Just down_m20240121_message_delivery_indexes),
("m20240124_file_redirect", m20240124_file_redirect, Just down_m20240124_file_redirect),
("m20240223_connections_wait_delivery", m20240223_connections_wait_delivery, Just down_m20240223_connections_wait_delivery),
("m20240225_ratchet_kem", m20240225_ratchet_kem, Just down_m20240225_ratchet_kem)
("m20240225_ratchet_kem", m20240225_ratchet_kem, Just down_m20240225_ratchet_kem),
("m20240417_rcv_files_approved_relays", m20240417_rcv_files_approved_relays, Just down_m20240417_rcv_files_approved_relays)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,18 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20240417_rcv_files_approved_relays :: Query
m20240417_rcv_files_approved_relays =
[sql|
ALTER TABLE rcv_files ADD COLUMN approved_relays INTEGER NOT NULL DEFAULT 0;
|]
down_m20240417_rcv_files_approved_relays :: Query
down_m20240417_rcv_files_approved_relays =
[sql|
ALTER TABLE rcv_files DROP COLUMN approved_relays;
|]
@@ -287,6 +287,7 @@ CREATE TABLE rcv_files(
redirect_entity_id BLOB,
redirect_size INTEGER,
redirect_digest BLOB,
approved_relays INTEGER NOT NULL DEFAULT 0,
UNIQUE(rcv_file_entity_id)
);
CREATE TABLE rcv_file_chunks(
+59 -44
View File
@@ -65,6 +65,7 @@ module Simplex.Messaging.Client
ProtocolClientError (..),
SMPClientError,
ProxyClientError (..),
unexpectedResponse,
ProtocolClientConfig (..),
NetworkConfig (..),
TransportSessionMode (..),
@@ -80,8 +81,8 @@ module Simplex.Messaging.Client
proxyUsername,
temporaryClientError,
smpProxyError,
ServerTransmission,
TransmissionType (..),
ServerTransmissionBatch,
ServerTransmission (..),
ClientCommand,
-- * For testing
@@ -111,7 +112,7 @@ import Data.Int (Int64)
import Data.List (find)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import Data.Maybe (fromMaybe)
import Data.Maybe (catMaybes, fromMaybe)
import Data.Time.Clock (UTCTime (..), diffUTCTime, getCurrentTime)
import qualified Data.X509 as X
import qualified Data.X509.Validation as XV
@@ -155,7 +156,7 @@ data PClient v err msg = PClient
sentCommands :: TMap CorrId (Request err msg),
sndQ :: TBQueue ByteString,
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
msgQ :: Maybe (TBQueue (ServerTransmission v err msg))
msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg))
}
smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> STM SMPClient
@@ -206,10 +207,14 @@ type SMPClient = ProtocolClient SMPVersion ErrorType BrokerMsg
-- | Type for client command data
type ClientCommand msg = (Maybe C.APrivateAuthKey, EntityId, ProtoCommand msg)
-- | Type synonym for transmission from some SPM server queue.
type ServerTransmission v err msg = (TransportSession msg, Version v, SessionId, TransmissionType msg, EntityId, Either (ProtocolClientError err) msg)
-- | Type synonym for transmission from SPM servers.
-- Batch response is presented as a single `ServerTransmissionBatch` tuple.
type ServerTransmissionBatch v err msg = (TransportSession msg, Version v, SessionId, NonEmpty (EntityId, ServerTransmission err msg))
data TransmissionType msg = TTEvent | TTUncorrelatedResponse | TTExpiredResponse (ProtoCommand msg)
data ServerTransmission err msg
= STEvent (Either (ProtocolClientError err) msg)
| STResponse (ProtoCommand msg) (Either (ProtocolClientError err) msg)
| STUnexpectedError (ProtocolClientError err)
data HostMode
= -- | prefer (or require) onion hosts when connecting via SOCKS proxy
@@ -396,7 +401,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId)
--
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmission v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret} msgQ disconnected = do
case chooseTransportHost networkConfig (host srv) of
Right useHost ->
@@ -498,38 +503,48 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
maxCnt = smpPingCount networkConfig
process :: ProtocolClient v err msg -> IO ()
process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= mapM_ (processMsg c)
process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= processMsgs c
processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO ()
processMsg c@ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr))
| not $ B.null $ bs corrId =
processMsgs :: ProtocolClient v err msg -> NonEmpty (SignedTransmission err msg) -> IO ()
processMsgs c ts = do
ts' <- catMaybes <$> mapM (processMsg c) (L.toList ts)
forM_ msgQ $ \q ->
mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts')
processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg))
processMsg ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr))
| B.null $ bs corrId = sendMsg $ STEvent clientResp
| otherwise =
atomically (TM.lookup corrId sentCommands) >>= \case
Nothing -> sendMsg TTUncorrelatedResponse
Nothing -> sendMsg $ STUnexpectedError unexpected
Just Request {entityId, command, pending, responseVar} -> do
wasPending <-
atomically $ do
TM.delete corrId sentCommands
ifM
(swapTVar pending False)
(True <$ tryPutTMVar responseVar (response entityId))
(True <$ tryPutTMVar responseVar (if entityId == entId then clientResp else Left unexpected))
(pure False)
unless wasPending $ sendMsg $ if entityId == entId then TTExpiredResponse command else TTUncorrelatedResponse
| otherwise = sendMsg TTEvent
if wasPending
then pure Nothing
else sendMsg $ if entityId == entId then STResponse command clientResp else STUnexpectedError unexpected
where
response entityId
| entityId == entId = clientResp
| otherwise = Left . PCEUnexpectedResponse $ bshow respOrErr
unexpected = unexpectedResponse respOrErr
clientResp = case respOrErr of
Left e -> Left $ PCEResponseError e
Right r -> case protocolError r of
Just e -> Left $ PCEProtocolError e
_ -> Right r
sendMsg :: TransmissionType msg -> IO ()
sendMsg tType = case msgQ of
Just q -> atomically $ writeTBQueue q $ serverTransmission c tType entId clientResp
Nothing -> case clientResp of
Left e -> logError $ "SMP client error: " <> tshow e
Right _ -> logWarn $ "SMP client unprocessed event"
sendMsg :: ServerTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg))
sendMsg t = case msgQ of
Just _ -> pure $ Just (entId, t)
Nothing ->
Nothing <$ case clientResp of
Left e -> logError $ "SMP client error: " <> tshow e
Right _ -> logWarn "SMP client unprocessed event"
unexpectedResponse :: Show r => r -> ProtocolClientError err
unexpectedResponse = PCEUnexpectedResponse . B.pack . take 32 . show
proxyUsername :: TransportSession msg -> ByteString
proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_
@@ -585,7 +600,7 @@ smpProxyError :: SMPClientError -> ErrorType
smpProxyError = \case
PCEProtocolError e -> PROXY $ PROTOCOL e
PCEResponseError e -> PROXY $ BROKER $ RESPONSE $ B.unpack $ strEncode e
PCEUnexpectedResponse s -> PROXY $ BROKER $ UNEXPECTED $ B.unpack $ B.take 32 s
PCEUnexpectedResponse e -> PROXY $ BROKER $ UNEXPECTED $ B.unpack e
PCEResponseTimeout -> PROXY $ BROKER TIMEOUT
PCENetworkError -> PROXY $ BROKER NETWORK
PCEIncompatibleHost -> PROXY $ BROKER HOST
@@ -606,7 +621,7 @@ createSMPQueue ::
createSMPQueue c (rKey, rpKey) dhKey auth subMode =
sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey auth subMode) >>= \case
IDS qik -> pure qik
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Subscribe to the SMP queue.
--
@@ -617,7 +632,7 @@ subscribeSMPQueue c@ProtocolClient {client_ = PClient {sendPings}} rpKey rId = d
sendSMPCommand c (Just rpKey) rId SUB >>= \case
OK -> pure ()
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Subscribe to multiple SMP queues batching commands if supported.
subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ()))
@@ -637,15 +652,15 @@ processSUBResponse :: SMPClient -> Response ErrorType BrokerMsg -> IO (Either SM
processSUBResponse c (Response rId r) = case r of
Right OK -> pure $ Right ()
Right cmd@MSG {} -> writeSMPMessage c rId cmd $> Right ()
Right r' -> pure . Left . PCEUnexpectedResponse $ bshow r'
Right r' -> pure . Left $ unexpectedResponse r'
Left e -> pure $ Left e
writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO ()
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c TTEvent rId (Right msg)) (msgQ $ client_ c)
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c)
serverTransmission :: ProtocolClient v err msg -> TransmissionType msg -> RecipientId -> Either (ProtocolClientError err) msg -> ServerTransmission v err msg
serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} tType entityId msgOrErr =
(transportSession, thVersion, sessionId, tType, entityId, msgOrErr)
serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg
serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} ts =
(transportSession, thVersion, sessionId, ts)
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue
--
@@ -655,7 +670,7 @@ getSMPMessage c rpKey rId =
sendSMPCommand c (Just rpKey) rId GET >>= \case
OK -> pure Nothing
cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Subscribe to the SMP queue notifications.
--
@@ -683,7 +698,7 @@ enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId ->
enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case
NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey)
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Enable notifications for the multiple queues for push notifications server.
enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey)))
@@ -692,7 +707,7 @@ enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs
cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs
process (Response _ r) = case r of
Right (NID nId rcvNtfSrvPublicDhKey) -> Right (nId, rcvNtfSrvPublicDhKey)
Right r' -> Left . PCEUnexpectedResponse $ bshow r'
Right r' -> Left $ unexpectedResponse r'
Left e -> Left e
-- | Disable notifications for the queue for push notifications server.
@@ -714,7 +729,7 @@ sendSMPMessage :: SMPClient -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -
sendSMPMessage c spKey sId flags msg =
sendSMPCommand c spKey sId (SEND flags msg) >>= \case
OK -> pure ()
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Acknowledge message delivery (server deletes the message).
--
@@ -724,7 +739,7 @@ ackSMPMessage c rpKey rId msgId =
sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case
OK -> return ()
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Irreversibly suspend SMP queue.
-- The existing messages from the queue will still be delivered.
@@ -756,7 +771,7 @@ connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, t
case supportedClientSMPRelayVRange `compatibleVersion` vr of
Nothing -> throwE $ transportErr TEVersion
Just (Compatible v) -> liftEitherWith (const $ transportErr $ TEHandshake IDENTITY) $ ProxiedRelay sId v <$> validateRelay chain key
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
| otherwise = throwE $ PCETransportError TEVersion
where
tOut = Just $ tcpConnectTimeout + tcpTimeout
@@ -862,14 +877,14 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c
(_auth, _signed, (_c, _e, cmd)) -> case cmd of
Right OK -> pure $ Right ()
Right (ERR e) -> throwE $ PCEProtocolError e -- this is the error from the destination relay
Right e -> throwE $ PCEUnexpectedResponse $ B.take 32 $ bshow e
Right r' -> throwE $ unexpectedResponse r'
Left e -> throwE $ PCEResponseError e
_ -> throwE $ PCETransportError TEBadBlock
ERR e -> pure . Left $ ProxyProtocolError e -- this will not happen, this error is returned via Left
_ -> pure . Left $ ProxyUnexpectedResponse $ take 32 $ show r
Left e -> case e of
PCEProtocolError e' -> pure . Left $ ProxyProtocolError e'
PCEUnexpectedResponse r -> pure . Left $ ProxyUnexpectedResponse $ B.unpack r
PCEUnexpectedResponse e' -> pure . Left $ ProxyUnexpectedResponse $ B.unpack e'
PCEResponseError e' -> pure . Left $ ProxyResponseError e'
_ -> throwE e
@@ -894,13 +909,13 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId =
r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr
FwdResponse {fwdCorrId = _, fwdResponse} <- liftEitherWith (const $ PCEResponseError BLOCK) $ smpDecode r'
pure fwdResponse
r -> throwE . PCEUnexpectedResponse $ B.take 32 $ bshow r
r -> throwE $ unexpectedResponse r
okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c pKey qId =
sendSMPCommand c (Just pKey) qId cmd >>= \case
OK -> return ()
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ()))
okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
@@ -909,7 +924,7 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs
process (Response _ r) = case r of
Right OK -> Right ()
Right r' -> Left . PCEUnexpectedResponse $ bshow r'
Right r' -> Left $ unexpectedResponse r'
Left e -> Left e
-- | Send SMP command
+29 -14
View File
@@ -54,7 +54,7 @@ import UnliftIO.Exception (Exception)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) SMPClient)
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))
data SMPClientAgentEvent
= CAConnected SMPServer
@@ -75,7 +75,8 @@ data SMPClientAgentConfig = SMPClientAgentConfig
persistErrorInterval :: NominalDiffTime,
msgQSize :: Natural,
agentQSize :: Natural,
agentSubsBatchSize :: Int
agentSubsBatchSize :: Int,
ownServerDomains :: [ByteString]
}
defaultSMPClientAgentConfig :: SMPClientAgentConfig
@@ -91,7 +92,8 @@ defaultSMPClientAgentConfig =
persistErrorInterval = 0,
msgQSize = 256,
agentQSize = 256,
agentSubsBatchSize = 900
agentSubsBatchSize = 900,
ownServerDomains = []
}
where
second = 1000000
@@ -99,17 +101,19 @@ defaultSMPClientAgentConfig =
data SMPClientAgent = SMPClientAgent
{ agentCfg :: SMPClientAgentConfig,
active :: TVar Bool,
msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg),
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
agentQ :: TBQueue SMPClientAgentEvent,
randomDrg :: TVar ChaChaDRG,
smpClients :: TMap SMPServer SMPClientVar,
smpSessions :: TMap SessionId SMPClient,
smpSessions :: TMap SessionId (OwnServer, SMPClient),
srvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey),
pendingSrvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey),
smpSubWorkers :: TMap SMPServer (SessionVar (Async ())),
workerSeq :: TVar Int
}
type OwnServer = Bool
newtype InternalException e = InternalException {unInternalException :: e}
deriving (Eq, Show)
@@ -163,13 +167,17 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg
-- | Get or create SMP client for SMPServer
getSMPServerClient' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv =
getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv
{-# INLINE getSMPServerClient' #-}
getSMPServerClient'' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv =
atomically getClientVar >>= either (ExceptT . newSMPClient) waitForSMPClient
where
getClientVar :: STM (Either SMPClientVar SMPClientVar)
getClientVar = getSessVar workerSeq srv smpClients
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
waitForSMPClient v = do
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v)
@@ -179,20 +187,22 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worker
Just (Left (e, Just ts)) ->
ifM
((ts <) <$> liftIO getCurrentTime)
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient' ca srv)
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient'' ca srv)
(throwE e)
Nothing -> throwE PCEResponseTimeout
newSMPClient :: SMPClientVar -> IO (Either SMPClientError SMPClient)
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
newSMPClient v = do
r <- connectClient ca srv v `E.catch` (pure . Left . PCEIOError)
case r of
Right smp -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
let c = (isOwnServer ca srv, smp)
atomically $ do
putTMVar (sessionVar v) (Right smp)
TM.insert (sessionId $ thParams smp) smp smpSessions
putTMVar (sessionVar v) (Right c)
TM.insert (sessionId $ thParams smp) c smpSessions
notify ca $ CAConnected srv
pure $ Right c
Left e -> do
if persistErrorInterval agentCfg == 0 || e == PCENetworkError || e == PCEResponseTimeout
then atomically $ do
@@ -202,7 +212,12 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worker
ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime
atomically $ putTMVar (sessionVar v) (Left (e, Just ts))
reconnectClient ca srv
pure r
pure $ Left e
isOwnServer :: SMPClientAgent -> SMPServer -> OwnServer
isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} =
let srv = strEncode $ L.head host
in any (\s -> s == srv || (B.cons '.' s) `B.isSuffixOf` srv) (ownServerDomains agentCfg)
-- | Run an SMP client for SMPClientVar
connectClient :: SMPClientAgent -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
@@ -298,7 +313,7 @@ notify :: MonadIO m => SMPClientAgent -> SMPClientAgentEvent -> m ()
notify ca evt = atomically $ writeTBQueue (agentQ ca) evt
{-# INLINE notify #-}
lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe SMPClient)
lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe (OwnServer, SMPClient))
lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookup sessId smpSessions
closeSMPClientAgent :: SMPClientAgent -> IO ()
@@ -315,7 +330,7 @@ closeSMPServerClients c = atomically (smpClients c `swapTVar` M.empty) >>= mapM_
where
closeClient v =
atomically (readTMVar $ sessionVar v) >>= \case
Right smp -> closeProtocolClient smp `catchAll_` pure ()
Right (_, smp) -> closeProtocolClient smp `catchAll_` pure ()
_ -> pure ()
cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
@@ -12,7 +12,6 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Transport (NTFVersion, supportedClientNTFVRange, supportedNTFHandshakes)
import Simplex.Messaging.Protocol (ErrorType)
import Simplex.Messaging.Util (bshow)
type NtfClient = ProtocolClient NTFVersion ErrorType NtfResponse
@@ -25,7 +24,7 @@ ntfRegisterToken :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Token -> Exc
ntfRegisterToken c pKey newTkn =
sendNtfCommand c (Just pKey) "" (TNEW newTkn) >>= \case
NRTknId tknId dhKey -> pure (tknId, dhKey)
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfVerifyToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> NtfRegCode -> ExceptT NtfClientError IO ()
ntfVerifyToken c pKey tknId code = okNtfCommand (TVFY code) c pKey tknId
@@ -34,7 +33,7 @@ ntfCheckToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> ExceptT NtfClie
ntfCheckToken c pKey tknId =
sendNtfCommand c (Just pKey) tknId TCHK >>= \case
NRTkn stat -> pure stat
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfReplaceToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> DeviceToken -> ExceptT NtfClientError IO ()
ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId
@@ -49,13 +48,13 @@ ntfCreateSubscription :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Subscri
ntfCreateSubscription c pKey newSub =
sendNtfCommand c (Just pKey) "" (SNEW newSub) >>= \case
NRSubId subId -> pure subId
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfCheckSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO NtfSubStatus
ntfCheckSubscription c pKey subId =
sendNtfCommand c (Just pKey) subId SCHK >>= \case
NRSub stat -> pure stat
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfDeleteSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO ()
ntfDeleteSubscription = okNtfCommand SDEL
@@ -68,4 +67,4 @@ okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateAuthKey -
okNtfCommand cmd c pKey entId =
sendNtfCommand c (Just pKey) entId cmd >>= \case
NROk -> return ()
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
+22 -19
View File
@@ -31,7 +31,7 @@ import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Network.Socket (ServiceName)
import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError)
import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..))
import Simplex.Messaging.Client.Agent
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
@@ -220,23 +220,27 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
receiveSMP :: M ()
receiveSMP = forever $ do
((_, srv, _), _, _, _tType, ntfId, msgOrErr) <- atomically $ readTBQueue msgQ
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- liftIO getSystemTime
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
atomically $ updatePeriodStats (activeSubs stats) ntfId
atomically $
findNtfSubscriptionToken st smpQueue
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
incNtfStat ntfReceived
Right SMP.END -> updateSubStatus smpQueue NSEnd
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError $ "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
((_, srv, _), _, _, ts) <- atomically $ readTBQueue msgQ
forM ts $ \(ntfId, t) -> case t of
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
STResponse {} -> pure () -- it was already reported as timeout error
STEvent msgOrErr -> do
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- liftIO getSystemTime
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
atomically $ updatePeriodStats (activeSubs stats) ntfId
atomically $
findNtfSubscriptionToken st smpQueue
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
incNtfStat ntfReceived
Right SMP.END -> updateSubStatus smpQueue NSEnd
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError $ "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
receiveAgent =
forever $
@@ -248,7 +252,6 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
forM_ subs $ \(_, ntfId) -> do
let smpQueue = SMPQueueNtf srv ntfId
updateSubStatus smpQueue NSInactive
CAResubscribed srv subs -> do
forM_ subs $ \(_, ntfId) -> updateSubStatus (SMPQueueNtf srv ntfId) NSActive
logSubStatus srv "resubscribed" $ length subs
+96 -48
View File
@@ -6,6 +6,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
@@ -68,8 +69,8 @@ import GHC.Stats (getRTSStats)
import GHC.TypeLits (KnownNat)
import Network.Socket (ServiceName, Socket, socketToHandle)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError)
import Simplex.Messaging.Client.Agent (SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient', lookupSMPServerClient)
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError, temporaryClientError)
import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -232,7 +233,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
let interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
@@ -251,32 +252,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0
psNtf <- atomically $ periodStatCounts activeQueuesNtf ts
pRelays' <- atomically $ getResetProxyStatsData pRelays
pRelaysOwn' <- atomically $ getResetProxyStatsData pRelaysOwn
pMsgFwds' <- atomically $ getResetProxyStatsData pMsgFwds
pMsgFwdsOwn' <- atomically $ getResetProxyStatsData pMsgFwdsOwn
pMsgFwdsRecv' <- atomically $ swapTVar pMsgFwdsRecv 0
qCount' <- readTVarIO qCount
msgCount' <- readTVarIO msgCount
hPutStrLn h $
intercalate
","
[ iso8601Show $ utctDay fromTime',
show qCreated',
show qSecured',
show qDeletedAll',
show msgSent',
show msgRecv',
dayCount ps,
weekCount ps,
monthCount ps,
show msgSentNtf',
show msgRecvNtf',
dayCount psNtf,
weekCount psNtf,
monthCount psNtf,
show qCount',
show msgCount',
show msgExpired',
show qDeletedNew',
show qDeletedSecured'
]
( [ iso8601Show $ utctDay fromTime',
show qCreated',
show qSecured',
show qDeletedAll',
show msgSent',
show msgRecv',
dayCount ps,
weekCount ps,
monthCount ps,
show msgSentNtf',
show msgRecvNtf',
dayCount psNtf,
weekCount psNtf,
monthCount psNtf,
show qCount',
show msgCount',
show msgExpired',
show qDeletedNew',
show qDeletedSecured'
]
<> showProxyStats pRelays'
<> showProxyStats pRelaysOwn'
<> showProxyStats pMsgFwds'
<> showProxyStats pMsgFwdsOwn'
<> [show pMsgFwdsRecv']
)
liftIO $ threadDelay' interval
where
showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
[show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther]
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
@@ -346,7 +361,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
subscriptions' <- bshow . M.size <$> readTVarIO subscriptions
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
CPStats -> withAdminRole $ do
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats
ss <- unliftIO u $ asks serverStats
let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO ()
putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n"
putProxyStat :: ByteString -> (ServerStats -> ProxyStats) -> IO ()
putProxyStat label var = do
ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} <- atomically $ getProxyStatsData $ var ss
B.hPutStr h $ label <> ": requests=" <> bshow _pRequests <> ", successes=" <> bshow _pSuccesses <> ", errorsConnect=" <> bshow _pErrorsConnect <> ", errorsCompat=" <> bshow _pErrorsCompat <> ", errorsOther=" <> bshow _pErrorsOther <> "\n"
putStat "fromTime" fromTime
putStat "qCreated" qCreated
putStat "qSecured" qSecured
@@ -359,9 +380,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
putStat "msgRecvNtf" msgRecvNtf
putStat "qCount" qCount
putStat "msgCount" msgCount
where
putStat :: Show a => String -> TVar a -> IO ()
putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v
putProxyStat "pRelays" pRelays
putProxyStat "pRelaysOwn" pRelaysOwn
putProxyStat "pMsgFwds" pMsgFwds
putProxyStat "pMsgFwdsOwn" pMsgFwdsOwn
putStat "pMsgFwdsRecv" pMsgFwdsRecv
CPStatsRTS -> getRTSStats >>= hPrint h
CPThreads -> withAdminRole $ do
#if MIN_VERSION_base(4,18,0)
@@ -647,33 +670,56 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config
pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth
getRelay = do
ProxyAgent {smpAgent} <- asks proxyAgent
liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError)
ServerStats {pRelays, pRelaysOwn} <- asks serverStats
let inc = mkIncProxyStats pRelays pRelaysOwn
ProxyAgent {smpAgent = a} <- asks proxyAgent
liftIO (runExceptT (getSMPServerClient'' a srv) `catch` (pure . Left . PCEIOError)) >>= \case
Right (own, smp) -> do
inc own pRequests
case proxyResp smp of
r@PKEY {} -> r <$ inc own pSuccesses
r -> r <$ inc own pErrorsCompat
Left e -> do
let own = isOwnServer a srv
inc own pRequests
inc own $ if temporaryClientError e then pErrorsConnect else pErrorsOther
pure . ERR $ smpProxyError e
where
proxyResp = \case
Left err -> ERR $ smpProxyError err
Right smp ->
let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp
in case compatibleVRange thServerVRange proxiedSMPRelayVRange of
-- Cap the destination relay version range to prevent client version fingerprinting.
-- See comment for proxiedSMPRelayVersion.
Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of
Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey
Nothing -> ERR $ transportErr TENoServerAuth
_ -> ERR $ transportErr TEVersion
proxyResp smp =
let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp
in case compatibleVRange thServerVRange proxiedSMPRelayVRange of
-- Cap the destination relay version range to prevent client version fingerprinting.
-- See comment for proxiedSMPRelayVersion.
Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of
Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey
Nothing -> ERR $ transportErr TENoServerAuth
_ -> ERR $ transportErr TEVersion
PFWD fwdV pubKey encBlock -> do
ProxyAgent {smpAgent} <- asks proxyAgent
atomically (lookupSMPServerClient smpAgent sessId) >>= \case
Just smp
| v >= sendingProxySMPVersion ->
liftIO $ either (ERR . smpProxyError) PRES <$>
runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError)
| otherwise -> pure . ERR $ transportErr TEVersion
ProxyAgent {smpAgent = a} <- asks proxyAgent
ServerStats {pMsgFwds, pMsgFwdsOwn} <- asks serverStats
let inc = mkIncProxyStats pMsgFwds pMsgFwdsOwn
atomically (lookupSMPServerClient a sessId) >>= \case
Just (own, smp) -> do
inc own pRequests
if
| v >= sendingProxySMPVersion ->
liftIO (runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catch` (pure . Left . PCEIOError)) >>= \case
Right r -> PRES r <$ inc own pSuccesses
Left e -> case e of
PCEProtocolError {} -> ERR err <$ inc own pSuccesses
_ -> ERR err <$ inc own pErrorsOther
where
err = smpProxyError e
| otherwise -> ERR (transportErr TEVersion) <$ inc own pErrorsCompat
where
THandleParams {thVersion = v} = thParams smp
Nothing -> pure $ ERR $ PROXY NO_SESSION
Nothing -> inc False pRequests >> inc False pErrorsConnect $> ERR (PROXY NO_SESSION)
transportErr :: TransportError -> ErrorType
transportErr = PROXY . BROKER . TRANSPORT
mkIncProxyStats :: MonadIO m => ProxyStats -> ProxyStats -> OwnServer -> (ProxyStats -> TVar Int) -> m ()
mkIncProxyStats ps psOwn = \own sel -> do
atomically $ modifyTVar' (sel ps) (+ 1)
when own $ atomically $ modifyTVar' (sel psOwn) (+ 1)
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Either (Transmission (Command 'ProxiedClient)) (Transmission BrokerMsg))
processCommand (qr_, (corrId, queueId, cmd)) = do
st <- asks queueStore
@@ -987,6 +1033,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
-- encrypt to proxy
let fr = FwdResponse {fwdCorrId, fwdResponse = r2}
r3 = EncFwdResponse $ C.cbEncryptNoPad sessSecret (C.reverseNonce proxyNonce) (smpEncode fr)
stats <- asks serverStats
atomically $ modifyTVar' (pMsgFwdsRecv stats) (+ 1)
pure $ RRES r3
where
rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
+1 -1
View File
@@ -126,7 +126,7 @@ data Server = Server
savingLock :: Lock
}
data ProxyAgent = ProxyAgent
newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent
}
+6
View File
@@ -10,6 +10,7 @@ module Simplex.Messaging.Server.Main where
import Control.Concurrent.STM
import Control.Monad (void)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Ini (lookupValue, readIniFile)
@@ -147,6 +148,8 @@ smpServerCLI cfgPath logPath =
\# It defines prefferred hostname for destination servers with multiple hostnames.\n\
\# host_mode: public\n\
\# required_host_mode: off\n\n\
\# The domain suffixes of the relays you operate (space-separated) to count as separate proxy statistics.\n\
\# own_server_domains: \n\n\
\# SOCKS proxy port for forwarding messages to destination servers.\n\
\# You may need a separate instance of SOCKS proxy for incoming single-hop requests.\n\
\# socks_proxy: localhost:9050\n\n\
@@ -245,6 +248,7 @@ smpServerCLI cfgPath logPath =
requiredHostMode = fromMaybe False $ iniOnOff "PROXY" "required_host_mode" ini
}
},
ownServerDomains = either (const []) textToOwnServers $ lookupValue "PROXY" "own_server_domains" ini,
persistErrorInterval = 30 -- seconds
},
allowSMPProxy = True
@@ -259,6 +263,8 @@ smpServerCLI cfgPath logPath =
"public" -> HMPublic
"onion" -> HMOnionViaSocks
s -> error . T.unpack $ "Invalid host_mode: " <> s
textToOwnServers :: Text -> [ByteString]
textToOwnServers = map encodeUtf8 . T.words
data CliCommand
= Init InitOptions
+137 -18
View File
@@ -33,6 +33,11 @@ data ServerStats = ServerStats
msgSentNtf :: TVar Int,
msgRecvNtf :: TVar Int,
activeQueuesNtf :: PeriodStats RecipientId,
pRelays :: ProxyStats,
pRelaysOwn :: ProxyStats,
pMsgFwds :: ProxyStats,
pMsgFwdsOwn :: ProxyStats,
pMsgFwdsRecv :: TVar Int,
qCount :: TVar Int,
msgCount :: TVar Int
}
@@ -51,6 +56,11 @@ data ServerStatsData = ServerStatsData
_msgSentNtf :: Int,
_msgRecvNtf :: Int,
_activeQueuesNtf :: PeriodStatsData RecipientId,
_pRelays :: ProxyStatsData,
_pRelaysOwn :: ProxyStatsData,
_pMsgFwds :: ProxyStatsData,
_pMsgFwdsOwn :: ProxyStatsData,
_pMsgFwdsRecv :: Int,
_qCount :: Int,
_msgCount :: Int
}
@@ -71,9 +81,14 @@ newServerStats ts = do
msgSentNtf <- newTVar 0
msgRecvNtf <- newTVar 0
activeQueuesNtf <- newPeriodStats
pRelays <- newProxyStats
pRelaysOwn <- newProxyStats
pMsgFwds <- newProxyStats
pMsgFwdsOwn <- newProxyStats
pMsgFwdsRecv <- newTVar 0
qCount <- newTVar 0
msgCount <- newTVar 0
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount}
getServerStatsData :: ServerStats -> STM ServerStatsData
getServerStatsData s = do
@@ -90,9 +105,14 @@ getServerStatsData s = do
_msgSentNtf <- readTVar $ msgSentNtf s
_msgRecvNtf <- readTVar $ msgRecvNtf s
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
_pRelays <- getProxyStatsData $ pRelays s
_pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
_pMsgFwds <- getProxyStatsData $ pMsgFwds s
_pMsgFwdsOwn <- getProxyStatsData $ pMsgFwdsOwn s
_pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s
_qCount <- readTVar $ qCount s
_msgCount <- readTVar $ msgCount s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount}
setServerStats :: ServerStats -> ServerStatsData -> STM ()
setServerStats s d = do
@@ -109,28 +129,42 @@ setServerStats s d = do
writeTVar (msgSentNtf s) $! _msgSentNtf d
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
setProxyStats (pRelays s) $! _pRelays d
setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
setProxyStats (pMsgFwds s) $! _pMsgFwds d
setProxyStats (pMsgFwdsOwn s) $! _pMsgFwdsOwn d
writeTVar (pMsgFwdsRecv s) $! _pMsgFwdsRecv d
writeTVar (qCount s) $! _qCount d
writeTVar (msgCount s) $! _msgCount d
instance StrEncoding ServerStatsData where
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} =
strEncode d =
B.unlines
[ "fromTime=" <> strEncode _fromTime,
"qCreated=" <> strEncode _qCreated,
"qSecured=" <> strEncode _qSecured,
"qDeletedAll=" <> strEncode _qDeletedAll,
"qDeletedNew=" <> strEncode _qDeletedNew,
"qDeletedSecured=" <> strEncode _qDeletedSecured,
"qCount=" <> strEncode _qCount,
"msgSent=" <> strEncode _msgSent,
"msgRecv=" <> strEncode _msgRecv,
"msgExpired=" <> strEncode _msgExpired,
"msgSentNtf=" <> strEncode _msgSentNtf,
"msgRecvNtf=" <> strEncode _msgRecvNtf,
[ "fromTime=" <> strEncode (_fromTime d),
"qCreated=" <> strEncode (_qCreated d),
"qSecured=" <> strEncode (_qSecured d),
"qDeletedAll=" <> strEncode (_qDeletedAll d),
"qDeletedNew=" <> strEncode (_qDeletedNew d),
"qDeletedSecured=" <> strEncode (_qDeletedSecured d),
"qCount=" <> strEncode (_qCount d),
"msgSent=" <> strEncode (_msgSent d),
"msgRecv=" <> strEncode (_msgRecv d),
"msgExpired=" <> strEncode (_msgExpired d),
"msgSentNtf=" <> strEncode (_msgSentNtf d),
"msgRecvNtf=" <> strEncode (_msgRecvNtf d),
"activeQueues:",
strEncode _activeQueues,
strEncode (_activeQueues d),
"activeQueuesNtf:",
strEncode _activeQueuesNtf
strEncode (_activeQueuesNtf d),
"pRelays:",
strEncode (_pRelays d),
"pRelaysOwn:",
strEncode (_pRelaysOwn d),
"pMsgFwds:",
strEncode (_pMsgFwds d),
"pMsgFwdsOwn:",
strEncode (_pMsgFwdsOwn d),
"pMsgFwdsRecv=" <> strEncode (_pMsgFwdsRecv d)
]
strP = do
_fromTime <- "fromTime=" *> strP <* A.endOfLine
@@ -157,7 +191,17 @@ instance StrEncoding ServerStatsData where
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newPeriodStatsData
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0}
_pRelays <- proxyStatsP "pRelays:"
_pRelaysOwn <- proxyStatsP "pRelaysOwn:"
_pMsgFwds <- proxyStatsP "pMsgFwds:"
_pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:"
_pMsgFwdsRecv <- "pMsgFwdsRecv=" *> strP <* A.endOfLine <|> pure 0
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0}
where
proxyStatsP key =
optional (A.string key >> A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newProxyStatsData
data PeriodStats a = PeriodStats
{ day :: TVar (Set a),
@@ -231,3 +275,78 @@ updatePeriodStats stats pId = do
updatePeriod month
where
updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId)
data ProxyStats = ProxyStats
{ pRequests :: TVar Int,
pSuccesses :: TVar Int, -- includes destination server error responses that will be forwarded to the client
pErrorsConnect :: TVar Int,
pErrorsCompat :: TVar Int,
pErrorsOther :: TVar Int
}
newProxyStats :: STM ProxyStats
newProxyStats = do
pRequests <- newTVar 0
pSuccesses <- newTVar 0
pErrorsConnect <- newTVar 0
pErrorsCompat <- newTVar 0
pErrorsOther <- newTVar 0
pure ProxyStats {pRequests, pSuccesses, pErrorsConnect, pErrorsCompat, pErrorsOther}
data ProxyStatsData = ProxyStatsData
{ _pRequests :: Int,
_pSuccesses :: Int,
_pErrorsConnect :: Int,
_pErrorsCompat :: Int,
_pErrorsOther :: Int
}
deriving (Show)
newProxyStatsData :: ProxyStatsData
newProxyStatsData = ProxyStatsData {_pRequests = 0, _pSuccesses = 0, _pErrorsConnect = 0, _pErrorsCompat = 0, _pErrorsOther = 0}
getProxyStatsData :: ProxyStats -> STM ProxyStatsData
getProxyStatsData s = do
_pRequests <- readTVar $ pRequests s
_pSuccesses <- readTVar $ pSuccesses s
_pErrorsConnect <- readTVar $ pErrorsConnect s
_pErrorsCompat <- readTVar $ pErrorsCompat s
_pErrorsOther <- readTVar $ pErrorsOther s
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
getResetProxyStatsData :: ProxyStats -> STM ProxyStatsData
getResetProxyStatsData s = do
_pRequests <- swapTVar (pRequests s) 0
_pSuccesses <- swapTVar (pSuccesses s) 0
_pErrorsConnect <- swapTVar (pErrorsConnect s) 0
_pErrorsCompat <- swapTVar (pErrorsCompat s) 0
_pErrorsOther <- swapTVar (pErrorsOther s) 0
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
setProxyStats :: ProxyStats -> ProxyStatsData -> STM ()
setProxyStats s d = do
writeTVar (pRequests s) $! _pRequests d
writeTVar (pSuccesses s) $! _pSuccesses d
writeTVar (pErrorsConnect s) $! _pErrorsConnect d
writeTVar (pErrorsCompat s) $! _pErrorsCompat d
writeTVar (pErrorsOther s) $! _pErrorsOther d
instance StrEncoding ProxyStatsData where
strEncode ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
"requests="
<> strEncode _pRequests
<> "\nsuccesses="
<> strEncode _pSuccesses
<> "\nerrorsConnect="
<> strEncode _pErrorsConnect
<> "\nerrorsCompat="
<> strEncode _pErrorsCompat
<> "\nerrorsOther="
<> strEncode _pErrorsOther
strP = do
_pRequests <- "requests=" *> strP <* A.endOfLine
_pSuccesses <- "successes=" *> strP <* A.endOfLine
_pErrorsConnect <- "errorsConnect=" *> strP <* A.endOfLine
_pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine
_pErrorsOther <- "errorsOther=" *> strP
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
+21 -22
View File
@@ -78,7 +78,6 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore)
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ, SENT)
import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..))
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew))
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction')
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), SMPProxyFallback (..), SMPProxyMode (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig)
@@ -439,7 +438,7 @@ functionalAPITests t = do
it "send delivery receipts concurrently with messages" $ testDeliveryReceiptsConcurrent t
describe "user network info" $ do
it "should wait for user network" testWaitForUserNetwork
it "should not reset offline interval while offline" testDoNotResetOfflineInterval
it "should not reset online to offline if happens too quickly" testDoNotResetOnlineToOffline
it "should resume multiple threads" testResumeMultipleThreads
testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> IO Int
@@ -2710,11 +2709,8 @@ testWaitForUserNetwork = do
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNNone False
threadDelay 5000
networkDelay a 100000
networkDelay a 150000
networkDelay a 200000
networkDelay a 200000
networkDelay a 100000
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
@@ -2724,26 +2720,29 @@ testWaitForUserNetwork = do
(networkDelay a 50000)
noNetworkDelay a
where
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}}
aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0}
testDoNotResetOfflineInterval :: IO ()
testDoNotResetOfflineInterval = do
testDoNotResetOnlineToOffline :: IO ()
testDoNotResetOnlineToOffline = do
a <- getSMPAgentClient' 1 aCfg initAgentServers testDB
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
threadDelay 5000
networkDelay a 100000
networkDelay a 150000
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
networkDelay a 200000
setUserNetworkInfo a $ UserNetworkInfo UNNone False
networkDelay a 200000
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
setUserNetworkInfo a $ UserNetworkInfo UNWifi True
noNetworkDelay a
setUserNetworkInfo a $ UserNetworkInfo UNCellular False
setUserNetworkInfo a $ UserNetworkInfo UNWifi False -- ingnored
noNetworkDelay a
threadDelay 100000
setUserNetworkInfo a $ UserNetworkInfo UNWifi False
networkDelay a 100000
setUserNetworkInfo a $ UserNetworkInfo UNNone False
networkDelay a 100000
setUserNetworkInfo a $ UserNetworkInfo UNWifi True
setUserNetworkInfo a $ UserNetworkInfo UNNone False -- ingnored
noNetworkDelay a
where
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 100000, increaseAfter = 0, maxInterval = 200000}}
aCfg = agentCfg {userNetworkInterval = 100000, userOfflineDelay = 0.1}
testResumeMultipleThreads :: IO ()
testResumeMultipleThreads = do
@@ -2758,14 +2757,14 @@ testResumeMultipleThreads = do
threadDelay 1000000
setUserNetworkInfo a $ UserNetworkInfo UNCellular True
ts <- mapM (atomically . readTMVar) vs
print $ minimum ts
print $ maximum ts
print $ sum ts `div` fromIntegral (length ts)
-- print $ minimum ts
-- print $ maximum ts
-- print $ sum ts `div` fromIntegral (length ts)
let average = sum ts `div` fromIntegral (length ts)
average < 3000000 `shouldBe` True
maximum ts < 4000000 `shouldBe` True
where
aCfg = agentCfg {userNetworkInterval = RetryInterval {initialInterval = 1000000, increaseAfter = 0, maxInterval = 3600_000_000}}
aCfg = agentCfg {userOfflineDelay = 0}
noNetworkDelay :: AgentClient -> IO ()
noNetworkDelay a = do
+5 -5
View File
@@ -709,15 +709,15 @@ testGetNextRcvChunkToDownload st = do
withTransaction st $ \db -> do
Right Nothing <- getNextRcvChunkToDownload db xftpServer1 86400
Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True
DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1"
Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True
Left e <- getNextRcvChunkToDownload db xftpServer1 86400
show e `shouldContain` "ConversionFailed"
DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
Right (Just RcvFileChunk {rcvFileEntityId}) <- getNextRcvChunkToDownload db xftpServer1 86400
Right (Just (RcvFileChunk {rcvFileEntityId}, _)) <- getNextRcvChunkToDownload db xftpServer1 86400
rcvFileEntityId `shouldBe` fId2
testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation
@@ -726,10 +726,10 @@ testGetNextRcvFileToDecrypt st = do
withTransaction st $ \db -> do
Right Nothing <- getNextRcvFileToDecrypt db 86400
Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True
DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 1"
DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1"
Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) True
DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 2"
Left e <- getNextRcvFileToDecrypt db 86400
+2 -2
View File
@@ -120,7 +120,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do
-- send via proxy to unsecured queue
Right () <- proxySMPMessage pc sess Nothing sndId noMsgFlags msg
-- receive 1
(_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})) <- atomically $ readTBQueue msgQ
(_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})))]) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId encBody `shouldBe` Right msg
ackSMPMessage rc rPriv rcvId msgId
-- secure queue
@@ -129,7 +129,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do
-- send via proxy to secured queue
Right () <- proxySMPMessage pc sess (Just sPriv) sndId noMsgFlags msg'
-- receive 2
(_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})) <- atomically $ readTBQueue msgQ
(_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})))]) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId' encBody' `shouldBe` Right msg'
ackSMPMessage rc rPriv rcvId msgId'
+3 -3
View File
@@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 20
logSize testServerStatsBackupFile `shouldReturn` 45
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats1 [rId] 5 1
@@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 20
logSize testServerStatsBackupFile `shouldReturn` 45
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3
@@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 20
logSize testServerStatsBackupFile `shouldReturn` 45
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5
+14 -15
View File
@@ -179,7 +179,7 @@ testXFTPAgentSendReceiveRedirect = withXFTPServer $ do
withAgent 2 agentCfg initAgentServers testDB2 $ \rcp -> do
FileDescriptionURI {description} <- either fail pure $ strDecode uri
rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing
rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing True
rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 65536 totalSize) -- extra RFPROG before switching to real file
rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 4194304 totalSize)
rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 8388608 totalSize)
@@ -223,7 +223,7 @@ testXFTPAgentSendReceiveNoRedirect = withXFTPServer $ do
FileDescriptionURI {description} <- either fail pure $ strDecode uri
let ValidFileDescription FileDescription {redirect} = description
redirect `shouldBe` Nothing
rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing
rcvFileId <- runRight $ xftpReceiveFile rcp 1 description Nothing True
-- NO extra "RFPROG 65k 65k" before switching to real file
rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 4194304 totalSize)
rfGet rcp `shouldReturn` ("", rcvFileId, RFPROG 5242880 totalSize)
@@ -311,7 +311,7 @@ testReceive' rcp rfd originalFilePath = testReceiveCF' rcp rfd Nothing originalF
testReceiveCF' :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> FilePath -> Int64 -> ExceptT AgentErrorType IO RcvFileId
testReceiveCF' rcp rfd cfArgs originalFilePath size = do
rfId <- xftpReceiveFile rcp 1 rfd cfArgs
rfId <- xftpReceiveFile rcp 1 rfd cfArgs True
rfProgress rcp size
("", rfId', RFDONE path) <- rfGet rcp
liftIO $ do
@@ -336,7 +336,7 @@ testXFTPAgentReceiveRestore = do
-- receive file - should not succeed with server down
rfId <- withAgent 2 agentCfg initAgentServers testDB2 $ \rcp -> runRight $ do
xftpStartWorkers rcp (Just recipientFiles)
rfId <- xftpReceiveFile rcp 1 rfd Nothing
rfId <- xftpReceiveFile rcp 1 rfd Nothing True
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
pure rfId
@@ -380,7 +380,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
-- receive file - should not succeed with server down
rfId <- withAgent 2 agentCfg initAgentServers testDB2 $ \rcp -> runRight $ do
xftpStartWorkers rcp (Just recipientFiles)
rfId <- xftpReceiveFile rcp 1 rfd Nothing
rfId <- xftpReceiveFile rcp 1 rfd Nothing True
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
pure rfId
@@ -392,8 +392,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
-- receive file - should fail with AUTH error
withAgent 3 agentCfg initAgentServers testDB2 $ \rcp' -> do
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <-
rfGet rcp'
("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <- rfGet rcp'
rfId' `shouldBe` rfId
-- tmp path should be removed after permanent error
@@ -507,8 +506,8 @@ testXFTPAgentDelete = withGlobalLogging logCfgNoLogs $
-- receive file - should fail with AUTH error
withAgent 3 agentCfg initAgentServers testDB2 $ \rcp2 -> runRight $ do
xftpStartWorkers rcp2 (Just recipientFiles)
rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing
("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <-
rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing True
("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <-
rfGet rcp2
liftIO $ rfId' `shouldBe` rfId
@@ -545,8 +544,8 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do
-- receive file - should fail with AUTH error
withAgent 5 agentCfg initAgentServers testDB3 $ \rcp2 -> runRight $ do
xftpStartWorkers rcp2 (Just recipientFiles)
rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing
("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <-
rfId <- xftpReceiveFile rcp2 1 rfd2 Nothing True
("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <-
rfGet rcp2
liftIO $ rfId' `shouldBe` rfId
@@ -580,8 +579,8 @@ testXFTPAgentDeleteOnServer = withGlobalLogging logCfgNoLogs $
runRight_ . void $ do
-- receive file 1 again
rfId1 <- xftpReceiveFile rcp 1 rfd1_2 Nothing
("", rfId1', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <-
rfId1 <- xftpReceiveFile rcp 1 rfd1_2 Nothing True
("", rfId1', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <-
rfGet rcp
liftIO $ rfId1 `shouldBe` rfId1'
@@ -613,8 +612,8 @@ testXFTPAgentExpiredOnServer = withGlobalLogging logCfgNoLogs $ do
-- receive file 1 again - should fail with AUTH error
runRight $ do
rfId <- xftpReceiveFile rcp 1 rfd1_2 Nothing
("", rfId', RFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <-
rfId <- xftpReceiveFile rcp 1 rfd1_2 Nothing True
("", rfId', RFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <-
rfGet rcp
liftIO $ rfId' `shouldBe` rfId