diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index b8847cdbf..54a266608 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -29,6 +29,7 @@ import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (first) import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U +import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Int (Int64) import Data.List (isSuffixOf, partition) @@ -36,7 +37,7 @@ import Data.List.NonEmpty (nonEmpty) import qualified Data.List.NonEmpty as L import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFile) import Simplex.FileTransfer.Description -import Simplex.FileTransfer.Protocol (FileParty (..)) +import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI) import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) import Simplex.FileTransfer.Types import Simplex.FileTransfer.Util (removePath, uniqueCombine) @@ -44,25 +45,26 @@ import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval -import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (XFTPServer) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (tshow) +import Simplex.Messaging.Util (liftIOEither, tshow) import System.FilePath (takeFileName, ()) import UnliftIO import UnliftIO.Concurrent import UnliftIO.Directory import qualified UnliftIO.Exception as E -receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> FilePath -> m RcvFileId -receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpPath = do +receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId +receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWorkPath = do g <- asks idsDrg - encPath <- uniqueCombine xftpPath "xftp.encrypted" + workPath <- maybe getTemporaryDirectory pure xftpWorkPath + encPath <- uniqueCombine workPath "xftp.encrypted" createDirectory encPath - fId <- withStore c $ \db -> createRcvFile db g userId fd xftpPath encPath + fId <- withStore c $ \db -> createRcvFile db g userId fd workPath encPath forM_ chunks downloadChunk pure fId where @@ -186,9 +188,10 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do pure $ path : ps getChunkPaths (RcvFileChunk {chunkTmpPath = Nothing} : _cs) = throwError $ INTERNAL "no chunk path" + -- TODO refactor with decrypt in CLI, streaming decryption decrypt :: Int64 -> [FilePath] -> m FilePath decrypt encSize chunkPaths = do - lazyChunks <- readChunks chunkPaths + lazyChunks <- liftIO $ readChunks chunkPaths (authOk, f) <- liftEither . first cryptoError $ LC.sbDecryptTailTag key nonce (encSize - authTagSize) lazyChunks let (fileHdr, f') = LB.splitAt 1024 f -- withFile encPath ReadMode $ \r -> do @@ -205,30 +208,25 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do removeFile path throwError $ INTERNAL "Error decrypting file: incorrect auth tag" pure path - readChunks :: [FilePath] -> m LB.ByteString - readChunks = - foldM - ( \s path -> do - chunk <- liftIO $ LB.readFile path - pure $ s <> chunk - ) - LB.empty + readChunks :: [FilePath] -> IO LB.ByteString + readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) "" -sendFileExperimental :: forall m. AgentMonad m => AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId -sendFileExperimental AgentClient {subQ} _userId numRecipients xftpPath filePath = do +sendFileExperimental :: forall m. AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId +sendFileExperimental AgentClient {subQ} _userId filePath numRecipients xftpWorkPath = do g <- asks idsDrg - sndFileEntityId <- liftIO $ randomId g 12 - void $ forkIO $ sendCLI sndFileEntityId - pure sndFileEntityId + sndFileId <- liftIO $ randomId g 12 + void $ forkIO $ sendCLI sndFileId + pure sndFileId where randomId :: TVar ChaChaDRG -> Int -> IO ByteString randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n) sendCLI :: SndFileId -> m () - sendCLI sndFileEntityId = do + sendCLI sndFileId = do let fileName = takeFileName filePath - outputDir <- uniqueCombine xftpPath (fileName <> ".descr") + workPath <- maybe getTemporaryDirectory pure xftpWorkPath + outputDir <- uniqueCombine workPath $ fileName <> ".descr" createDirectory outputDir - let tempPath = xftpPath "snd" + let tempPath = workPath "snd" createDirectoryIfMissing False tempPath let sendOptions = SendOptions @@ -242,20 +240,21 @@ sendFileExperimental AgentClient {subQ} _userId numRecipients xftpPath filePath } liftCLI $ cliSendFile sendOptions (sndDescr, rcvDescrs) <- readDescrs outputDir fileName - notify sndFileEntityId $ SFDONE sndDescr rcvDescrs + notify sndFileId $ SFDONE sndDescr rcvDescrs liftCLI :: ExceptT CLIError IO () -> m () liftCLI = either (throwError . INTERNAL . show) pure <=< liftIO . runExceptT - readDescrs :: FilePath -> FilePath -> m (String, [String]) + readDescrs :: FilePath -> FilePath -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient]) readDescrs outDir fileName = do let descrDir = outDir (fileName <> ".xftp") files <- listDirectory descrDir let (sdFiles, rdFiles) = partition ("snd.xftp.private" `isSuffixOf`) files sdFile = maybe "" (\l -> descrDir L.head l) (nonEmpty sdFiles) rdFiles' = map (descrDir ) rdFiles - -- TODO map files to contents - pure (sdFile, rdFiles') + (,) <$> readDescr sdFile <*> mapM readDescr rdFiles' + readDescr :: FilePartyI p => FilePath -> m (ValidFileDescription p) + readDescr f = liftIOEither $ first INTERNAL . strDecode <$> B.readFile f notify :: forall e. AEntityI e => SndFileId -> ACommand 'Agent e -> m () - notify sndFileEntityId cmd = atomically $ writeTBQueue subQ ("", sndFileEntityId, APC (sAEntity @e) cmd) + notify sndFileId cmd = atomically $ writeTBQueue subQ ("", sndFileId, APC (sAEntity @e) cmd) -- _sendFile :: AgentMonad m => AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId _sendFile :: AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 1c0e30574..6ac46ae61 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -458,7 +458,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, throwError $ CLIError "Error decrypting file: incorrect auth tag" pure path readChunks :: [FilePath] -> IO LB.ByteString - readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) LB.empty + readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) "" {-# NOINLINE readChunks #-} getFilePath :: String -> ExceptT CLIError IO FilePath getFilePath name = @@ -523,9 +523,8 @@ strEnc :: StrEncoding a => a -> String strEnc = B.unpack . strEncode getFileDescription :: FilePath -> ExceptT CLIError IO AValidFileDescription -getFileDescription path = do - fd <- ExceptT $ first (CLIError . ("Failed to parse file description: " <>)) . strDecode <$> B.readFile path - liftEither . first CLIError $ validateFileDescription fd +getFileDescription path = + ExceptT $ first (CLIError . ("Failed to parse file description: " <>)) . strDecode <$> B.readFile path getFileDescription' :: FilePartyI p => FilePath -> ExceptT CLIError IO (ValidFileDescription p) getFileDescription' path = diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index f9bb5c52d..c995f0dcd 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -27,6 +27,7 @@ module Simplex.FileTransfer.Description validateFileDescription, groupReplicasByServer, replicaServer, + fdSeparator, kb, mb, gb, @@ -75,6 +76,7 @@ data FileDescription (p :: FileParty) = FileDescription data AFileDescription = forall p. FilePartyI p => AFD (FileDescription p) newtype ValidFileDescription p = ValidFD (FileDescription p) + deriving (Eq, Show) pattern ValidFileDescription :: FileDescription p -> ValidFileDescription p pattern ValidFileDescription fd = ValidFD fd @@ -83,6 +85,9 @@ pattern ValidFileDescription fd = ValidFD fd data AValidFileDescription = forall p. FilePartyI p => AVFD (ValidFileDescription p) +fdSeparator :: IsString s => s +fdSeparator = "################################\n" + newtype FileDigest = FileDigest {unFileDigest :: ByteString} deriving (Eq, Show) @@ -170,6 +175,16 @@ data FileServerReplica = FileServerReplica } deriving (Show) +instance FilePartyI p => StrEncoding (ValidFileDescription p) where + strEncode (ValidFD fd) = strEncode fd + strDecode s = strDecode s >>= (\(AVFD fd) -> checkParty fd) + strP = strDecode <$?> A.takeByteString + +instance StrEncoding AValidFileDescription where + strEncode (AVFD fd) = strEncode fd + strDecode = validateFileDescription <=< strDecode + strP = strDecode <$?> A.takeByteString + instance FilePartyI p => StrEncoding (FileDescription p) where strEncode = Y.encode . encodeFileDescription strDecode s = strDecode s >>= (\(AFD fd) -> checkParty fd) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e615d76c8..15ba9fb67 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -339,12 +339,12 @@ toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c -- | Receive XFTP file -xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> FilePath -> m RcvFileId +xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId xftpReceiveFile c = withAgentEnv c .:. receiveFile c -- | Send XFTP file -xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> Int -> FilePath -> FilePath -> m SndFileId -xftpSendFile c userId n xftpPath filePath = withAgentEnv c $ sendFileExperimental c userId n xftpPath filePath +xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId +xftpSendFile c = withAgentEnv c .:: sendFileExperimental c -- | Activate operations activateAgent :: AgentErrorMonad m => AgentClient -> m () diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 7ceaedcf4..8e8f2fe90 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -441,8 +441,8 @@ reconnectSMPClient c tSess@(_, srv, _) = let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs mapM_ (throwError . snd) $ listToMaybe tempErrs - notifySub :: ConnId -> ACommand 'Agent 'AEConn -> IO () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC SAEConn cmd) + notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () + notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) getNtfServerClient :: forall m. AgentMonad m => AgentClient -> NtfTransportSession -> m NtfClient getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = do diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 632b89961..e1263b90d 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -44,7 +44,6 @@ import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConf import Simplex.FileTransfer.Types (DBSndFileId) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval -import Simplex.Messaging.Agent.Store (UserId) import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Client diff --git a/src/Simplex/Messaging/Agent/Lock.hs b/src/Simplex/Messaging/Agent/Lock.hs index 06b8a9efd..e0dd22713 100644 --- a/src/Simplex/Messaging/Agent/Lock.hs +++ b/src/Simplex/Messaging/Agent/Lock.hs @@ -14,7 +14,7 @@ createLock :: STM Lock createLock = newEmptyTMVar {-# INLINE createLock #-} -withLock :: MonadUnliftIO m => TMVar String -> String -> m a -> m a +withLock :: MonadUnliftIO m => Lock -> String -> m a -> m a withLock lock name = E.bracket_ (atomically $ putTMVar lock name) diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 9fd41da6a..7d7d1180b 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -104,6 +104,7 @@ module Simplex.Messaging.Agent.Protocol MsgIntegrity (..), MsgErrorType (..), QueueStatus (..), + UserId, ACorrId, AgentMsgId, NotificationsMode (..), @@ -163,7 +164,8 @@ import Database.SQLite.Simple.FromField import Database.SQLite.Simple.ToField import GHC.Generics (Generic) import Generic.Random (genericArbitraryU) -import Simplex.FileTransfer.Protocol (XFTPErrorType) +import Simplex.FileTransfer.Description +import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType) import Simplex.Messaging.Agent.QueryString import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri) @@ -224,6 +226,8 @@ type ATransmission p = (ACorrId, EntityId, APartyCmd p) -- | SMP agent protocol transmission or transmission error. type ATransmissionOrError p = (ACorrId, EntityId, Either AgentErrorType (APartyCmd p)) +type UserId = Int64 + type ACorrId = ByteString -- | SMP agent protocol participants. @@ -311,8 +315,8 @@ data ACommand (p :: AParty) (e :: AEntity) where END :: ACommand Agent AEConn CONNECT :: AProtocolType -> TransportHost -> ACommand Agent AENone DISCONNECT :: AProtocolType -> TransportHost -> ACommand Agent AENone - DOWN :: SMPServer -> [ConnId] -> ACommand Agent AEConn - UP :: SMPServer -> [ConnId] -> ACommand Agent AEConn + DOWN :: SMPServer -> [ConnId] -> ACommand Agent AENone + UP :: SMPServer -> [ConnId] -> ACommand Agent AENone SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> ACommand Agent AEConn SEND :: MsgFlags -> MsgBody -> ACommand Client AEConn MID :: AgentMsgId -> ACommand Agent AEConn @@ -335,7 +339,8 @@ data ACommand (p :: AParty) (e :: AEntity) where RFPROG :: Int -> Int -> ACommand Agent AERcvFile RFDONE :: FilePath -> ACommand Agent AERcvFile RFERR :: AgentErrorType -> ACommand Agent AERcvFile - SFDONE :: String -> [String] -> ACommand Agent AESndFile + SFPROG :: Int -> Int -> ACommand Agent AESndFile + SFDONE :: ValidFileDescription 'FSender -> [ValidFileDescription 'FRecipient] -> ACommand Agent AESndFile deriving instance Eq (ACommand p e) @@ -367,8 +372,8 @@ data ACommandTag (p :: AParty) (e :: AEntity) where END_ :: ACommandTag Agent AEConn CONNECT_ :: ACommandTag Agent AENone DISCONNECT_ :: ACommandTag Agent AENone - DOWN_ :: ACommandTag Agent AEConn - UP_ :: ACommandTag Agent AEConn + DOWN_ :: ACommandTag Agent AENone + UP_ :: ACommandTag Agent AENone SWITCH_ :: ACommandTag Agent AEConn SEND_ :: ACommandTag Client AEConn MID_ :: ACommandTag Agent AEConn @@ -391,6 +396,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where RFDONE_ :: ACommandTag Agent AERcvFile RFPROG_ :: ACommandTag Agent AERcvFile RFERR_ :: ACommandTag Agent AERcvFile + SFPROG_ :: ACommandTag Agent AESndFile SFDONE_ :: ACommandTag Agent AESndFile deriving instance Eq (ACommandTag p e) @@ -439,6 +445,7 @@ aCommandTag = \case RFPROG {} -> RFPROG_ RFDONE {} -> RFDONE_ RFERR {} -> RFERR_ + SFPROG {} -> SFPROG_ SFDONE {} -> SFDONE_ data QueueDirection = QDRcv | QDSnd @@ -1305,10 +1312,10 @@ instance StrEncoding ACmdTag where "CON" -> ct CON_ "SUB" -> t SUB_ "END" -> ct END_ - "CONNECT" -> at SAENone CONNECT_ - "DISCONNECT" -> at SAENone DISCONNECT_ - "DOWN" -> ct DOWN_ - "UP" -> ct UP_ + "CONNECT" -> nt CONNECT_ + "DISCONNECT" -> nt DISCONNECT_ + "DOWN" -> nt DOWN_ + "UP" -> nt UP_ "SWITCH" -> ct SWITCH_ "SEND" -> t SEND_ "MID" -> ct MID_ @@ -1321,20 +1328,23 @@ instance StrEncoding ACmdTag where "DEL" -> t DEL_ "DEL_RCVQ" -> ct DEL_RCVQ_ "DEL_CONN" -> ct DEL_CONN_ - "DEL_USER" -> at SAENone DEL_USER_ + "DEL_USER" -> nt DEL_USER_ "CHK" -> t CHK_ "STAT" -> ct STAT_ "OK" -> ct OK_ "ERR" -> ct ERR_ - "SUSPENDED" -> at SAENone SUSPENDED_ + "SUSPENDED" -> nt SUSPENDED_ "RFPROG" -> at SAERcvFile RFPROG_ "RFDONE" -> at SAERcvFile RFDONE_ "RFERR" -> at SAERcvFile RFERR_ + "SFPROG" -> at SAESndFile SFPROG_ + "SFDONE" -> at SAESndFile SFDONE_ _ -> fail "bad ACmdTag" where t = pure . ACmdTag SClient SAEConn at e = pure . ACmdTag SAgent e ct = at SAEConn + nt = at SAENone instance APartyI p => StrEncoding (APartyCmdTag p) where strEncode (APCT _ cmd) = strEncode cmd @@ -1379,6 +1389,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where RFPROG_ -> "RFPROG" RFDONE_ -> "RFDONE" RFERR_ -> "RFERR" + SFPROG_ -> "SFPROG" SFDONE_ -> "SFDONE" strP = (\(APCT _ t) -> checkEntity t) <$?> strP @@ -1438,14 +1449,19 @@ commandP binaryP = RFPROG_ -> s (RFPROG <$> A.decimal <* A.space <*> A.decimal) RFDONE_ -> s (RFDONE <$> strP) RFERR_ -> s (RFERR <$> strP) - SFDONE_ -> s (SFDONE <$> strP <*> rcvDescrs) + SFPROG_ -> s (SFPROG <$> A.decimal <* A.space <*> A.decimal) + SFDONE_ -> s (sfDone . safeDecodeUtf8 <$?> binaryP) where s :: Parser a -> Parser a s p = A.space *> p connections :: Parser [ConnId] connections = strP `A.sepBy'` A.char ',' - rcvDescrs :: Parser [String] - rcvDescrs = strP `A.sepBy'` A.char ',' -- TODO consider separator + sfDone :: Text -> Either String (ACommand 'Agent 'AESndFile) + sfDone t = + let ds = T.splitOn fdSeparator t + in case ds of + [] -> Left "no sender file description" + sd : rds -> SFDONE <$> strDecode (encodeUtf8 sd) <*> mapM (strDecode . encodeUtf8) rds msgMetaP = do integrity <- strP recipient <- " R=" *> partyMeta A.decimal @@ -1497,7 +1513,8 @@ serializeCommand = \case RFPROG rcvd total -> s (RFPROG_, rcvd, total) RFDONE fPath -> s (RFDONE_, fPath) RFERR e -> s (RFERR_, e) - SFDONE sd rds -> B.unwords [s SFDONE_, s sd, rcvDescrs rds] + SFPROG sent total -> s (SFPROG_, sent, total) + SFDONE sd rds -> B.unwords [s SFDONE_, serializeBinary (sfDone sd rds)] where s :: StrEncoding a => a -> ByteString s = strEncode @@ -1505,8 +1522,7 @@ serializeCommand = \case showTs = B.pack . formatISO8601Millis connections :: [ConnId] -> ByteString connections = B.intercalate "," . map strEncode - rcvDescrs :: [String] -> ByteString - rcvDescrs = B.intercalate "," . map strEncode -- TODO consider separator + sfDone sd rds = B.intercalate fdSeparator $ strEncode sd : map strEncode rds serializeMsgMeta :: MsgMeta -> ByteString serializeMsgMeta MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sndMsgId} = B.unwords diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index ab6e66820..cf139c96b 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -264,8 +264,6 @@ data PendingCommand = PendingCommand data AgentCmdType = ACClient | ACInternal -type UserId = Int64 - instance StrEncoding AgentCmdType where strEncode = \case ACClient -> "CLIENT" diff --git a/src/Simplex/Messaging/Agent/TRcvQueues.hs b/src/Simplex/Messaging/Agent/TRcvQueues.hs index aad0f16e6..bc116c2e3 100644 --- a/src/Simplex/Messaging/Agent/TRcvQueues.hs +++ b/src/Simplex/Messaging/Agent/TRcvQueues.hs @@ -16,8 +16,8 @@ import Control.Concurrent.STM import qualified Data.Map.Strict as M import Data.Set (Set) import qualified Data.Set as S -import Simplex.Messaging.Agent.Protocol (ConnId) -import Simplex.Messaging.Agent.Store (RcvQueue (..), UserId) +import Simplex.Messaging.Agent.Protocol (ConnId, UserId) +import Simplex.Messaging.Agent.Store (RcvQueue (..)) import Simplex.Messaging.Protocol (RecipientId, SMPServer) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 43eb0c132..6023ae398 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -92,19 +92,31 @@ tGetAgent = tGetAgent' tGetAgent' :: forall c e. (Transport c, AEntityI e) => c -> IO (AEntityTransmissionOrError 'Agent e) tGetAgent' h = do - (corrId, connId, cmdOrErr) <- tGet SAgent h + (corrId, connId, cmdOrErr) <- pGetAgent h case cmdOrErr of - Right (APC _ CONNECT {}) -> tGetAgent' h - Right (APC _ DISCONNECT {}) -> tGetAgent' h Right (APC e cmd) -> case testEquality e (sAEntity @e) of Just Refl -> pure (corrId, connId, Right cmd) _ -> error $ "unexpected command " <> show cmd Left err -> pure (corrId, connId, Left err) +pGetAgent :: forall c. Transport c => c -> IO (ATransmissionOrError 'Agent) +pGetAgent h = do + (corrId, connId, cmdOrErr) <- tGet SAgent h + case cmdOrErr of + Right (APC _ CONNECT {}) -> pGetAgent h + Right (APC _ DISCONNECT {}) -> pGetAgent h + cmd -> pure (corrId, connId, cmd) + -- | receive message to handle `h` (<#:) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn) (<#:) = tGetAgent +(<#:?) :: Transport c => c -> IO (ATransmissionOrError 'Agent) +(<#:?) = pGetAgent + +(<#:.) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AENone) +(<#:.) = tGetAgent' + -- | send transmission `t` to handle `h` and get response (#:) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn) h #: t = tPutRaw h t >> (<#:) h @@ -119,7 +131,7 @@ action #> (corrId, connId, cmd) = action `shouldReturn` (corrId, connId, Right c (=#>) :: IO (AEntityTransmissionOrError 'Agent 'AEConn) -> (AEntityTransmission 'Agent 'AEConn -> Bool) -> Expectation action =#> p = action >>= (`shouldSatisfy` p . correctTransmission) -correctTransmission :: AEntityTransmissionOrError p e -> AEntityTransmission p e +correctTransmission :: (ACorrId, ConnId, Either AgentErrorType cmd) -> (ACorrId, ConnId, cmd) correctTransmission (corrId, connId, cmdOrErr) = case cmdOrErr of Right cmd -> (corrId, connId, cmd) Left e -> error $ show e @@ -128,10 +140,16 @@ correctTransmission (corrId, connId, cmdOrErr) = case cmdOrErr of (<#) :: Transport c => c -> AEntityTransmission 'Agent 'AEConn -> Expectation h <# (corrId, connId, cmd) = (h <#:) `shouldReturn` (corrId, connId, Right cmd) +(<#.) :: Transport c => c -> AEntityTransmission 'Agent 'AENone -> Expectation +h <#. (corrId, connId, cmd) = (h <#:.) `shouldReturn` (corrId, connId, Right cmd) + -- | receive message to handle `h` and validate it using predicate `p` (<#=) :: Transport c => c -> (AEntityTransmission 'Agent 'AEConn -> Bool) -> Expectation h <#= p = (h <#:) >>= (`shouldSatisfy` p . correctTransmission) +(<#=?) :: Transport c => c -> (ATransmission 'Agent -> Bool) -> Expectation +h <#=? p = (h <#:?) >>= (`shouldSatisfy` p . correctTransmission) + -- | test that nothing is delivered to handle `h` during 10ms (#:#) :: Transport c => c -> String -> Expectation h #:# err = tryGet `shouldReturn` () @@ -141,7 +159,7 @@ h #:# err = tryGet `shouldReturn` () Just _ -> error err _ -> return () -pattern Msg :: MsgBody -> ACommand 'Agent 'AEConn +pattern Msg :: MsgBody -> ACommand 'Agent e pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody testDuplexConnection :: Transport c => TProxy c -> c -> c -> IO () @@ -300,7 +318,7 @@ testSubscrNotification t (server, _) client = do client #: ("1", "conn1", "NEW T INV") =#> \case ("1", "conn1", INV {}) -> True; _ -> False client #:# "nothing should be delivered to client before the server is killed" killThread server - client <# ("", "", DOWN testSMPServer ["conn1"]) + client <#. ("", "", DOWN testSMPServer ["conn1"]) withSmpServer (ATransport t) $ client <# ("", "conn1", ERR (SMP AUTH)) -- this new server does not have the queue @@ -314,15 +332,15 @@ testMsgDeliveryServerRestart t alice bob = do alice #: ("11", "bob", "ACK 4") #> ("11", "bob", OK) alice #:# "nothing else delivered before the server is killed" - let server = (SMPServer "localhost" testPort2 testKeyHash) - alice <# ("", "", DOWN server ["bob"]) + let server = SMPServer "localhost" testPort2 testKeyHash + alice <#. ("", "", DOWN server ["bob"]) bob #: ("2", "alice", "SEND F 11\nhello again") #> ("2", "alice", MID 5) bob #:# "nothing else delivered before the server is restarted" alice #:# "nothing else delivered before the server is restarted" withServer $ do bob <# ("", "alice", SENT 5) - alice <# ("", "", UP server ["bob"]) + alice <#. ("", "", UP server ["bob"]) alice <#= \case ("", "bob", Msg "hello again") -> True; _ -> False alice #: ("12", "bob", "ACK 5") #> ("12", "bob", OK) @@ -337,8 +355,8 @@ testServerConnectionAfterError t _ = do withServer $ do connect (bob, "bob") (alice, "alice") - bob <# ("", "", DOWN server ["alice"]) - alice <# ("", "", DOWN server ["bob"]) + bob <#. ("", "", DOWN server ["alice"]) + alice <#. ("", "", DOWN server ["bob"]) alice #: ("1", "bob", "SEND F 5\nhello") #> ("1", "bob", MID 4) alice #:# "nothing else delivered before the server is restarted" bob #:# "nothing else delivered before the server is restarted" @@ -348,10 +366,10 @@ testServerConnectionAfterError t _ = do bob #: ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT alice #: ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT withServer $ do - alice <#= \case ("", "bob", SENT 4) -> True; ("", "", UP s ["bob"]) -> s == server; _ -> False - alice <#= \case ("", "bob", SENT 4) -> True; ("", "", UP s ["bob"]) -> s == server; _ -> False - bob <#= \case ("", "alice", Msg "hello") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False - bob <#= \case ("", "alice", Msg "hello") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False + alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False + alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False + bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False + bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False bob #: ("2", "alice", "ACK 4") #> ("2", "alice", OK) alice #: ("1", "bob", "SEND F 11\nhello again") #> ("1", "bob", MID 5) alice <# ("", "bob", SENT 5) @@ -380,7 +398,7 @@ testMsgDeliveryAgentRestart t bob = do bob #: ("11", "alice", "ACK 4") #> ("11", "alice", OK) bob #:# "nothing else delivered before the server is down" - bob <# ("", "", DOWN server ["alice"]) + bob <#. ("", "", DOWN server ["alice"]) alice #: ("2", "bob", "SEND F 11\nhello again") #> ("2", "bob", MID 5) alice #:# "nothing else delivered before the server is restarted" bob #:# "nothing else delivered before the server is restarted" @@ -393,8 +411,8 @@ testMsgDeliveryAgentRestart t bob = do (corrId == "3" && cmd == OK) || (corrId == "" && cmd == SENT 5) _ -> False - bob <#= \case ("", "alice", Msg "hello again") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False - bob <#= \case ("", "alice", Msg "hello again") -> True; ("", "", UP s ["alice"]) -> s == server; _ -> False + bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False + bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False bob #: ("12", "alice", "ACK 5") #> ("12", "alice", OK) removeFile testStoreLogFile diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index c4adaabaf..8ef94eced 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -23,6 +23,7 @@ module AgentTests.FunctionalAPITests get', rfGet, sfGet, + nGet, (##>), (=##>), pattern Msg, @@ -49,7 +50,6 @@ import Simplex.Messaging.Agent import Simplex.Messaging.Agent.Client (SMPTestFailure (..), SMPTestStep (..)) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..)) import Simplex.Messaging.Agent.Protocol -import Simplex.Messaging.Agent.Store (UserId) import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -68,7 +68,7 @@ type AEntityTransmission e = (ACorrId, ConnId, ACommand 'Agent e) (##>) :: (HasCallStack, MonadIO m) => m (AEntityTransmission e) -> AEntityTransmission e -> m () a ##> t = a >>= \t' -> liftIO (t' `shouldBe` t) -(=##>) :: (HasCallStack, MonadIO m) => m (AEntityTransmission e) -> (AEntityTransmission e -> Bool) -> m () +(=##>) :: (Show a, HasCallStack, MonadIO m) => m a -> (a -> Bool) -> m () a =##> p = a >>= \t -> liftIO (t `shouldSatisfy` p) get :: MonadIO m => AgentClient -> m (AEntityTransmission 'AEConn) @@ -80,17 +80,25 @@ rfGet = get' @'AERcvFile sfGet :: MonadIO m => AgentClient -> m (AEntityTransmission 'AESndFile) sfGet = get' @'AESndFile +nGet :: MonadIO m => AgentClient -> m (AEntityTransmission 'AENone) +nGet = get' @'AENone + get' :: forall e m. (MonadIO m, AEntityI e) => AgentClient -> m (AEntityTransmission e) get' c = do - (corrId, connId, APC e cmd) <- atomically (readTBQueue $ subQ c) - case cmd of - CONNECT {} -> get' c - DISCONNECT {} -> get' c - _ -> case testEquality e (sAEntity @e) of - Just Refl -> pure (corrId, connId, cmd) - _ -> error $ "unexpected command " <> show cmd + (corrId, connId, APC e cmd) <- pGet c + case testEquality e (sAEntity @e) of + Just Refl -> pure (corrId, connId, cmd) + _ -> error $ "unexpected command " <> show cmd -pattern Msg :: MsgBody -> ACommand 'Agent 'AEConn +pGet :: forall m. MonadIO m => AgentClient -> m (ATransmission 'Agent) +pGet c = do + t@(_, _, APC _ cmd) <- atomically (readTBQueue $ subQ c) + case cmd of + CONNECT {} -> pGet c + DISCONNECT {} -> pGet c + _ -> pure t + +pattern Msg :: MsgBody -> ACommand 'Agent e pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody smpCfgV1 :: ProtocolClientConfig @@ -405,12 +413,12 @@ testAsyncServerOffline t = do runRight $ createConnection alice 1 True SCMInvitation Nothing -- connection fails Left (BROKER _ NETWORK) <- runExceptT $ joinConnection bob 1 True cReq "bob's connInfo" - ("", "", DOWN srv conns) <- get alice + ("", "", DOWN srv conns) <- nGet alice srv `shouldBe` testSMPServer conns `shouldBe` [bobId] -- connection succeeds after server start withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - ("", "", UP srv1 conns1) <- get alice + ("", "", UP srv1 conns1) <- nGet alice liftIO $ do srv1 `shouldBe` testSMPServer conns1 `shouldBe` [bobId] @@ -457,8 +465,8 @@ testDuplicateMessage t = do pure (aliceId, bobId, bob1) - get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False - get bob1 =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False + nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False + nGet bob1 =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False -- commenting two lines below and uncommenting further two lines would also runRight_, -- it is the scenario tested above, when the message was not acknowledged by the user threadDelay 200000 @@ -502,7 +510,7 @@ testInactiveClientDisconnected t = do alice <- getSMPAgentClient agentCfg initAgentServers runRight_ $ do (connId, _cReq) <- createConnection alice 1 True SCMInvitation Nothing - get alice ##> ("", "", DOWN testSMPServer [connId]) + nGet alice ##> ("", "", DOWN testSMPServer [connId]) testActiveClientNotDisconnected :: ATransport -> IO () testActiveClientNotDisconnected t = do @@ -528,7 +536,7 @@ testActiveClientNotDisconnected t = do Nothing <- 800000 `timeout` get alice liftIO $ threadDelay 1200000 -- and after 2 sec of inactivity DOWN is sent - get alice ##> ("", "", DOWN testSMPServer [connId]) + nGet alice ##> ("", "", DOWN testSMPServer [connId]) milliseconds ts = systemSeconds ts * 1000 + fromIntegral (systemNanoseconds ts `div` 1000000) testSuspendingAgent :: IO () @@ -562,21 +570,21 @@ testSuspendingAgentCompleteSending t = do pure (aId, bId) runRight_ $ do - ("", "", DOWN {}) <- get a - ("", "", DOWN {}) <- get b + ("", "", DOWN {}) <- nGet a + ("", "", DOWN {}) <- nGet b 5 <- sendMessage b aId SMP.noMsgFlags "hello too" 6 <- sendMessage b aId SMP.noMsgFlags "how are you?" liftIO $ threadDelay 100000 suspendAgent b 5000000 withSmpServerStoreLogOn t testPort $ \_ -> runRight_ @AgentErrorType $ do - get b =##> \case ("", c, SENT 5) -> c == aId; ("", "", UP {}) -> True; _ -> False - get b =##> \case ("", c, SENT 5) -> c == aId; ("", "", UP {}) -> True; _ -> False - get b =##> \case ("", c, SENT 6) -> c == aId; ("", "", UP {}) -> True; _ -> False - ("", "", SUSPENDED) <- get' @'AENone b + pGet b =##> \case ("", c, APC _ (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False + pGet b =##> \case ("", c, APC _ (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False + pGet b =##> \case ("", c, APC _ (SENT 6)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False + ("", "", SUSPENDED) <- nGet b - get a =##> \case ("", c, Msg "hello too") -> c == bId; ("", "", UP {}) -> True; _ -> False - get a =##> \case ("", c, Msg "hello too") -> c == bId; ("", "", UP {}) -> True; _ -> False + pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False + pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False ackMessage a bId 5 get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False ackMessage a bId 6 @@ -594,12 +602,12 @@ testSuspendingAgentTimeout t = do pure (aId, bId) runRight_ $ do - ("", "", DOWN {}) <- get a - ("", "", DOWN {}) <- get b + ("", "", DOWN {}) <- nGet a + ("", "", DOWN {}) <- nGet b 5 <- sendMessage b aId SMP.noMsgFlags "hello too" 6 <- sendMessage b aId SMP.noMsgFlags "how are you?" suspendAgent b 100000 - ("", "", SUSPENDED) <- get' @'AENone b + ("", "", SUSPENDED) <- nGet b pure () testBatchedSubscriptions :: ATransport -> IO () @@ -614,15 +622,15 @@ testBatchedSubscriptions t = do delete b aIds' liftIO $ threadDelay 1000000 pure conns - ("", "", DOWN {}) <- get a - ("", "", DOWN {}) <- get a - ("", "", DOWN {}) <- get b - ("", "", DOWN {}) <- get b + ("", "", DOWN {}) <- nGet a + ("", "", DOWN {}) <- nGet a + ("", "", DOWN {}) <- nGet b + ("", "", DOWN {}) <- nGet b runServers $ do - ("", "", UP {}) <- get a - ("", "", UP {}) <- get a - ("", "", UP {}) <- get b - ("", "", UP {}) <- get b + ("", "", UP {}) <- nGet a + ("", "", UP {}) <- nGet a + ("", "", UP {}) <- nGet b + ("", "", UP {}) <- nGet b liftIO $ threadDelay 1000000 let (aIds, bIds) = unzip conns conns' = drop 10 conns @@ -795,7 +803,7 @@ testUsers = do deleteUser a auId True get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId'; _ -> False get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False - get' @'AENone a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False + nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False exchangeGreetingsMsgId 6 a bId b aId liftIO $ noMessages a "nothing else should be delivered to alice" @@ -824,18 +832,18 @@ testUsersNoServer t = do (aId', bId') <- makeConnectionForUsers a auId b 1 exchangeGreetingsMsgId 4 a bId' b aId' pure (aId, bId, auId, aId', bId') - get a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False - get a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False - get b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False + nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False + nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId || c == bId'; _ -> False + nGet b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False runRight_ $ do deleteUser a auId True get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False - get' @'AENone a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False + nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False liftIO $ noMessages a "nothing else should be delivered to alice" withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - get a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False - get b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False + nGet a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False + nGet b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False exchangeGreetingsMsgId 6 a bId b aId testSwitchConnection :: InitialAgentServers -> IO () @@ -979,8 +987,8 @@ testTwoUsers = do b `hasClients` 1 setNetworkConfig a nc {sessionMode = TSMEntity} liftIO $ threadDelay 250000 - ("", "", DOWN _ _) <- get a - ("", "", UP _ _) <- get a + ("", "", DOWN _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 2 exchangeGreetingsMsgId 6 a bId1 b aId1 @@ -988,10 +996,10 @@ testTwoUsers = do liftIO $ threadDelay 250000 setNetworkConfig a nc {sessionMode = TSMUser} liftIO $ threadDelay 250000 - ("", "", DOWN _ _) <- get a - ("", "", DOWN _ _) <- get a - ("", "", UP _ _) <- get a - ("", "", UP _ _) <- get a + ("", "", DOWN _ _) <- nGet a + ("", "", DOWN _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 1 aUserId2 <- createUser a [noAuthSrv testSMPServer] @@ -1003,10 +1011,10 @@ testTwoUsers = do b `hasClients` 1 setNetworkConfig a nc {sessionMode = TSMEntity} liftIO $ threadDelay 250000 - ("", "", DOWN _ _) <- get a - ("", "", DOWN _ _) <- get a - ("", "", UP _ _) <- get a - ("", "", UP _ _) <- get a + ("", "", DOWN _ _) <- nGet a + ("", "", DOWN _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 4 exchangeGreetingsMsgId 8 a bId1 b aId1 exchangeGreetingsMsgId 8 a bId1' b aId1' @@ -1015,14 +1023,14 @@ testTwoUsers = do liftIO $ threadDelay 250000 setNetworkConfig a nc {sessionMode = TSMUser} liftIO $ threadDelay 250000 - ("", "", DOWN _ _) <- get a - ("", "", DOWN _ _) <- get a - ("", "", DOWN _ _) <- get a - ("", "", DOWN _ _) <- get a - ("", "", UP _ _) <- get a - ("", "", UP _ _) <- get a - ("", "", UP _ _) <- get a - ("", "", UP _ _) <- get a + ("", "", DOWN _ _) <- nGet a + ("", "", DOWN _ _) <- nGet a + ("", "", DOWN _ _) <- nGet a + ("", "", DOWN _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGet a a `hasClients` 2 exchangeGreetingsMsgId 10 a bId1 b aId1 exchangeGreetingsMsgId 10 a bId1' b aId1' diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index a3abd2242..490873b9e 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -9,7 +9,7 @@ module AgentTests.NotificationTests where -- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging) -import AgentTests.FunctionalAPITests (exchangeGreetingsMsgId, get, makeConnection, runRight, runRight_, switchComplete, testServerMatrix2, (##>), (=##>), pattern Msg) +import AgentTests.FunctionalAPITests (exchangeGreetingsMsgId, get, makeConnection, nGet, runRight, runRight_, switchComplete, testServerMatrix2, (##>), (=##>), pattern Msg) import Control.Concurrent (killThread, threadDelay) import Control.Monad.Except import qualified Data.Aeson as J @@ -466,12 +466,12 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do pure (aliceId, bobId) runRight_ @AgentErrorType $ do - get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False - get bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False + nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False + nGet bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False withSmpServerStoreLogOn t testPort $ \threadId -> runRight_ $ do - get alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False - get bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False + nGet alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False + nGet bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False liftIO $ threadDelay 1000000 5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again" get bob ##> ("", aliceId, SENT 5) diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index cc80cb098..053f973e7 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -29,7 +29,6 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Server (runSMPAgentBlocking) -import Simplex.Messaging.Agent.Store (UserId) import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultClientConfig, defaultNetworkConfig) import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol (ProtoServerWithAuth, XFTPServer) diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index 0cabdd123..3d6406c14 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -9,12 +9,10 @@ import AgentTests.FunctionalAPITests (get, rfGet, runRight, runRight_, sfGet) import Control.Logger.Simple import Control.Monad.Except import Data.Bifunctor (first) -import qualified Data.ByteString as LB -import Data.List.NonEmpty (nonEmpty) -import qualified Data.List.NonEmpty as L +import qualified Data.ByteString.Char8 as B import SMPAgentClient (agentCfg, initAgentServers) import Simplex.FileTransfer.Description -import Simplex.FileTransfer.Protocol (FileParty (..), checkParty) +import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.Messaging.Agent (disconnectAgentClient, getSMPAgentClient, xftpReceiveFile, xftpSendFile) import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..)) import Simplex.Messaging.Encoding.String (StrEncoding (..)) @@ -37,7 +35,7 @@ testXFTPAgentReceive = withXFTPServer $ do -- send file using CLI let filePath = senderFiles "testfile" xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] - file <- LB.readFile filePath + file <- B.readFile filePath getFileSize filePath `shouldReturn` mb 17 let fdRcv = filePath <> ".xftp" "rcv1.xftp" fdSnd = filePath <> ".xftp" "snd.xftp.private" @@ -52,18 +50,15 @@ testXFTPAgentReceive = withXFTPServer $ do rcp <- getSMPAgentClient agentCfg initAgentServers runRight_ $ do fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd recipientFiles + fId <- xftpReceiveFile rcp 1 fd $ Just recipientFiles ("", fId', RFDONE path) <- rfGet rcp liftIO $ do fId' `shouldBe` fId - LB.readFile path `shouldReturn` file + B.readFile path `shouldReturn` file getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient) -getFileDescription path = do - fd :: AFileDescription <- ExceptT $ first (INTERNAL . ("Failed to parse file description: " <>)) . strDecode <$> LB.readFile path - vfd <- liftEither . first INTERNAL $ validateFileDescription fd - case vfd of - AVFD fd' -> either (throwError . INTERNAL) pure $ checkParty fd' +getFileDescription path = + ExceptT $ first (INTERNAL . ("Failed to parse file description: " <>)) . strDecode <$> B.readFile path logCfgNoLogs :: LogConfig logCfgNoLogs = LogConfig {lc_file = Nothing, lc_stderr = False} @@ -90,7 +85,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do rcp <- getSMPAgentClient agentCfg initAgentServers fId <- runRight $ do fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd recipientFiles + fId <- xftpReceiveFile rcp 1 fd $ Just recipientFiles liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure fId disconnectAgentClient rcp @@ -103,8 +98,8 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do ("", fId', RFDONE path) <- rfGet rcp' liftIO $ do fId' `shouldBe` fId - file <- LB.readFile filePath - LB.readFile path `shouldReturn` file + file <- B.readFile filePath + B.readFile path `shouldReturn` file -- tmp path should be removed after receiving file doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` False @@ -131,7 +126,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do rcp <- getSMPAgentClient agentCfg initAgentServers fId <- runRight $ do fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd recipientFiles + fId <- xftpReceiveFile rcp 1 fd $ Just recipientFiles liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure fId disconnectAgentClient rcp @@ -153,27 +148,27 @@ testXFTPAgentSendExperimental = do -- create random file using cli let filePath = senderFiles "testfile" xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] - file <- LB.readFile filePath + file <- B.readFile filePath getFileSize filePath `shouldReturn` mb 17 -- send file using experimental agent API sndr <- getSMPAgentClient agentCfg initAgentServers - rcvDescrs <- runRight $ do - sfId <- xftpSendFile sndr 1 2 senderFiles filePath + rfd <- runRight $ do + sfId <- xftpSendFile sndr 1 filePath 2 $ Just senderFiles ("", sfId', SFDONE sndDescr rcvDescrs) <- sfGet sndr liftIO $ do sfId' `shouldBe` sfId - sndDescr `shouldBe` senderFiles "testfile.descr/testfile.xftp/snd.xftp.private" - rcvDescrs `shouldBe` [senderFiles "testfile.descr/testfile.xftp/rcv1.xftp", senderFiles "testfile.descr/testfile.xftp/rcv2.xftp"] - pure rcvDescrs - let fdRcv = maybe "" L.head (nonEmpty rcvDescrs) + strDecode <$> B.readFile (senderFiles "testfile.descr/testfile.xftp/snd.xftp.private") `shouldReturn` Right sndDescr + Right rfd1 <- strDecode <$> B.readFile (senderFiles "testfile.descr/testfile.xftp/rcv1.xftp") + Right rfd2 <- strDecode <$> B.readFile (senderFiles "testfile.descr/testfile.xftp/rcv2.xftp") + rcvDescrs `shouldBe` [rfd1, rfd2] + pure rfd1 -- receive file using agent rcp <- getSMPAgentClient agentCfg initAgentServers runRight_ $ do - fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - rfId <- xftpReceiveFile rcp 1 fd recipientFiles + rfId <- xftpReceiveFile rcp 1 rfd $ Just recipientFiles ("", rfId', RFDONE path) <- rfGet rcp liftIO $ do rfId' `shouldBe` rfId - LB.readFile path `shouldReturn` file + B.readFile path `shouldReturn` file