From 202922bceb4cdd281eb9edb971276f2b6669f83c Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Fri, 24 Feb 2023 15:21:07 +0400 Subject: [PATCH] xftp: server store log (#652) --- src/Simplex/FileTransfer/Server.hs | 22 +++++-- src/Simplex/FileTransfer/Server/Store.hs | 14 ++++- src/Simplex/FileTransfer/Server/StoreLog.hs | 6 +- src/Simplex/Messaging/Encoding/String.hs | 6 +- tests/XFTPClient.hs | 9 +++ tests/XFTPServerTests.hs | 69 ++++++++++++++++++++- 6 files changed, 111 insertions(+), 15 deletions(-) diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index cf4b460ec..38955fc4b 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -37,6 +37,7 @@ import Simplex.FileTransfer.Protocol import Simplex.FileTransfer.Server.Env import Simplex.FileTransfer.Server.Stats import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Server.StoreLog import Simplex.FileTransfer.Transport import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC @@ -45,7 +46,6 @@ import Simplex.Messaging.Protocol (CorrId, RcvPublicDhKey, RecipientId) import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdSignature) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.StoreLog (StoreLog, closeStoreLog) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Server import Simplex.Messaging.Util @@ -102,13 +102,16 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do forM_ sIds $ \sId -> do threadDelay 100000 atomically (expiredFilePath st sId old) - >>= mapM_ (remove $ void $ atomically $ deleteFile st sId) + >>= mapM_ (remove $ delete st sId) where - remove delete filePath = + remove del filePath = ifM (doesFileExist filePath) - (removeFile filePath >> delete `catch` \(e :: SomeException) -> logError $ "failed to remove expired file " <> tshow filePath <> ": " <> tshow e) - delete + (removeFile filePath >> del `catch` \(e :: SomeException) -> logError $ "failed to remove expired file " <> tshow filePath <> ": " <> tshow e) + del + delete st sId = do + withFileLog (`logDeleteFile` sId) + void $ atomically $ deleteFile st sId serverStatsThread_ :: XFTPServerConfig -> [M ()] serverStatsThread_ XFTPServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = @@ -215,10 +218,14 @@ processXFTPRequest HTTP2Body {bodyPart} = \case -- TODO retry on duplicate IDs? sId <- getFileId rIds <- mapM (const getFileId) rcps + let rIdsKeys = L.zipWith FileRecipient rIds rcps ts <- liftIO getSystemTime + withFileLog $ \sl -> do + logAddFile sl sId file ts + logAddRecipients sl sId rIdsKeys r <- runExceptT $ do ExceptT $ atomically $ addFile st sId file ts - forM (L.zip rIds rcps) $ \rcp -> + forM rIdsKeys $ \rcp -> ExceptT $ atomically $ addRecipient st sId rcp noFile $ either FRErr (const $ FRSndIds sId rIds) r XFTPReqCmd fId fr (FileCmd _ cmd) -> case cmd of @@ -242,6 +249,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case path <- asks $ filesPath . config let fPath = path B.unpack (B64.encode senderId) FileInfo {size, digest} = fileInfo + withFileLog $ \sl -> logPutFile sl senderId fPath st <- asks store quota_ <- asks $ fileSizeQuota . config liftIO $ @@ -269,6 +277,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case deleteServerFile :: FileRec -> M FileResponse deleteServerFile FileRec {senderId, filePath} = do + withFileLog (`logDeleteFile` senderId) r <- runExceptT $ do path <- readTVarIO filePath ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p)) @@ -282,6 +291,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case ackFileReception :: RecipientId -> FileRec -> M FileResponse ackFileReception rId fr = do + withFileLog (`logAckFile` rId) st <- asks store atomically $ deleteRecipient st rId fr pure FROk diff --git a/src/Simplex/FileTransfer/Server/Store.hs b/src/Simplex/FileTransfer/Server/Store.hs index 7bce800fe..2a19cb219 100644 --- a/src/Simplex/FileTransfer/Server/Store.hs +++ b/src/Simplex/FileTransfer/Server/Store.hs @@ -1,11 +1,13 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TupleSections #-} module Simplex.FileTransfer.Server.Store ( FileStore (..), FileRec (..), + FileRecipient (..), newFileStore, addFile, setFilePath, @@ -20,6 +22,7 @@ module Simplex.FileTransfer.Server.Store where import Control.Concurrent.STM +import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Functor (($>)) import Data.Int (Int64) import Data.Set (Set) @@ -27,6 +30,7 @@ import qualified Data.Set as S import Data.Time.Clock.System (SystemTime (..)) import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPErrorType (..), XFTPFileId) import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RcvPublicVerifyKey, RecipientId, SenderId) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -47,6 +51,12 @@ data FileRec = FileRec } deriving (Eq) +data FileRecipient = FileRecipient RecipientId RcvPublicVerifyKey + +instance StrEncoding FileRecipient where + strEncode (FileRecipient rId rKey) = strEncode rId <> ":" <> strEncode rKey + strP = FileRecipient <$> strP <* A.char ':' <*> strP + newFileStore :: STM FileStore newFileStore = do files <- TM.empty @@ -76,8 +86,8 @@ setFilePath' st FileRec {fileInfo, filePath} fPath = do writeTVar filePath (Just fPath) modifyTVar' (usedStorage st) (+ fromIntegral (size fileInfo)) -addRecipient :: FileStore -> SenderId -> (RecipientId, RcvPublicVerifyKey) -> STM (Either XFTPErrorType ()) -addRecipient st@FileStore {recipients} senderId (rId, rKey) = +addRecipient :: FileStore -> SenderId -> FileRecipient -> STM (Either XFTPErrorType ()) +addRecipient st@FileStore {recipients} senderId (FileRecipient rId rKey) = withFile st senderId $ \FileRec {recipientIds} -> do rIds <- readTVar recipientIds mem <- TM.member rId recipients diff --git a/src/Simplex/FileTransfer/Server/StoreLog.hs b/src/Simplex/FileTransfer/Server/StoreLog.hs index 37e6df390..dcfbb757e 100644 --- a/src/Simplex/FileTransfer/Server/StoreLog.hs +++ b/src/Simplex/FileTransfer/Server/StoreLog.hs @@ -40,7 +40,7 @@ import System.IO data FileStoreLogRecord = AddFile SenderId FileInfo SystemTime | PutFile SenderId FilePath - | AddRecipients SenderId (NonEmpty (RecipientId, RcvPublicVerifyKey)) + | AddRecipients SenderId (NonEmpty FileRecipient) | DeleteFile SenderId | AckFile RecipientId @@ -69,7 +69,7 @@ logAddFile s = logFileStoreRecord s .:. AddFile logPutFile :: StoreLog 'WriteMode -> SenderId -> FilePath -> IO () logPutFile s = logFileStoreRecord s .: PutFile -logAddRecipients :: StoreLog 'WriteMode -> SenderId -> NonEmpty (RecipientId, RcvPublicVerifyKey) -> IO () +logAddRecipients :: StoreLog 'WriteMode -> SenderId -> NonEmpty FileRecipient -> IO () logAddRecipients s = logFileStoreRecord s .: AddRecipients logDeleteFile :: StoreLog 'WriteMode -> SenderId -> IO () @@ -119,6 +119,6 @@ writeFileStore s FileStore {files, recipients} = do where getRcp rId = case M.lookup rId allRcps of Just (sndId, rKey) - | sndId == senderId -> Right (rId, rKey) + | sndId == senderId -> Right $ FileRecipient rId rKey | otherwise -> Left $ "sender ID for recipient ID " <> bshow rId <> " does not match FileRec" Nothing -> Left $ "recipient ID " <> bshow rId <> " not found" diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index f35251348..e81b0da89 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -75,9 +75,9 @@ instance StrEncoding Str where strEncode = unStr strP = Str <$> A.takeTill (== ' ') <* optional A.space -instance StrEncoding FilePath where - strEncode = strEncode - strDecode = strDecode +instance StrEncoding String where + strEncode = strEncode . B.pack + strP = B.unpack <$> strP instance ToJSON Str where toJSON (Str s) = strToJSON s diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs index ec1b67432..0a6430bad 100644 --- a/tests/XFTPClient.hs +++ b/tests/XFTPClient.hs @@ -37,12 +37,18 @@ runXFTPTestN nClients test = withXFTPServer $ run nClients [] run 0 hs = test hs run n hs = testXFTPClient $ \h -> run (n - 1) (h : hs) +withXFTPServerStoreLogOn :: HasCallStack => (HasCallStack => ThreadId -> IO a) -> IO a +withXFTPServerStoreLogOn = withXFTPServerCfg testXFTPServerConfig {storeLogFile = Just testXFTPLogFile} + withXFTPServerCfg :: HasCallStack => XFTPServerConfig -> (HasCallStack => ThreadId -> IO a) -> IO a withXFTPServerCfg cfg = serverBracket (`runXFTPServerBlocking` cfg) (pure ()) +withXFTPServerThreadOn :: HasCallStack => (HasCallStack => ThreadId -> IO a) -> IO a +withXFTPServerThreadOn = withXFTPServerCfg testXFTPServerConfig + withXFTPServer :: IO a -> IO a withXFTPServer = withXFTPServerCfg testXFTPServerConfig . const @@ -70,6 +76,9 @@ xftpServerFiles = "tests/tmp/xftp-server-files" xftpServerFiles2 :: FilePath xftpServerFiles2 = "tests/tmp/xftp-server-files2" +testXFTPLogFile :: FilePath +testXFTPLogFile = "tests/tmp/xftp-server-store.log" + testXFTPServerConfig :: XFTPServerConfig testXFTPServerConfig = XFTPServerConfig diff --git a/tests/XFTPServerTests.hs b/tests/XFTPServerTests.hs index e7a06fd89..7d8129802 100644 --- a/tests/XFTPServerTests.hs +++ b/tests/XFTPServerTests.hs @@ -8,6 +8,7 @@ module XFTPServerTests where import AgentTests.FunctionalAPITests (runRight_) import Control.Concurrent (threadDelay) +import Control.Concurrent.STM import Control.Exception (SomeException) import Control.Monad.Except import Crypto.Random (getRandomBytes) @@ -16,6 +17,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.List (isInfixOf) +import ServerTests (logSize) import Simplex.FileTransfer.Client import Simplex.FileTransfer.Protocol (FileInfo (..), XFTPErrorType (..)) import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) @@ -25,7 +27,7 @@ import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Protocol (SenderId) import Simplex.Messaging.Server.Expiration (ExpirationConfig (..)) -import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) +import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive, removeFile) import System.FilePath (()) import Test.Hspec import XFTPClient @@ -44,6 +46,7 @@ xftpServerTests = it "should acknowledge file chunk reception (2 clients)" testFileChunkAck2 it "should expire chunks after set interval" testFileChunkExpiration it "should not allow uploading chunks after specified storage quota" testFileStorageQuota + it "should store file records to log and restore them after server restart" testFileLog chSize :: Num n => n chSize = 128 * 1024 @@ -188,3 +191,67 @@ testFileStorageQuota = withXFTPServerCfg testXFTPServerConfig {fileSizeQuota = J deleteXFTPChunk c spKey sId1 uploadXFTPChunk c spKey sId3 chunkSpec download rId3 + +testFileLog :: Expectation +testFileLog = do + bytes <- liftIO $ createTestChunk testChunkPath + (sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 + (rcvKey1, rpKey1) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 + (rcvKey2, rpKey2) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 + digest <- liftIO $ LC.sha512Hash <$> LB.readFile testChunkPath + sIdVar <- newTVarIO "" + rIdVar1 <- newTVarIO "" + rIdVar2 <- newTVarIO "" + + withXFTPServerStoreLogOn $ \_ -> testXFTPClient $ \c -> runRight_ $ do + let file = FileInfo {sndKey, size = chSize, digest} + chunkSpec = XFTPChunkSpec {filePath = testChunkPath, chunkOffset = 0, chunkSize = chSize} + + (sId, [rId1, rId2]) <- createXFTPChunk c spKey file [rcvKey1, rcvKey2] + liftIO $ + atomically $ do + writeTVar sIdVar sId + writeTVar rIdVar1 rId1 + writeTVar rIdVar2 rId2 + uploadXFTPChunk c spKey sId chunkSpec + + download c rpKey1 rId1 digest bytes + download c rpKey2 rId2 digest bytes + + logSize testXFTPLogFile `shouldReturn` 3 + + withXFTPServerThreadOn $ \_ -> testXFTPClient $ \c -> runRight_ $ do + sId <- liftIO $ readTVarIO sIdVar + rId1 <- liftIO $ readTVarIO rIdVar1 + rId2 <- liftIO $ readTVarIO rIdVar2 + downloadXFTPChunk c rpKey1 rId1 (XFTPRcvChunkSpec "tests/tmp/received_chunk1" chSize digest) + `catchError` (liftIO . (`shouldBe` PCEProtocolError AUTH)) + downloadXFTPChunk c rpKey2 rId2 (XFTPRcvChunkSpec "tests/tmp/received_chunk1" chSize digest) + `catchError` (liftIO . (`shouldBe` PCEProtocolError AUTH)) + deleteXFTPChunk c spKey sId + `catchError` (liftIO . (`shouldBe` PCEProtocolError AUTH)) + + withXFTPServerStoreLogOn $ \_ -> testXFTPClient $ \c -> runRight_ $ do + rId1 <- liftIO $ readTVarIO rIdVar1 + rId2 <- liftIO $ readTVarIO rIdVar2 + + download c rpKey1 rId1 digest bytes + ackXFTPChunk c rpKey1 rId1 + + download c rpKey2 rId2 digest bytes + + logSize testXFTPLogFile `shouldReturn` 4 + + withXFTPServerStoreLogOn $ \_ -> testXFTPClient $ \c -> runRight_ $ do + sId <- liftIO $ readTVarIO sIdVar + deleteXFTPChunk c spKey sId + + -- logSize testXFTPLogFile `shouldReturn` 0 + + -- liftIO $ threadDelay 60000000 + + removeFile testXFTPLogFile + where + download c rpKey rId digest bytes = do + downloadXFTPChunk c rpKey rId $ XFTPRcvChunkSpec "tests/tmp/received_chunk1" chSize digest + liftIO $ B.readFile "tests/tmp/received_chunk1" `shouldReturn` bytes