mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 20:34:58 +00:00
xftp: agent API for xftp commands and events (#684)
* xftp: agent API for xftp commands and events * fix tests * fix tests 2 * xftp: update agent send api * update API to make temp path optional * revert tmp path changes (fixes send) --------- Co-authored-by: spacedandy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
3fe1f3c822
commit
ddc2da8fe4
@@ -29,6 +29,7 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString.Base64.URL as U
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Int (Int64)
|
||||
import Data.List (isSuffixOf, partition)
|
||||
@@ -36,7 +37,7 @@ import Data.List.NonEmpty (nonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFile)
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI)
|
||||
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
|
||||
import Simplex.FileTransfer.Types
|
||||
import Simplex.FileTransfer.Util (removePath, uniqueCombine)
|
||||
@@ -44,25 +45,26 @@ import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (XFTPServer)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (tshow)
|
||||
import Simplex.Messaging.Util (liftIOEither, tshow)
|
||||
import System.FilePath (takeFileName, (</>))
|
||||
import UnliftIO
|
||||
import UnliftIO.Concurrent
|
||||
import UnliftIO.Directory
|
||||
import qualified UnliftIO.Exception as E
|
||||
|
||||
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> FilePath -> m RcvFileId
|
||||
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpPath = do
|
||||
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId
|
||||
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWorkPath = do
|
||||
g <- asks idsDrg
|
||||
encPath <- uniqueCombine xftpPath "xftp.encrypted"
|
||||
workPath <- maybe getTemporaryDirectory pure xftpWorkPath
|
||||
encPath <- uniqueCombine workPath "xftp.encrypted"
|
||||
createDirectory encPath
|
||||
fId <- withStore c $ \db -> createRcvFile db g userId fd xftpPath encPath
|
||||
fId <- withStore c $ \db -> createRcvFile db g userId fd workPath encPath
|
||||
forM_ chunks downloadChunk
|
||||
pure fId
|
||||
where
|
||||
@@ -186,9 +188,10 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
|
||||
pure $ path : ps
|
||||
getChunkPaths (RcvFileChunk {chunkTmpPath = Nothing} : _cs) =
|
||||
throwError $ INTERNAL "no chunk path"
|
||||
-- TODO refactor with decrypt in CLI, streaming decryption
|
||||
decrypt :: Int64 -> [FilePath] -> m FilePath
|
||||
decrypt encSize chunkPaths = do
|
||||
lazyChunks <- readChunks chunkPaths
|
||||
lazyChunks <- liftIO $ readChunks chunkPaths
|
||||
(authOk, f) <- liftEither . first cryptoError $ LC.sbDecryptTailTag key nonce (encSize - authTagSize) lazyChunks
|
||||
let (fileHdr, f') = LB.splitAt 1024 f
|
||||
-- withFile encPath ReadMode $ \r -> do
|
||||
@@ -205,30 +208,25 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
|
||||
removeFile path
|
||||
throwError $ INTERNAL "Error decrypting file: incorrect auth tag"
|
||||
pure path
|
||||
readChunks :: [FilePath] -> m LB.ByteString
|
||||
readChunks =
|
||||
foldM
|
||||
( \s path -> do
|
||||
chunk <- liftIO $ LB.readFile path
|
||||
pure $ s <> chunk
|
||||
)
|
||||
LB.empty
|
||||
readChunks :: [FilePath] -> IO LB.ByteString
|
||||
readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) ""
|
||||
|
||||
sendFileExperimental :: forall m. AgentMonad m => AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId
|
||||
sendFileExperimental AgentClient {subQ} _userId numRecipients xftpPath filePath = do
|
||||
sendFileExperimental :: forall m. AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId
|
||||
sendFileExperimental AgentClient {subQ} _userId filePath numRecipients xftpWorkPath = do
|
||||
g <- asks idsDrg
|
||||
sndFileEntityId <- liftIO $ randomId g 12
|
||||
void $ forkIO $ sendCLI sndFileEntityId
|
||||
pure sndFileEntityId
|
||||
sndFileId <- liftIO $ randomId g 12
|
||||
void $ forkIO $ sendCLI sndFileId
|
||||
pure sndFileId
|
||||
where
|
||||
randomId :: TVar ChaChaDRG -> Int -> IO ByteString
|
||||
randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n)
|
||||
sendCLI :: SndFileId -> m ()
|
||||
sendCLI sndFileEntityId = do
|
||||
sendCLI sndFileId = do
|
||||
let fileName = takeFileName filePath
|
||||
outputDir <- uniqueCombine xftpPath (fileName <> ".descr")
|
||||
workPath <- maybe getTemporaryDirectory pure xftpWorkPath
|
||||
outputDir <- uniqueCombine workPath $ fileName <> ".descr"
|
||||
createDirectory outputDir
|
||||
let tempPath = xftpPath </> "snd"
|
||||
let tempPath = workPath </> "snd"
|
||||
createDirectoryIfMissing False tempPath
|
||||
let sendOptions =
|
||||
SendOptions
|
||||
@@ -242,20 +240,21 @@ sendFileExperimental AgentClient {subQ} _userId numRecipients xftpPath filePath
|
||||
}
|
||||
liftCLI $ cliSendFile sendOptions
|
||||
(sndDescr, rcvDescrs) <- readDescrs outputDir fileName
|
||||
notify sndFileEntityId $ SFDONE sndDescr rcvDescrs
|
||||
notify sndFileId $ SFDONE sndDescr rcvDescrs
|
||||
liftCLI :: ExceptT CLIError IO () -> m ()
|
||||
liftCLI = either (throwError . INTERNAL . show) pure <=< liftIO . runExceptT
|
||||
readDescrs :: FilePath -> FilePath -> m (String, [String])
|
||||
readDescrs :: FilePath -> FilePath -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
|
||||
readDescrs outDir fileName = do
|
||||
let descrDir = outDir </> (fileName <> ".xftp")
|
||||
files <- listDirectory descrDir
|
||||
let (sdFiles, rdFiles) = partition ("snd.xftp.private" `isSuffixOf`) files
|
||||
sdFile = maybe "" (\l -> descrDir </> L.head l) (nonEmpty sdFiles)
|
||||
rdFiles' = map (descrDir </>) rdFiles
|
||||
-- TODO map files to contents
|
||||
pure (sdFile, rdFiles')
|
||||
(,) <$> readDescr sdFile <*> mapM readDescr rdFiles'
|
||||
readDescr :: FilePartyI p => FilePath -> m (ValidFileDescription p)
|
||||
readDescr f = liftIOEither $ first INTERNAL . strDecode <$> B.readFile f
|
||||
notify :: forall e. AEntityI e => SndFileId -> ACommand 'Agent e -> m ()
|
||||
notify sndFileEntityId cmd = atomically $ writeTBQueue subQ ("", sndFileEntityId, APC (sAEntity @e) cmd)
|
||||
notify sndFileId cmd = atomically $ writeTBQueue subQ ("", sndFileId, APC (sAEntity @e) cmd)
|
||||
|
||||
-- _sendFile :: AgentMonad m => AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId
|
||||
_sendFile :: AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId
|
||||
|
||||
@@ -458,7 +458,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
|
||||
throwError $ CLIError "Error decrypting file: incorrect auth tag"
|
||||
pure path
|
||||
readChunks :: [FilePath] -> IO LB.ByteString
|
||||
readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) LB.empty
|
||||
readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) ""
|
||||
{-# NOINLINE readChunks #-}
|
||||
getFilePath :: String -> ExceptT CLIError IO FilePath
|
||||
getFilePath name =
|
||||
@@ -523,9 +523,8 @@ strEnc :: StrEncoding a => a -> String
|
||||
strEnc = B.unpack . strEncode
|
||||
|
||||
getFileDescription :: FilePath -> ExceptT CLIError IO AValidFileDescription
|
||||
getFileDescription path = do
|
||||
fd <- ExceptT $ first (CLIError . ("Failed to parse file description: " <>)) . strDecode <$> B.readFile path
|
||||
liftEither . first CLIError $ validateFileDescription fd
|
||||
getFileDescription path =
|
||||
ExceptT $ first (CLIError . ("Failed to parse file description: " <>)) . strDecode <$> B.readFile path
|
||||
|
||||
getFileDescription' :: FilePartyI p => FilePath -> ExceptT CLIError IO (ValidFileDescription p)
|
||||
getFileDescription' path =
|
||||
|
||||
@@ -27,6 +27,7 @@ module Simplex.FileTransfer.Description
|
||||
validateFileDescription,
|
||||
groupReplicasByServer,
|
||||
replicaServer,
|
||||
fdSeparator,
|
||||
kb,
|
||||
mb,
|
||||
gb,
|
||||
@@ -75,6 +76,7 @@ data FileDescription (p :: FileParty) = FileDescription
|
||||
data AFileDescription = forall p. FilePartyI p => AFD (FileDescription p)
|
||||
|
||||
newtype ValidFileDescription p = ValidFD (FileDescription p)
|
||||
deriving (Eq, Show)
|
||||
|
||||
pattern ValidFileDescription :: FileDescription p -> ValidFileDescription p
|
||||
pattern ValidFileDescription fd = ValidFD fd
|
||||
@@ -83,6 +85,9 @@ pattern ValidFileDescription fd = ValidFD fd
|
||||
|
||||
data AValidFileDescription = forall p. FilePartyI p => AVFD (ValidFileDescription p)
|
||||
|
||||
fdSeparator :: IsString s => s
|
||||
fdSeparator = "################################\n"
|
||||
|
||||
newtype FileDigest = FileDigest {unFileDigest :: ByteString}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -170,6 +175,16 @@ data FileServerReplica = FileServerReplica
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
instance FilePartyI p => StrEncoding (ValidFileDescription p) where
|
||||
strEncode (ValidFD fd) = strEncode fd
|
||||
strDecode s = strDecode s >>= (\(AVFD fd) -> checkParty fd)
|
||||
strP = strDecode <$?> A.takeByteString
|
||||
|
||||
instance StrEncoding AValidFileDescription where
|
||||
strEncode (AVFD fd) = strEncode fd
|
||||
strDecode = validateFileDescription <=< strDecode
|
||||
strP = strDecode <$?> A.takeByteString
|
||||
|
||||
instance FilePartyI p => StrEncoding (FileDescription p) where
|
||||
strEncode = Y.encode . encodeFileDescription
|
||||
strDecode s = strDecode s >>= (\(AFD fd) -> checkParty fd)
|
||||
|
||||
@@ -339,12 +339,12 @@ toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m
|
||||
toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c
|
||||
|
||||
-- | Receive XFTP file
|
||||
xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> FilePath -> m RcvFileId
|
||||
xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId
|
||||
xftpReceiveFile c = withAgentEnv c .:. receiveFile c
|
||||
|
||||
-- | Send XFTP file
|
||||
xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId
|
||||
xftpSendFile c userId n xftpPath filePath = withAgentEnv c $ sendFileExperimental c userId n xftpPath filePath
|
||||
xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId
|
||||
xftpSendFile c = withAgentEnv c .:: sendFileExperimental c
|
||||
|
||||
-- | Activate operations
|
||||
activateAgent :: AgentErrorMonad m => AgentClient -> m ()
|
||||
|
||||
@@ -441,8 +441,8 @@ reconnectSMPClient c tSess@(_, srv, _) =
|
||||
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
|
||||
liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
|
||||
mapM_ (throwError . snd) $ listToMaybe tempErrs
|
||||
notifySub :: ConnId -> ACommand 'Agent 'AEConn -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC SAEConn cmd)
|
||||
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
|
||||
|
||||
getNtfServerClient :: forall m. AgentMonad m => AgentClient -> NtfTransportSession -> m NtfClient
|
||||
getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = do
|
||||
|
||||
@@ -44,7 +44,6 @@ import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConf
|
||||
import Simplex.FileTransfer.Types (DBSndFileId)
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store (UserId)
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import Simplex.Messaging.Client
|
||||
|
||||
@@ -14,7 +14,7 @@ createLock :: STM Lock
|
||||
createLock = newEmptyTMVar
|
||||
{-# INLINE createLock #-}
|
||||
|
||||
withLock :: MonadUnliftIO m => TMVar String -> String -> m a -> m a
|
||||
withLock :: MonadUnliftIO m => Lock -> String -> m a -> m a
|
||||
withLock lock name =
|
||||
E.bracket_
|
||||
(atomically $ putTMVar lock name)
|
||||
|
||||
@@ -104,6 +104,7 @@ module Simplex.Messaging.Agent.Protocol
|
||||
MsgIntegrity (..),
|
||||
MsgErrorType (..),
|
||||
QueueStatus (..),
|
||||
UserId,
|
||||
ACorrId,
|
||||
AgentMsgId,
|
||||
NotificationsMode (..),
|
||||
@@ -163,7 +164,8 @@ import Database.SQLite.Simple.FromField
|
||||
import Database.SQLite.Simple.ToField
|
||||
import GHC.Generics (Generic)
|
||||
import Generic.Random (genericArbitraryU)
|
||||
import Simplex.FileTransfer.Protocol (XFTPErrorType)
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType)
|
||||
import Simplex.Messaging.Agent.QueryString
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri)
|
||||
@@ -224,6 +226,8 @@ type ATransmission p = (ACorrId, EntityId, APartyCmd p)
|
||||
-- | SMP agent protocol transmission or transmission error.
|
||||
type ATransmissionOrError p = (ACorrId, EntityId, Either AgentErrorType (APartyCmd p))
|
||||
|
||||
type UserId = Int64
|
||||
|
||||
type ACorrId = ByteString
|
||||
|
||||
-- | SMP agent protocol participants.
|
||||
@@ -311,8 +315,8 @@ data ACommand (p :: AParty) (e :: AEntity) where
|
||||
END :: ACommand Agent AEConn
|
||||
CONNECT :: AProtocolType -> TransportHost -> ACommand Agent AENone
|
||||
DISCONNECT :: AProtocolType -> TransportHost -> ACommand Agent AENone
|
||||
DOWN :: SMPServer -> [ConnId] -> ACommand Agent AEConn
|
||||
UP :: SMPServer -> [ConnId] -> ACommand Agent AEConn
|
||||
DOWN :: SMPServer -> [ConnId] -> ACommand Agent AENone
|
||||
UP :: SMPServer -> [ConnId] -> ACommand Agent AENone
|
||||
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> ACommand Agent AEConn
|
||||
SEND :: MsgFlags -> MsgBody -> ACommand Client AEConn
|
||||
MID :: AgentMsgId -> ACommand Agent AEConn
|
||||
@@ -335,7 +339,8 @@ data ACommand (p :: AParty) (e :: AEntity) where
|
||||
RFPROG :: Int -> Int -> ACommand Agent AERcvFile
|
||||
RFDONE :: FilePath -> ACommand Agent AERcvFile
|
||||
RFERR :: AgentErrorType -> ACommand Agent AERcvFile
|
||||
SFDONE :: String -> [String] -> ACommand Agent AESndFile
|
||||
SFPROG :: Int -> Int -> ACommand Agent AESndFile
|
||||
SFDONE :: ValidFileDescription 'FSender -> [ValidFileDescription 'FRecipient] -> ACommand Agent AESndFile
|
||||
|
||||
deriving instance Eq (ACommand p e)
|
||||
|
||||
@@ -367,8 +372,8 @@ data ACommandTag (p :: AParty) (e :: AEntity) where
|
||||
END_ :: ACommandTag Agent AEConn
|
||||
CONNECT_ :: ACommandTag Agent AENone
|
||||
DISCONNECT_ :: ACommandTag Agent AENone
|
||||
DOWN_ :: ACommandTag Agent AEConn
|
||||
UP_ :: ACommandTag Agent AEConn
|
||||
DOWN_ :: ACommandTag Agent AENone
|
||||
UP_ :: ACommandTag Agent AENone
|
||||
SWITCH_ :: ACommandTag Agent AEConn
|
||||
SEND_ :: ACommandTag Client AEConn
|
||||
MID_ :: ACommandTag Agent AEConn
|
||||
@@ -391,6 +396,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where
|
||||
RFDONE_ :: ACommandTag Agent AERcvFile
|
||||
RFPROG_ :: ACommandTag Agent AERcvFile
|
||||
RFERR_ :: ACommandTag Agent AERcvFile
|
||||
SFPROG_ :: ACommandTag Agent AESndFile
|
||||
SFDONE_ :: ACommandTag Agent AESndFile
|
||||
|
||||
deriving instance Eq (ACommandTag p e)
|
||||
@@ -439,6 +445,7 @@ aCommandTag = \case
|
||||
RFPROG {} -> RFPROG_
|
||||
RFDONE {} -> RFDONE_
|
||||
RFERR {} -> RFERR_
|
||||
SFPROG {} -> SFPROG_
|
||||
SFDONE {} -> SFDONE_
|
||||
|
||||
data QueueDirection = QDRcv | QDSnd
|
||||
@@ -1305,10 +1312,10 @@ instance StrEncoding ACmdTag where
|
||||
"CON" -> ct CON_
|
||||
"SUB" -> t SUB_
|
||||
"END" -> ct END_
|
||||
"CONNECT" -> at SAENone CONNECT_
|
||||
"DISCONNECT" -> at SAENone DISCONNECT_
|
||||
"DOWN" -> ct DOWN_
|
||||
"UP" -> ct UP_
|
||||
"CONNECT" -> nt CONNECT_
|
||||
"DISCONNECT" -> nt DISCONNECT_
|
||||
"DOWN" -> nt DOWN_
|
||||
"UP" -> nt UP_
|
||||
"SWITCH" -> ct SWITCH_
|
||||
"SEND" -> t SEND_
|
||||
"MID" -> ct MID_
|
||||
@@ -1321,20 +1328,23 @@ instance StrEncoding ACmdTag where
|
||||
"DEL" -> t DEL_
|
||||
"DEL_RCVQ" -> ct DEL_RCVQ_
|
||||
"DEL_CONN" -> ct DEL_CONN_
|
||||
"DEL_USER" -> at SAENone DEL_USER_
|
||||
"DEL_USER" -> nt DEL_USER_
|
||||
"CHK" -> t CHK_
|
||||
"STAT" -> ct STAT_
|
||||
"OK" -> ct OK_
|
||||
"ERR" -> ct ERR_
|
||||
"SUSPENDED" -> at SAENone SUSPENDED_
|
||||
"SUSPENDED" -> nt SUSPENDED_
|
||||
"RFPROG" -> at SAERcvFile RFPROG_
|
||||
"RFDONE" -> at SAERcvFile RFDONE_
|
||||
"RFERR" -> at SAERcvFile RFERR_
|
||||
"SFPROG" -> at SAESndFile SFPROG_
|
||||
"SFDONE" -> at SAESndFile SFDONE_
|
||||
_ -> fail "bad ACmdTag"
|
||||
where
|
||||
t = pure . ACmdTag SClient SAEConn
|
||||
at e = pure . ACmdTag SAgent e
|
||||
ct = at SAEConn
|
||||
nt = at SAENone
|
||||
|
||||
instance APartyI p => StrEncoding (APartyCmdTag p) where
|
||||
strEncode (APCT _ cmd) = strEncode cmd
|
||||
@@ -1379,6 +1389,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where
|
||||
RFPROG_ -> "RFPROG"
|
||||
RFDONE_ -> "RFDONE"
|
||||
RFERR_ -> "RFERR"
|
||||
SFPROG_ -> "SFPROG"
|
||||
SFDONE_ -> "SFDONE"
|
||||
strP = (\(APCT _ t) -> checkEntity t) <$?> strP
|
||||
|
||||
@@ -1438,14 +1449,19 @@ commandP binaryP =
|
||||
RFPROG_ -> s (RFPROG <$> A.decimal <* A.space <*> A.decimal)
|
||||
RFDONE_ -> s (RFDONE <$> strP)
|
||||
RFERR_ -> s (RFERR <$> strP)
|
||||
SFDONE_ -> s (SFDONE <$> strP <*> rcvDescrs)
|
||||
SFPROG_ -> s (SFPROG <$> A.decimal <* A.space <*> A.decimal)
|
||||
SFDONE_ -> s (sfDone . safeDecodeUtf8 <$?> binaryP)
|
||||
where
|
||||
s :: Parser a -> Parser a
|
||||
s p = A.space *> p
|
||||
connections :: Parser [ConnId]
|
||||
connections = strP `A.sepBy'` A.char ','
|
||||
rcvDescrs :: Parser [String]
|
||||
rcvDescrs = strP `A.sepBy'` A.char ',' -- TODO consider separator
|
||||
sfDone :: Text -> Either String (ACommand 'Agent 'AESndFile)
|
||||
sfDone t =
|
||||
let ds = T.splitOn fdSeparator t
|
||||
in case ds of
|
||||
[] -> Left "no sender file description"
|
||||
sd : rds -> SFDONE <$> strDecode (encodeUtf8 sd) <*> mapM (strDecode . encodeUtf8) rds
|
||||
msgMetaP = do
|
||||
integrity <- strP
|
||||
recipient <- " R=" *> partyMeta A.decimal
|
||||
@@ -1497,7 +1513,8 @@ serializeCommand = \case
|
||||
RFPROG rcvd total -> s (RFPROG_, rcvd, total)
|
||||
RFDONE fPath -> s (RFDONE_, fPath)
|
||||
RFERR e -> s (RFERR_, e)
|
||||
SFDONE sd rds -> B.unwords [s SFDONE_, s sd, rcvDescrs rds]
|
||||
SFPROG sent total -> s (SFPROG_, sent, total)
|
||||
SFDONE sd rds -> B.unwords [s SFDONE_, serializeBinary (sfDone sd rds)]
|
||||
where
|
||||
s :: StrEncoding a => a -> ByteString
|
||||
s = strEncode
|
||||
@@ -1505,8 +1522,7 @@ serializeCommand = \case
|
||||
showTs = B.pack . formatISO8601Millis
|
||||
connections :: [ConnId] -> ByteString
|
||||
connections = B.intercalate "," . map strEncode
|
||||
rcvDescrs :: [String] -> ByteString
|
||||
rcvDescrs = B.intercalate "," . map strEncode -- TODO consider separator
|
||||
sfDone sd rds = B.intercalate fdSeparator $ strEncode sd : map strEncode rds
|
||||
serializeMsgMeta :: MsgMeta -> ByteString
|
||||
serializeMsgMeta MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sndMsgId} =
|
||||
B.unwords
|
||||
|
||||
@@ -264,8 +264,6 @@ data PendingCommand = PendingCommand
|
||||
|
||||
data AgentCmdType = ACClient | ACInternal
|
||||
|
||||
type UserId = Int64
|
||||
|
||||
instance StrEncoding AgentCmdType where
|
||||
strEncode = \case
|
||||
ACClient -> "CLIENT"
|
||||
|
||||
@@ -16,8 +16,8 @@ import Control.Concurrent.STM
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Simplex.Messaging.Agent.Protocol (ConnId)
|
||||
import Simplex.Messaging.Agent.Store (RcvQueue (..), UserId)
|
||||
import Simplex.Messaging.Agent.Protocol (ConnId, UserId)
|
||||
import Simplex.Messaging.Agent.Store (RcvQueue (..))
|
||||
import Simplex.Messaging.Protocol (RecipientId, SMPServer)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
|
||||
+36
-18
@@ -92,19 +92,31 @@ tGetAgent = tGetAgent'
|
||||
|
||||
tGetAgent' :: forall c e. (Transport c, AEntityI e) => c -> IO (AEntityTransmissionOrError 'Agent e)
|
||||
tGetAgent' h = do
|
||||
(corrId, connId, cmdOrErr) <- tGet SAgent h
|
||||
(corrId, connId, cmdOrErr) <- pGetAgent h
|
||||
case cmdOrErr of
|
||||
Right (APC _ CONNECT {}) -> tGetAgent' h
|
||||
Right (APC _ DISCONNECT {}) -> tGetAgent' h
|
||||
Right (APC e cmd) -> case testEquality e (sAEntity @e) of
|
||||
Just Refl -> pure (corrId, connId, Right cmd)
|
||||
_ -> error $ "unexpected command " <> show cmd
|
||||
Left err -> pure (corrId, connId, Left err)
|
||||
|
||||
pGetAgent :: forall c. Transport c => c -> IO (ATransmissionOrError 'Agent)
|
||||
pGetAgent h = do
|
||||
(corrId, connId, cmdOrErr) <- tGet SAgent h
|
||||
case cmdOrErr of
|
||||
Right (APC _ CONNECT {}) -> pGetAgent h
|
||||
Right (APC _ DISCONNECT {}) -> pGetAgent h
|
||||
cmd -> pure (corrId, connId, cmd)
|
||||
|
||||
-- | receive message to handle `h`
|
||||
(<#:) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn)
|
||||
(<#:) = tGetAgent
|
||||
|
||||
(<#:?) :: Transport c => c -> IO (ATransmissionOrError 'Agent)
|
||||
(<#:?) = pGetAgent
|
||||
|
||||
(<#:.) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AENone)
|
||||
(<#:.) = tGetAgent'
|
||||
|
||||
-- | send transmission `t` to handle `h` and get response
|
||||
(#:) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn)
|
||||
h #: t = tPutRaw h t >> (<#:) h
|
||||
@@ -119,7 +131,7 @@ action #> (corrId, connId, cmd) = action `shouldReturn` (corrId, connId, Right c
|
||||
(=#>) :: IO (AEntityTransmissionOrError 'Agent 'AEConn) -> (AEntityTransmission 'Agent 'AEConn -> Bool) -> Expectation
|
||||
action =#> p = action >>= (`shouldSatisfy` p . correctTransmission)
|
||||
|
||||
correctTransmission :: AEntityTransmissionOrError p e -> AEntityTransmission p e
|
||||
correctTransmission :: (ACorrId, ConnId, Either AgentErrorType cmd) -> (ACorrId, ConnId, cmd)
|
||||
correctTransmission (corrId, connId, cmdOrErr) = case cmdOrErr of
|
||||
Right cmd -> (corrId, connId, cmd)
|
||||
Left e -> error $ show e
|
||||
@@ -128,10 +140,16 @@ correctTransmission (corrId, connId, cmdOrErr) = case cmdOrErr of
|
||||
(<#) :: Transport c => c -> AEntityTransmission 'Agent 'AEConn -> Expectation
|
||||
h <# (corrId, connId, cmd) = (h <#:) `shouldReturn` (corrId, connId, Right cmd)
|
||||
|
||||
(<#.) :: Transport c => c -> AEntityTransmission 'Agent 'AENone -> Expectation
|
||||
h <#. (corrId, connId, cmd) = (h <#:.) `shouldReturn` (corrId, connId, Right cmd)
|
||||
|
||||
-- | receive message to handle `h` and validate it using predicate `p`
|
||||
(<#=) :: Transport c => c -> (AEntityTransmission 'Agent 'AEConn -> Bool) -> Expectation
|
||||
h <#= p = (h <#:) >>= (`shouldSatisfy` p . correctTransmission)
|
||||
|
||||
(<#=?) :: Transport c => c -> (ATransmission 'Agent -> Bool) -> Expectation
|
||||
h <#=? p = (h <#:?) >>= (`shouldSatisfy` p . correctTransmission)
|
||||
|
||||
-- | test that nothing is delivered to handle `h` during 10ms
|
||||
(#:#) :: Transport c => c -> String -> Expectation
|
||||
h #:# err = tryGet `shouldReturn` ()
|
||||
@@ -141,7 +159,7 @@ h #:# err = tryGet `shouldReturn` ()
|
||||
Just _ -> error err
|
||||
_ -> return ()
|
||||
|
||||
pattern Msg :: MsgBody -> ACommand 'Agent 'AEConn
|
||||
pattern Msg :: MsgBody -> ACommand 'Agent e
|
||||
pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody
|
||||
|
||||
testDuplexConnection :: Transport c => TProxy c -> c -> c -> IO ()
|
||||
@@ -300,7 +318,7 @@ testSubscrNotification t (server, _) client = do
|
||||
client #: ("1", "conn1", "NEW T INV") =#> \case ("1", "conn1", INV {}) -> True; _ -> False
|
||||
client #:# "nothing should be delivered to client before the server is killed"
|
||||
killThread server
|
||||
client <# ("", "", DOWN testSMPServer ["conn1"])
|
||||
client <#. ("", "", DOWN testSMPServer ["conn1"])
|
||||
withSmpServer (ATransport t) $
|
||||
client <# ("", "conn1", ERR (SMP AUTH)) -- this new server does not have the queue
|
||||
|
||||
@@ -314,15 +332,15 @@ testMsgDeliveryServerRestart t alice bob = do
|
||||
alice #: ("11", "bob", "ACK 4") #> ("11", "bob", OK)
|
||||
alice #:# "nothing else delivered before the server is killed"
|
||||
|
||||
let server = (SMPServer "localhost" testPort2 testKeyHash)
|
||||
alice <# ("", "", DOWN server ["bob"])
|
||||
let server = SMPServer "localhost" testPort2 testKeyHash
|
||||
alice <#. ("", "", DOWN server ["bob"])
|
||||
bob #: ("2", "alice", "SEND F 11\nhello again") #> ("2", "alice", MID 5)
|
||||
bob #:# "nothing else delivered before the server is restarted"
|
||||
alice #:# "nothing else delivered before the server is restarted"
|
||||
|
||||
withServer $ do
|
||||
bob <# ("", "alice", SENT 5)
|
||||
alice <# ("", "", UP server ["bob"])
|
||||
alice <#. ("", "", UP server ["bob"])
|
||||
alice <#= \case ("", "bob", Msg "hello again") -> True; _ -> False
|
||||
alice #: ("12", "bob", "ACK 5") #> ("12", "bob", OK)
|
||||
|
||||
@@ -337,8 +355,8 @@ testServerConnectionAfterError t _ = do
|
||||
withServer $ do
|
||||
connect (bob, "bob") (alice, "alice")
|
||||
|
||||
bob <# ("", "", DOWN server ["alice"])
|
||||
alice <# ("", "", DOWN server ["bob"])
|
||||
bob <#. ("", "", DOWN server ["alice"])
|
||||
alice <#. ("", "", DOWN server ["bob"])
|
||||
alice #: ("1", "bob", "SEND F 5\nhello") #> ("1", "bob", MID 4)
|
||||
alice #:# "nothing else delivered before the server is restarted"
|
||||
bob #:# "nothing else delivered before the server is restarted"
|
||||
@@ -348,10 +366,10 @@ testServerConnectionAfterError t _ = do
|
||||
bob #: ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
|
||||
alice #: ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
|
||||
withServer $ do
|
||||
alice <#= \case ("", "bob", SENT 4) -> True; ("", "", UP s ["bob"]) -> s == server; _ -> False
|
||||
alice <#= \case ("", "bob", SENT 4) -> True; ("", "", UP s ["bob"]) -> s == server; _ -> False
|
||||
bob <#= \case ("", "alice", Msg "hello") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False
|
||||
bob <#= \case ("", "alice", Msg "hello") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False
|
||||
alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
|
||||
alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
|
||||
bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False
|
||||
bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False
|
||||
bob #: ("2", "alice", "ACK 4") #> ("2", "alice", OK)
|
||||
alice #: ("1", "bob", "SEND F 11\nhello again") #> ("1", "bob", MID 5)
|
||||
alice <# ("", "bob", SENT 5)
|
||||
@@ -380,7 +398,7 @@ testMsgDeliveryAgentRestart t bob = do
|
||||
bob #: ("11", "alice", "ACK 4") #> ("11", "alice", OK)
|
||||
bob #:# "nothing else delivered before the server is down"
|
||||
|
||||
bob <# ("", "", DOWN server ["alice"])
|
||||
bob <#. ("", "", DOWN server ["alice"])
|
||||
alice #: ("2", "bob", "SEND F 11\nhello again") #> ("2", "bob", MID 5)
|
||||
alice #:# "nothing else delivered before the server is restarted"
|
||||
bob #:# "nothing else delivered before the server is restarted"
|
||||
@@ -393,8 +411,8 @@ testMsgDeliveryAgentRestart t bob = do
|
||||
(corrId == "3" && cmd == OK)
|
||||
|| (corrId == "" && cmd == SENT 5)
|
||||
_ -> False
|
||||
bob <#= \case ("", "alice", Msg "hello again") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False
|
||||
bob <#= \case ("", "alice", Msg "hello again") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False
|
||||
bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False
|
||||
bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False
|
||||
bob #: ("12", "alice", "ACK 5") #> ("12", "alice", OK)
|
||||
|
||||
removeFile testStoreLogFile
|
||||
|
||||
@@ -23,6 +23,7 @@ module AgentTests.FunctionalAPITests
|
||||
get',
|
||||
rfGet,
|
||||
sfGet,
|
||||
nGet,
|
||||
(##>),
|
||||
(=##>),
|
||||
pattern Msg,
|
||||
@@ -49,7 +50,6 @@ import Simplex.Messaging.Agent
|
||||
import Simplex.Messaging.Agent.Client (SMPTestFailure (..), SMPTestStep (..))
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..))
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store (UserId)
|
||||
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -68,7 +68,7 @@ type AEntityTransmission e = (ACorrId, ConnId, ACommand 'Agent e)
|
||||
(##>) :: (HasCallStack, MonadIO m) => m (AEntityTransmission e) -> AEntityTransmission e -> m ()
|
||||
a ##> t = a >>= \t' -> liftIO (t' `shouldBe` t)
|
||||
|
||||
(=##>) :: (HasCallStack, MonadIO m) => m (AEntityTransmission e) -> (AEntityTransmission e -> Bool) -> m ()
|
||||
(=##>) :: (Show a, HasCallStack, MonadIO m) => m a -> (a -> Bool) -> m ()
|
||||
a =##> p = a >>= \t -> liftIO (t `shouldSatisfy` p)
|
||||
|
||||
get :: MonadIO m => AgentClient -> m (AEntityTransmission 'AEConn)
|
||||
@@ -80,17 +80,25 @@ rfGet = get' @'AERcvFile
|
||||
sfGet :: MonadIO m => AgentClient -> m (AEntityTransmission 'AESndFile)
|
||||
sfGet = get' @'AESndFile
|
||||
|
||||
nGet :: MonadIO m => AgentClient -> m (AEntityTransmission 'AENone)
|
||||
nGet = get' @'AENone
|
||||
|
||||
get' :: forall e m. (MonadIO m, AEntityI e) => AgentClient -> m (AEntityTransmission e)
|
||||
get' c = do
|
||||
(corrId, connId, APC e cmd) <- atomically (readTBQueue $ subQ c)
|
||||
case cmd of
|
||||
CONNECT {} -> get' c
|
||||
DISCONNECT {} -> get' c
|
||||
_ -> case testEquality e (sAEntity @e) of
|
||||
Just Refl -> pure (corrId, connId, cmd)
|
||||
_ -> error $ "unexpected command " <> show cmd
|
||||
(corrId, connId, APC e cmd) <- pGet c
|
||||
case testEquality e (sAEntity @e) of
|
||||
Just Refl -> pure (corrId, connId, cmd)
|
||||
_ -> error $ "unexpected command " <> show cmd
|
||||
|
||||
pattern Msg :: MsgBody -> ACommand 'Agent 'AEConn
|
||||
pGet :: forall m. MonadIO m => AgentClient -> m (ATransmission 'Agent)
|
||||
pGet c = do
|
||||
t@(_, _, APC _ cmd) <- atomically (readTBQueue $ subQ c)
|
||||
case cmd of
|
||||
CONNECT {} -> pGet c
|
||||
DISCONNECT {} -> pGet c
|
||||
_ -> pure t
|
||||
|
||||
pattern Msg :: MsgBody -> ACommand 'Agent e
|
||||
pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody
|
||||
|
||||
smpCfgV1 :: ProtocolClientConfig
|
||||
@@ -405,12 +413,12 @@ testAsyncServerOffline t = do
|
||||
runRight $ createConnection alice 1 True SCMInvitation Nothing
|
||||
-- connection fails
|
||||
Left (BROKER _ NETWORK) <- runExceptT $ joinConnection bob 1 True cReq "bob's connInfo"
|
||||
("", "", DOWN srv conns) <- get alice
|
||||
("", "", DOWN srv conns) <- nGet alice
|
||||
srv `shouldBe` testSMPServer
|
||||
conns `shouldBe` [bobId]
|
||||
-- connection succeeds after server start
|
||||
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
|
||||
("", "", UP srv1 conns1) <- get alice
|
||||
("", "", UP srv1 conns1) <- nGet alice
|
||||
liftIO $ do
|
||||
srv1 `shouldBe` testSMPServer
|
||||
conns1 `shouldBe` [bobId]
|
||||
@@ -457,8 +465,8 @@ testDuplicateMessage t = do
|
||||
|
||||
pure (aliceId, bobId, bob1)
|
||||
|
||||
get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
get bob1 =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
nGet bob1 =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
-- commenting two lines below and uncommenting further two lines would also runRight_,
|
||||
-- it is the scenario tested above, when the message was not acknowledged by the user
|
||||
threadDelay 200000
|
||||
@@ -502,7 +510,7 @@ testInactiveClientDisconnected t = do
|
||||
alice <- getSMPAgentClient agentCfg initAgentServers
|
||||
runRight_ $ do
|
||||
(connId, _cReq) <- createConnection alice 1 True SCMInvitation Nothing
|
||||
get alice ##> ("", "", DOWN testSMPServer [connId])
|
||||
nGet alice ##> ("", "", DOWN testSMPServer [connId])
|
||||
|
||||
testActiveClientNotDisconnected :: ATransport -> IO ()
|
||||
testActiveClientNotDisconnected t = do
|
||||
@@ -528,7 +536,7 @@ testActiveClientNotDisconnected t = do
|
||||
Nothing <- 800000 `timeout` get alice
|
||||
liftIO $ threadDelay 1200000
|
||||
-- and after 2 sec of inactivity DOWN is sent
|
||||
get alice ##> ("", "", DOWN testSMPServer [connId])
|
||||
nGet alice ##> ("", "", DOWN testSMPServer [connId])
|
||||
milliseconds ts = systemSeconds ts * 1000 + fromIntegral (systemNanoseconds ts `div` 1000000)
|
||||
|
||||
testSuspendingAgent :: IO ()
|
||||
@@ -562,21 +570,21 @@ testSuspendingAgentCompleteSending t = do
|
||||
pure (aId, bId)
|
||||
|
||||
runRight_ $ do
|
||||
("", "", DOWN {}) <- get a
|
||||
("", "", DOWN {}) <- get b
|
||||
("", "", DOWN {}) <- nGet a
|
||||
("", "", DOWN {}) <- nGet b
|
||||
5 <- sendMessage b aId SMP.noMsgFlags "hello too"
|
||||
6 <- sendMessage b aId SMP.noMsgFlags "how are you?"
|
||||
liftIO $ threadDelay 100000
|
||||
suspendAgent b 5000000
|
||||
|
||||
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ @AgentErrorType $ do
|
||||
get b =##> \case ("", c, SENT 5) -> c == aId; ("", "", UP {}) -> True; _ -> False
|
||||
get b =##> \case ("", c, SENT 5) -> c == aId; ("", "", UP {}) -> True; _ -> False
|
||||
get b =##> \case ("", c, SENT 6) -> c == aId; ("", "", UP {}) -> True; _ -> False
|
||||
("", "", SUSPENDED) <- get' @'AENone b
|
||||
pGet b =##> \case ("", c, APC _ (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
|
||||
pGet b =##> \case ("", c, APC _ (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
|
||||
pGet b =##> \case ("", c, APC _ (SENT 6)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
|
||||
("", "", SUSPENDED) <- nGet b
|
||||
|
||||
get a =##> \case ("", c, Msg "hello too") -> c == bId; ("", "", UP {}) -> True; _ -> False
|
||||
get a =##> \case ("", c, Msg "hello too") -> c == bId; ("", "", UP {}) -> True; _ -> False
|
||||
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
|
||||
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
|
||||
ackMessage a bId 5
|
||||
get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False
|
||||
ackMessage a bId 6
|
||||
@@ -594,12 +602,12 @@ testSuspendingAgentTimeout t = do
|
||||
pure (aId, bId)
|
||||
|
||||
runRight_ $ do
|
||||
("", "", DOWN {}) <- get a
|
||||
("", "", DOWN {}) <- get b
|
||||
("", "", DOWN {}) <- nGet a
|
||||
("", "", DOWN {}) <- nGet b
|
||||
5 <- sendMessage b aId SMP.noMsgFlags "hello too"
|
||||
6 <- sendMessage b aId SMP.noMsgFlags "how are you?"
|
||||
suspendAgent b 100000
|
||||
("", "", SUSPENDED) <- get' @'AENone b
|
||||
("", "", SUSPENDED) <- nGet b
|
||||
pure ()
|
||||
|
||||
testBatchedSubscriptions :: ATransport -> IO ()
|
||||
@@ -614,15 +622,15 @@ testBatchedSubscriptions t = do
|
||||
delete b aIds'
|
||||
liftIO $ threadDelay 1000000
|
||||
pure conns
|
||||
("", "", DOWN {}) <- get a
|
||||
("", "", DOWN {}) <- get a
|
||||
("", "", DOWN {}) <- get b
|
||||
("", "", DOWN {}) <- get b
|
||||
("", "", DOWN {}) <- nGet a
|
||||
("", "", DOWN {}) <- nGet a
|
||||
("", "", DOWN {}) <- nGet b
|
||||
("", "", DOWN {}) <- nGet b
|
||||
runServers $ do
|
||||
("", "", UP {}) <- get a
|
||||
("", "", UP {}) <- get a
|
||||
("", "", UP {}) <- get b
|
||||
("", "", UP {}) <- get b
|
||||
("", "", UP {}) <- nGet a
|
||||
("", "", UP {}) <- nGet a
|
||||
("", "", UP {}) <- nGet b
|
||||
("", "", UP {}) <- nGet b
|
||||
liftIO $ threadDelay 1000000
|
||||
let (aIds, bIds) = unzip conns
|
||||
conns' = drop 10 conns
|
||||
@@ -795,7 +803,7 @@ testUsers = do
|
||||
deleteUser a auId True
|
||||
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId'; _ -> False
|
||||
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
|
||||
get' @'AENone a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
|
||||
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
|
||||
exchangeGreetingsMsgId 6 a bId b aId
|
||||
liftIO $ noMessages a "nothing else should be delivered to alice"
|
||||
|
||||
@@ -824,18 +832,18 @@ testUsersNoServer t = do
|
||||
(aId', bId') <- makeConnectionForUsers a auId b 1
|
||||
exchangeGreetingsMsgId 4 a bId' b aId'
|
||||
pure (aId, bId, auId, aId', bId')
|
||||
get a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False
|
||||
get a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False
|
||||
get b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
|
||||
nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False
|
||||
nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False
|
||||
nGet b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
|
||||
runRight_ $ do
|
||||
deleteUser a auId True
|
||||
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
|
||||
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
|
||||
get' @'AENone a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
|
||||
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
|
||||
liftIO $ noMessages a "nothing else should be delivered to alice"
|
||||
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
|
||||
get a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False
|
||||
get b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False
|
||||
nGet a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False
|
||||
nGet b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False
|
||||
exchangeGreetingsMsgId 6 a bId b aId
|
||||
|
||||
testSwitchConnection :: InitialAgentServers -> IO ()
|
||||
@@ -979,8 +987,8 @@ testTwoUsers = do
|
||||
b `hasClients` 1
|
||||
setNetworkConfig a nc {sessionMode = TSMEntity}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 2
|
||||
|
||||
exchangeGreetingsMsgId 6 a bId1 b aId1
|
||||
@@ -988,10 +996,10 @@ testTwoUsers = do
|
||||
liftIO $ threadDelay 250000
|
||||
setNetworkConfig a nc {sessionMode = TSMUser}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 1
|
||||
|
||||
aUserId2 <- createUser a [noAuthSrv testSMPServer]
|
||||
@@ -1003,10 +1011,10 @@ testTwoUsers = do
|
||||
b `hasClients` 1
|
||||
setNetworkConfig a nc {sessionMode = TSMEntity}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 4
|
||||
exchangeGreetingsMsgId 8 a bId1 b aId1
|
||||
exchangeGreetingsMsgId 8 a bId1' b aId1'
|
||||
@@ -1015,14 +1023,14 @@ testTwoUsers = do
|
||||
liftIO $ threadDelay 250000
|
||||
setNetworkConfig a nc {sessionMode = TSMUser}
|
||||
liftIO $ threadDelay 250000
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", DOWN _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", UP _ _) <- get a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", DOWN _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 2
|
||||
exchangeGreetingsMsgId 10 a bId1 b aId1
|
||||
exchangeGreetingsMsgId 10 a bId1' b aId1'
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
module AgentTests.NotificationTests where
|
||||
|
||||
-- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging)
|
||||
import AgentTests.FunctionalAPITests (exchangeGreetingsMsgId, get, makeConnection, runRight, runRight_, switchComplete, testServerMatrix2, (##>), (=##>), pattern Msg)
|
||||
import AgentTests.FunctionalAPITests (exchangeGreetingsMsgId, get, makeConnection, nGet, runRight, runRight_, switchComplete, testServerMatrix2, (##>), (=##>), pattern Msg)
|
||||
import Control.Concurrent (killThread, threadDelay)
|
||||
import Control.Monad.Except
|
||||
import qualified Data.Aeson as J
|
||||
@@ -466,12 +466,12 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
|
||||
pure (aliceId, bobId)
|
||||
|
||||
runRight_ @AgentErrorType $ do
|
||||
get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
get bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
nGet bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
|
||||
withSmpServerStoreLogOn t testPort $ \threadId -> runRight_ $ do
|
||||
get alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False
|
||||
get bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False
|
||||
nGet alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False
|
||||
nGet bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False
|
||||
liftIO $ threadDelay 1000000
|
||||
5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again"
|
||||
get bob ##> ("", aliceId, SENT 5)
|
||||
|
||||
@@ -29,7 +29,6 @@ import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Server (runSMPAgentBlocking)
|
||||
import Simplex.Messaging.Agent.Store (UserId)
|
||||
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultClientConfig, defaultNetworkConfig)
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth, XFTPServer)
|
||||
|
||||
+21
-26
@@ -9,12 +9,10 @@ import AgentTests.FunctionalAPITests (get, rfGet, runRight, runRight_, sfGet)
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
import Data.Bifunctor (first)
|
||||
import qualified Data.ByteString as LB
|
||||
import Data.List.NonEmpty (nonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import SMPAgentClient (agentCfg, initAgentServers)
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..), checkParty)
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.Messaging.Agent (disconnectAgentClient, getSMPAgentClient, xftpReceiveFile, xftpSendFile)
|
||||
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..))
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
@@ -37,7 +35,7 @@ testXFTPAgentReceive = withXFTPServer $ do
|
||||
-- send file using CLI
|
||||
let filePath = senderFiles </> "testfile"
|
||||
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
|
||||
file <- LB.readFile filePath
|
||||
file <- B.readFile filePath
|
||||
getFileSize filePath `shouldReturn` mb 17
|
||||
let fdRcv = filePath <> ".xftp" </> "rcv1.xftp"
|
||||
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
|
||||
@@ -52,18 +50,15 @@ testXFTPAgentReceive = withXFTPServer $ do
|
||||
rcp <- getSMPAgentClient agentCfg initAgentServers
|
||||
runRight_ $ do
|
||||
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
|
||||
fId <- xftpReceiveFile rcp 1 fd recipientFiles
|
||||
fId <- xftpReceiveFile rcp 1 fd $ Just recipientFiles
|
||||
("", fId', RFDONE path) <- rfGet rcp
|
||||
liftIO $ do
|
||||
fId' `shouldBe` fId
|
||||
LB.readFile path `shouldReturn` file
|
||||
B.readFile path `shouldReturn` file
|
||||
|
||||
getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient)
|
||||
getFileDescription path = do
|
||||
fd :: AFileDescription <- ExceptT $ first (INTERNAL . ("Failed to parse file description: " <>)) . strDecode <$> LB.readFile path
|
||||
vfd <- liftEither . first INTERNAL $ validateFileDescription fd
|
||||
case vfd of
|
||||
AVFD fd' -> either (throwError . INTERNAL) pure $ checkParty fd'
|
||||
getFileDescription path =
|
||||
ExceptT $ first (INTERNAL . ("Failed to parse file description: " <>)) . strDecode <$> B.readFile path
|
||||
|
||||
logCfgNoLogs :: LogConfig
|
||||
logCfgNoLogs = LogConfig {lc_file = Nothing, lc_stderr = False}
|
||||
@@ -90,7 +85,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
rcp <- getSMPAgentClient agentCfg initAgentServers
|
||||
fId <- runRight $ do
|
||||
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
|
||||
fId <- xftpReceiveFile rcp 1 fd recipientFiles
|
||||
fId <- xftpReceiveFile rcp 1 fd $ Just recipientFiles
|
||||
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
|
||||
pure fId
|
||||
disconnectAgentClient rcp
|
||||
@@ -103,8 +98,8 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
("", fId', RFDONE path) <- rfGet rcp'
|
||||
liftIO $ do
|
||||
fId' `shouldBe` fId
|
||||
file <- LB.readFile filePath
|
||||
LB.readFile path `shouldReturn` file
|
||||
file <- B.readFile filePath
|
||||
B.readFile path `shouldReturn` file
|
||||
|
||||
-- tmp path should be removed after receiving file
|
||||
doesDirectoryExist (recipientFiles </> "xftp.encrypted") `shouldReturn` False
|
||||
@@ -131,7 +126,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
|
||||
rcp <- getSMPAgentClient agentCfg initAgentServers
|
||||
fId <- runRight $ do
|
||||
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
|
||||
fId <- xftpReceiveFile rcp 1 fd recipientFiles
|
||||
fId <- xftpReceiveFile rcp 1 fd $ Just recipientFiles
|
||||
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
|
||||
pure fId
|
||||
disconnectAgentClient rcp
|
||||
@@ -153,27 +148,27 @@ testXFTPAgentSendExperimental = do
|
||||
-- create random file using cli
|
||||
let filePath = senderFiles </> "testfile"
|
||||
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
|
||||
file <- LB.readFile filePath
|
||||
file <- B.readFile filePath
|
||||
getFileSize filePath `shouldReturn` mb 17
|
||||
|
||||
-- send file using experimental agent API
|
||||
sndr <- getSMPAgentClient agentCfg initAgentServers
|
||||
rcvDescrs <- runRight $ do
|
||||
sfId <- xftpSendFile sndr 1 2 senderFiles filePath
|
||||
rfd <- runRight $ do
|
||||
sfId <- xftpSendFile sndr 1 filePath 2 $ Just senderFiles
|
||||
("", sfId', SFDONE sndDescr rcvDescrs) <- sfGet sndr
|
||||
liftIO $ do
|
||||
sfId' `shouldBe` sfId
|
||||
sndDescr `shouldBe` senderFiles </> "testfile.descr/testfile.xftp/snd.xftp.private"
|
||||
rcvDescrs `shouldBe` [senderFiles </> "testfile.descr/testfile.xftp/rcv1.xftp", senderFiles </> "testfile.descr/testfile.xftp/rcv2.xftp"]
|
||||
pure rcvDescrs
|
||||
let fdRcv = maybe "" L.head (nonEmpty rcvDescrs)
|
||||
strDecode <$> B.readFile (senderFiles </> "testfile.descr/testfile.xftp/snd.xftp.private") `shouldReturn` Right sndDescr
|
||||
Right rfd1 <- strDecode <$> B.readFile (senderFiles </> "testfile.descr/testfile.xftp/rcv1.xftp")
|
||||
Right rfd2 <- strDecode <$> B.readFile (senderFiles </> "testfile.descr/testfile.xftp/rcv2.xftp")
|
||||
rcvDescrs `shouldBe` [rfd1, rfd2]
|
||||
pure rfd1
|
||||
|
||||
-- receive file using agent
|
||||
rcp <- getSMPAgentClient agentCfg initAgentServers
|
||||
runRight_ $ do
|
||||
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
|
||||
rfId <- xftpReceiveFile rcp 1 fd recipientFiles
|
||||
rfId <- xftpReceiveFile rcp 1 rfd $ Just recipientFiles
|
||||
("", rfId', RFDONE path) <- rfGet rcp
|
||||
liftIO $ do
|
||||
rfId' `shouldBe` rfId
|
||||
LB.readFile path `shouldReturn` file
|
||||
B.readFile path `shouldReturn` file
|
||||
|
||||
Reference in New Issue
Block a user