mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 12:05:49 +00:00
xftp server: restore file status from log (#1461)
* xftp server: restore file blocking info from log * fix parse * rework * update * rename
This commit is contained in:
@@ -415,7 +415,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
|
||||
sId <- ExceptT $ addFileRetry st file 3 ts
|
||||
rcps <- mapM (ExceptT . addRecipientRetry st 3 sId) rks
|
||||
lift $ withFileLog $ \sl -> do
|
||||
logAddFile sl sId file ts
|
||||
logAddFile sl sId file ts EntityActive
|
||||
logAddRecipients sl sId rcps
|
||||
stats <- asks serverStats
|
||||
lift $ incFileStat filesCreated
|
||||
@@ -426,7 +426,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
|
||||
addFileRetry :: FileStore -> FileInfo -> Int -> RoundedSystemTime -> M (Either XFTPErrorType XFTPFileId)
|
||||
addFileRetry st file n ts =
|
||||
retryAdd n $ \sId -> runExceptT $ do
|
||||
ExceptT $ addFile st sId file ts
|
||||
ExceptT $ addFile st sId file ts EntityActive
|
||||
pure sId
|
||||
addRecipientRetry :: FileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient)
|
||||
addRecipientRetry st n sId rpk =
|
||||
|
||||
@@ -70,18 +70,18 @@ newFileStore = do
|
||||
usedStorage <- newTVarIO 0
|
||||
pure FileStore {files, recipients, usedStorage}
|
||||
|
||||
addFile :: FileStore -> SenderId -> FileInfo -> RoundedSystemTime -> STM (Either XFTPErrorType ())
|
||||
addFile FileStore {files} sId fileInfo createdAt =
|
||||
addFile :: FileStore -> SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> STM (Either XFTPErrorType ())
|
||||
addFile FileStore {files} sId fileInfo createdAt status =
|
||||
ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do
|
||||
f <- newFileRec sId fileInfo createdAt
|
||||
f <- newFileRec sId fileInfo createdAt status
|
||||
TM.insert sId f files
|
||||
pure $ Right ()
|
||||
|
||||
newFileRec :: SenderId -> FileInfo -> RoundedSystemTime -> STM FileRec
|
||||
newFileRec senderId fileInfo createdAt = do
|
||||
newFileRec :: SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> STM FileRec
|
||||
newFileRec senderId fileInfo createdAt status = do
|
||||
recipientIds <- newTVar S.empty
|
||||
filePath <- newTVar Nothing
|
||||
fileStatus <- newTVar EntityActive
|
||||
fileStatus <- newTVar status
|
||||
pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus}
|
||||
|
||||
setFilePath :: FileStore -> SenderId -> FilePath -> STM (Either XFTPErrorType ())
|
||||
|
||||
@@ -19,12 +19,13 @@ module Simplex.FileTransfer.Server.StoreLog
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad.Except
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Composition ((.:), (.:.))
|
||||
import Data.Composition ((.:), (.::))
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
@@ -33,13 +34,13 @@ import Simplex.FileTransfer.Protocol (FileInfo (..))
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..))
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.Util (bshow)
|
||||
import System.IO
|
||||
|
||||
data FileStoreLogRecord
|
||||
= AddFile SenderId FileInfo RoundedSystemTime
|
||||
= AddFile SenderId FileInfo RoundedSystemTime ServerEntityStatus
|
||||
| PutFile SenderId FilePath
|
||||
| AddRecipients SenderId (NonEmpty FileRecipient)
|
||||
| DeleteFile SenderId
|
||||
@@ -49,7 +50,7 @@ data FileStoreLogRecord
|
||||
|
||||
instance StrEncoding FileStoreLogRecord where
|
||||
strEncode = \case
|
||||
AddFile sId file createdAt -> strEncode (Str "FNEW", sId, file, createdAt)
|
||||
AddFile sId file createdAt status -> strEncode (Str "FNEW", sId, file, createdAt, status)
|
||||
PutFile sId path -> strEncode (Str "FPUT", sId, path)
|
||||
AddRecipients sId rcps -> strEncode (Str "FADD", sId, rcps)
|
||||
DeleteFile sId -> strEncode (Str "FDEL", sId)
|
||||
@@ -57,7 +58,7 @@ instance StrEncoding FileStoreLogRecord where
|
||||
AckFile rId -> strEncode (Str "FACK", rId)
|
||||
strP =
|
||||
A.choice
|
||||
[ "FNEW " *> (AddFile <$> strP_ <*> strP_ <*> strP),
|
||||
[ "FNEW " *> (AddFile <$> strP_ <*> strP_ <*> strP <*> (_strP <|> pure EntityActive)),
|
||||
"FPUT " *> (PutFile <$> strP_ <*> strP),
|
||||
"FADD " *> (AddRecipients <$> strP_ <*> strP),
|
||||
"FDEL " *> (DeleteFile <$> strP),
|
||||
@@ -68,8 +69,8 @@ instance StrEncoding FileStoreLogRecord where
|
||||
logFileStoreRecord :: StoreLog 'WriteMode -> FileStoreLogRecord -> IO ()
|
||||
logFileStoreRecord = writeStoreLogRecord
|
||||
|
||||
logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> RoundedSystemTime -> IO ()
|
||||
logAddFile s = logFileStoreRecord s .:. AddFile
|
||||
logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> IO ()
|
||||
logAddFile s = logFileStoreRecord s .:: AddFile
|
||||
|
||||
logPutFile :: StoreLog 'WriteMode -> SenderId -> FilePath -> IO ()
|
||||
logPutFile s = logFileStoreRecord s .: PutFile
|
||||
@@ -99,7 +100,7 @@ readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.re
|
||||
Left e -> B.putStrLn $ "Log processing error (" <> bshow e <> "): " <> B.take 100 s
|
||||
_ -> pure ()
|
||||
addToStore = \case
|
||||
AddFile sId file createdAt -> addFile st sId file createdAt
|
||||
AddFile sId file createdAt status -> addFile st sId file createdAt status
|
||||
PutFile qId path -> setFilePath st qId path
|
||||
AddRecipients sId rcps -> runExceptT $ addRecipients sId rcps
|
||||
DeleteFile sId -> deleteFile st sId
|
||||
@@ -113,8 +114,9 @@ writeFileStore s FileStore {files, recipients} = do
|
||||
readTVarIO files >>= mapM_ (logFile allRcps)
|
||||
where
|
||||
logFile :: Map RecipientId (SenderId, RcvPublicAuthKey) -> FileRec -> IO ()
|
||||
logFile allRcps FileRec {senderId, fileInfo, filePath, recipientIds, createdAt} = do
|
||||
logAddFile s senderId fileInfo createdAt
|
||||
logFile allRcps FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus} = do
|
||||
status <- readTVarIO fileStatus
|
||||
logAddFile s senderId fileInfo createdAt status
|
||||
(rcpErrs, rcps) <- M.mapEither getRcp . M.fromSet id <$> readTVarIO recipientIds
|
||||
mapM_ (logAddRecipients s senderId) $ L.nonEmpty $ M.elems rcps
|
||||
mapM_ (B.putStrLn . ("Error storing log: " <>)) rcpErrs
|
||||
|
||||
Reference in New Issue
Block a user