diff --git a/src/Simplex/FileTransfer/Protocol.hs b/src/Simplex/FileTransfer/Protocol.hs index 5899f9c3e..9bf552732 100644 --- a/src/Simplex/FileTransfer/Protocol.hs +++ b/src/Simplex/FileTransfer/Protocol.hs @@ -25,7 +25,7 @@ import Data.List.NonEmpty (NonEmpty (..)) import Data.Maybe (isNothing) import Data.Type.Equality import Data.Word (Word32) -import Simplex.FileTransfer.Transport (XFTPErrorType (..), XFTPVersion, xftpClientHandshakeStub) +import Simplex.FileTransfer.Transport (XFTPErrorType (..), XFTPVersion, blockedFilesXFTPVersion, xftpClientHandshakeStub) import Simplex.Messaging.Client (authTransmission) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding @@ -276,12 +276,14 @@ data FileResponse instance ProtocolEncoding XFTPVersion XFTPErrorType FileResponse where type Tag FileResponse = FileResponseTag - encodeProtocol _v = \case + encodeProtocol v = \case FRSndIds fId rIds -> e (FRSndIds_, ' ', fId, rIds) FRRcvIds rIds -> e (FRRcvIds_, ' ', rIds) FRFile rDhKey nonce -> e (FRFile_, ' ', rDhKey, nonce) FROk -> e FROk_ - FRErr err -> e (FRErr_, ' ', err) + FRErr err -> case err of + BLOCKED _ | v < blockedFilesXFTPVersion -> e (FRErr_, ' ', AUTH) + _ -> e (FRErr_, ' ', err) FRPong -> e FRPong_ where e :: Encoding a => a -> ByteString diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index 29cc5bf6a..935afe2f9 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -53,11 +53,11 @@ import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (CorrId (..), EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, TransmissionAuth, pattern NoEntity) +import Simplex.Messaging.Protocol (CorrId (..), BlockingInfo, EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, TransmissionAuth, pattern NoEntity) import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization) import Simplex.Messaging.Server.Control (CPClientRole (..)) import Simplex.Messaging.Server.Expiration -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, getRoundedSystemTime) +import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..), getRoundedSystemTime) import Simplex.Messaging.Server.Stats import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -287,11 +287,15 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira CPDelete fileId -> withUserRole $ unliftIO u $ do fs <- asks store r <- runExceptT $ do - let asSender = ExceptT . atomically $ getFile fs SFSender fileId - let asRecipient = ExceptT . atomically $ getFile fs SFRecipient fileId - (fr, _) <- asSender `catchError` const asRecipient + (fr, _) <- ExceptT $ atomically $ getFile fs SFSender fileId ExceptT $ deleteServerFile_ fr liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r + CPBlock fileId info -> withUserRole $ unliftIO u $ do + fs <- asks store + r <- runExceptT $ do + (fr, _) <- ExceptT $ atomically $ getFile fs SFSender fileId + ExceptT $ blockServerFile fr info + liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r CPHelp -> hPutStrLn h "commands: stats-rts, delete, help, quit" CPQuit -> pure () CPSkip -> pure () @@ -321,7 +325,7 @@ processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHea let THandleParams {thAuth} = thParams verifyXFTPTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) sig_ signed fId cmd >>= \case VRVerified req -> uncurry send =<< processXFTPRequest body req - VRFailed -> send (FRErr AUTH) Nothing + VRFailed e -> send (FRErr e) Nothing Left e -> send (FRErr e) Nothing where send resp = sendXFTPResponse (corrId, fId, resp) @@ -355,7 +359,7 @@ randomDelay = do threadDelay $ (d * (1000 + pc)) `div` 1000 #endif -data VerificationResult = VRVerified XFTPRequest | VRFailed +data VerificationResult = VRVerified XFTPRequest | VRFailed XFTPErrorType verifyXFTPTransmission :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> XFTPFileId -> FileCmd -> M VerificationResult verifyXFTPTransmission auth_ tAuth authorized fId cmd = @@ -367,13 +371,19 @@ verifyXFTPTransmission auth_ tAuth authorized fId cmd = verifyCmd :: SFileParty p -> M VerificationResult verifyCmd party = do st <- asks store - atomically $ verify <$> getFile st party fId + atomically $ verify =<< getFile st party fId where verify = \case - Right (fr, k) -> XFTPReqCmd fId fr cmd `verifyWith` k - _ -> maybe False (dummyVerifyCmd Nothing authorized) tAuth `seq` VRFailed + Right (fr, k) -> result <$> readTVar (fileStatus fr) + where + result = \case + EntityActive -> XFTPReqCmd fId fr cmd `verifyWith` k + EntityBlocked info -> VRFailed $ BLOCKED info + EntityOff -> noFileAuth + Left _ -> pure noFileAuth + noFileAuth = maybe False (dummyVerifyCmd Nothing authorized) tAuth `seq` VRFailed AUTH -- TODO verify with DH authorization - req `verifyWith` k = if verifyCmdAuthorization auth_ tAuth authorized k then VRVerified req else VRFailed + req `verifyWith` k = if verifyCmdAuthorization auth_ tAuth authorized k then VRVerified req else VRFailed AUTH processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile) processXFTPRequest HTTP2Body {bodyPart} = \case @@ -390,7 +400,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case FACK -> noFile =<< ackFileReception fId fr -- it should never get to the commands below, they are passed in other constructors of XFTPRequest FNEW {} -> noFile $ FRErr INTERNAL - PING -> noFile FRPong + PING -> noFile $ FRErr INTERNAL XFTPReqPing -> noFile FRPong where noFile resp = pure (resp, Nothing) @@ -518,15 +528,24 @@ processXFTPRequest HTTP2Body {bodyPart} = \case pure FROk deleteServerFile_ :: FileRec -> M (Either XFTPErrorType ()) -deleteServerFile_ FileRec {senderId, fileInfo, filePath} = do +deleteServerFile_ fr@FileRec {senderId} = do withFileLog (`logDeleteFile` senderId) - runExceptT $ do + deleteOrBlockServerFile_ fr filesDeleted (`deleteFile` senderId) + +-- this also deletes the file from storage, but doesn't include it in delete statistics +blockServerFile :: FileRec -> BlockingInfo -> M (Either XFTPErrorType ()) +blockServerFile fr@FileRec {senderId} info = do + withFileLog $ \sl -> logBlockFile sl senderId info + deleteOrBlockServerFile_ fr filesBlocked $ \st -> blockFile st senderId info True + +deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (FileStore -> STM (Either XFTPErrorType ())) -> M (Either XFTPErrorType ()) +deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do path <- readTVarIO filePath stats <- asks serverStats ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats)) st <- asks store - void $ atomically $ deleteFile st senderId - lift $ incFileStat filesDeleted + void $ atomically $ storeAction st + lift $ incFileStat stat where deletedStats stats = do liftIO $ atomicModifyIORef'_ (filesCount stats) (subtract 1) diff --git a/src/Simplex/FileTransfer/Server/Control.hs b/src/Simplex/FileTransfer/Server/Control.hs index 3a84ace49..2be8113db 100644 --- a/src/Simplex/FileTransfer/Server/Control.hs +++ b/src/Simplex/FileTransfer/Server/Control.hs @@ -6,12 +6,13 @@ module Simplex.FileTransfer.Server.Control where import qualified Data.Attoparsec.ByteString.Char8 as A import Simplex.FileTransfer.Protocol (XFTPFileId) import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (BasicAuth) +import Simplex.Messaging.Protocol (BasicAuth, BlockingInfo) data ControlProtocol = CPAuth BasicAuth | CPStatsRTS | CPDelete XFTPFileId + | CPBlock XFTPFileId BlockingInfo | CPHelp | CPQuit | CPSkip @@ -21,6 +22,7 @@ instance StrEncoding ControlProtocol where CPAuth tok -> "auth " <> strEncode tok CPStatsRTS -> "stats-rts" CPDelete fId -> strEncode (Str "delete", fId) + CPBlock fId info -> strEncode (Str "block", fId, info) CPHelp -> "help" CPQuit -> "quit" CPSkip -> "" @@ -29,6 +31,7 @@ instance StrEncoding ControlProtocol where "auth" -> CPAuth <$> _strP "stats-rts" -> pure CPStatsRTS "delete" -> CPDelete <$> _strP + "block" -> CPBlock <$> _strP <*> _strP "help" -> pure CPHelp "quit" -> pure CPQuit "" -> pure CPSkip diff --git a/src/Simplex/FileTransfer/Server/Stats.hs b/src/Simplex/FileTransfer/Server/Stats.hs index a7951a65a..2f38c7447 100644 --- a/src/Simplex/FileTransfer/Server/Stats.hs +++ b/src/Simplex/FileTransfer/Server/Stats.hs @@ -20,6 +20,7 @@ data FileServerStats = FileServerStats filesUploaded :: IORef Int, filesExpired :: IORef Int, filesDeleted :: IORef Int, + filesBlocked :: IORef Int, filesDownloaded :: PeriodStats, fileDownloads :: IORef Int, fileDownloadAcks :: IORef Int, @@ -34,6 +35,7 @@ data FileServerStatsData = FileServerStatsData _filesUploaded :: Int, _filesExpired :: Int, _filesDeleted :: Int, + _filesBlocked :: Int, _filesDownloaded :: PeriodStatsData, _fileDownloads :: Int, _fileDownloadAcks :: Int, @@ -50,12 +52,13 @@ newFileServerStats ts = do filesUploaded <- newIORef 0 filesExpired <- newIORef 0 filesDeleted <- newIORef 0 + filesBlocked <- newIORef 0 filesDownloaded <- newPeriodStats fileDownloads <- newIORef 0 fileDownloadAcks <- newIORef 0 filesCount <- newIORef 0 filesSize <- newIORef 0 - pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize} + pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesBlocked, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize} getFileServerStatsData :: FileServerStats -> IO FileServerStatsData getFileServerStatsData s = do @@ -65,12 +68,13 @@ getFileServerStatsData s = do _filesUploaded <- readIORef $ filesUploaded s _filesExpired <- readIORef $ filesExpired s _filesDeleted <- readIORef $ filesDeleted s + _filesBlocked <- readIORef $ filesBlocked s _filesDownloaded <- getPeriodStatsData $ filesDownloaded s _fileDownloads <- readIORef $ fileDownloads s _fileDownloadAcks <- readIORef $ fileDownloadAcks s _filesCount <- readIORef $ filesCount s _filesSize <- readIORef $ filesSize s - pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} + pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesBlocked, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} -- this function is not thread safe, it is used on server start only setFileServerStats :: FileServerStats -> FileServerStatsData -> IO () @@ -81,6 +85,7 @@ setFileServerStats s d = do writeIORef (filesUploaded s) $! _filesUploaded d writeIORef (filesExpired s) $! _filesExpired d writeIORef (filesDeleted s) $! _filesDeleted d + writeIORef (filesBlocked s) $! _filesBlocked d setPeriodStats (filesDownloaded s) $! _filesDownloaded d writeIORef (fileDownloads s) $! _fileDownloads d writeIORef (fileDownloadAcks s) $! _fileDownloadAcks d @@ -88,7 +93,7 @@ setFileServerStats s d = do writeIORef (filesSize s) $! _filesSize d instance StrEncoding FileServerStatsData where - strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} = + strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesBlocked, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} = B.unlines [ "fromTime=" <> strEncode _fromTime, "filesCreated=" <> strEncode _filesCreated, @@ -96,6 +101,7 @@ instance StrEncoding FileServerStatsData where "filesUploaded=" <> strEncode _filesUploaded, "filesExpired=" <> strEncode _filesExpired, "filesDeleted=" <> strEncode _filesDeleted, + "filesBlocked=" <> strEncode _filesBlocked, "filesCount=" <> strEncode _filesCount, "filesSize=" <> strEncode _filesSize, "filesDownloaded:", @@ -110,9 +116,10 @@ instance StrEncoding FileServerStatsData where _filesUploaded <- "filesUploaded=" *> strP <* A.endOfLine _filesExpired <- "filesExpired=" *> strP <* A.endOfLine <|> pure 0 _filesDeleted <- "filesDeleted=" *> strP <* A.endOfLine + _filesBlocked <- "filesBlocked=" *> strP <* A.endOfLine _filesCount <- "filesCount=" *> strP <* A.endOfLine <|> pure 0 _filesSize <- "filesSize=" *> strP <* A.endOfLine <|> pure 0 _filesDownloaded <- "filesDownloaded:" *> A.endOfLine *> strP <* A.endOfLine _fileDownloads <- "fileDownloads=" *> strP <* A.endOfLine _fileDownloadAcks <- "fileDownloadAcks=" *> strP <* A.endOfLine - pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} + pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesBlocked, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} diff --git a/src/Simplex/FileTransfer/Server/Store.hs b/src/Simplex/FileTransfer/Server/Store.hs index 46513ea96..c4536a2b5 100644 --- a/src/Simplex/FileTransfer/Server/Store.hs +++ b/src/Simplex/FileTransfer/Server/Store.hs @@ -13,6 +13,7 @@ module Simplex.FileTransfer.Server.Store setFilePath, addRecipient, deleteFile, + blockFile, deleteRecipient, expiredFilePath, getFile, @@ -22,6 +23,7 @@ module Simplex.FileTransfer.Server.Store where import Control.Concurrent.STM +import Control.Monad import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Int (Int64) import Data.Set (Set) @@ -30,8 +32,8 @@ import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId import Simplex.FileTransfer.Transport (XFTPErrorType (..)) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (RcvPublicAuthKey, RecipientId, SenderId) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..)) +import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) +import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..), ServerEntityStatus (..)) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (ifM, ($>>=)) @@ -47,7 +49,8 @@ data FileRec = FileRec fileInfo :: FileInfo, filePath :: TVar (Maybe FilePath), recipientIds :: TVar (Set RecipientId), - createdAt :: RoundedSystemTime + createdAt :: RoundedSystemTime, + fileStatus :: TVar ServerEntityStatus } fileTimePrecision :: Int64 @@ -78,7 +81,8 @@ newFileRec :: SenderId -> FileInfo -> RoundedSystemTime -> STM FileRec newFileRec senderId fileInfo createdAt = do recipientIds <- newTVar S.empty filePath <- newTVar Nothing - pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt} + fileStatus <- newTVar EntityActive + pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus} setFilePath :: FileStore -> SenderId -> FilePath -> STM (Either XFTPErrorType ()) setFilePath st sId fPath = @@ -109,6 +113,14 @@ deleteFile FileStore {files, recipients, usedStorage} senderId = do pure $ Right () _ -> pure $ Left AUTH +-- this function must be called after the file is deleted from the file system +blockFile :: FileStore -> SenderId -> BlockingInfo -> Bool -> STM (Either XFTPErrorType ()) +blockFile st@FileStore {usedStorage} senderId info deleted = + withFile st senderId $ \FileRec {fileInfo, fileStatus} -> do + when deleted $ modifyTVar' usedStorage $ subtract (fromIntegral $ size fileInfo) + writeTVar fileStatus $! EntityBlocked info + pure $ Right () + deleteRecipient :: FileStore -> RecipientId -> FileRec -> STM () deleteRecipient FileStore {recipients} rId FileRec {recipientIds} = do TM.delete rId recipients diff --git a/src/Simplex/FileTransfer/Server/StoreLog.hs b/src/Simplex/FileTransfer/Server/StoreLog.hs index 4c485c170..a229f62e7 100644 --- a/src/Simplex/FileTransfer/Server/StoreLog.hs +++ b/src/Simplex/FileTransfer/Server/StoreLog.hs @@ -14,6 +14,7 @@ module Simplex.FileTransfer.Server.StoreLog logPutFile, logAddRecipients, logDeleteFile, + logBlockFile, logAckFile, ) where @@ -31,7 +32,7 @@ import qualified Data.Map.Strict as M import Simplex.FileTransfer.Protocol (FileInfo (..)) import Simplex.FileTransfer.Server.Store import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (RcvPublicAuthKey, RecipientId, SenderId) +import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (bshow) @@ -42,7 +43,8 @@ data FileStoreLogRecord | PutFile SenderId FilePath | AddRecipients SenderId (NonEmpty FileRecipient) | DeleteFile SenderId - | AckFile RecipientId + | BlockFile SenderId BlockingInfo + | AckFile RecipientId -- TODO add senderId as well? deriving (Show) instance StrEncoding FileStoreLogRecord where @@ -51,6 +53,7 @@ instance StrEncoding FileStoreLogRecord where PutFile sId path -> strEncode (Str "FPUT", sId, path) AddRecipients sId rcps -> strEncode (Str "FADD", sId, rcps) DeleteFile sId -> strEncode (Str "FDEL", sId) + BlockFile sId info -> strEncode (Str "FBLK", sId, info) AckFile rId -> strEncode (Str "FACK", rId) strP = A.choice @@ -58,6 +61,7 @@ instance StrEncoding FileStoreLogRecord where "FPUT " *> (PutFile <$> strP_ <*> strP), "FADD " *> (AddRecipients <$> strP_ <*> strP), "FDEL " *> (DeleteFile <$> strP), + "FBLK " *> (BlockFile <$> strP_ <*> strP), "FACK " *> (AckFile <$> strP) ] @@ -76,6 +80,9 @@ logAddRecipients s = logFileStoreRecord s .: AddRecipients logDeleteFile :: StoreLog 'WriteMode -> SenderId -> IO () logDeleteFile s = logFileStoreRecord s . DeleteFile +logBlockFile :: StoreLog 'WriteMode -> SenderId -> BlockingInfo -> IO () +logBlockFile s fId = logFileStoreRecord s . BlockFile fId + logAckFile :: StoreLog 'WriteMode -> RecipientId -> IO () logAckFile s = logFileStoreRecord s . AckFile @@ -96,6 +103,7 @@ readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.re PutFile qId path -> setFilePath st qId path AddRecipients sId rcps -> runExceptT $ addRecipients sId rcps DeleteFile sId -> deleteFile st sId + BlockFile sId info -> blockFile st sId info True AckFile rId -> ackFile st rId addRecipients sId rcps = mapM_ (ExceptT . addRecipient st sId) rcps diff --git a/src/Simplex/FileTransfer/Transport.hs b/src/Simplex/FileTransfer/Transport.hs index a028a3773..7f90b2879 100644 --- a/src/Simplex/FileTransfer/Transport.hs +++ b/src/Simplex/FileTransfer/Transport.hs @@ -11,6 +11,7 @@ module Simplex.FileTransfer.Transport ( supportedFileServerVRange, authCmdsXFTPVersion, + blockedFilesXFTPVersion, xftpClientHandshakeStub, supportedXFTPhandshakes, XFTPClientHandshake (..), @@ -33,7 +34,6 @@ module Simplex.FileTransfer.Transport ) where -import Control.Applicative ((<|>)) import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad @@ -57,7 +57,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers -import Simplex.Messaging.Protocol (CommandError) +import Simplex.Messaging.Protocol (BlockingInfo, CommandError) import Simplex.Messaging.Transport (ALPN, SessionId, THandle (..), THandleParams (..), TransportError (..), TransportPeer (..)) import Simplex.Messaging.Transport.HTTP2.File import Simplex.Messaging.Util (bshow, tshow) @@ -92,8 +92,11 @@ initialXFTPVersion = VersionXFTP 1 authCmdsXFTPVersion :: VersionXFTP authCmdsXFTPVersion = VersionXFTP 2 +blockedFilesXFTPVersion :: VersionXFTP +blockedFilesXFTPVersion = VersionXFTP 3 + currentXFTPVersion :: VersionXFTP -currentXFTPVersion = VersionXFTP 2 +currentXFTPVersion = VersionXFTP 3 supportedFileServerVRange :: VersionRangeXFTP supportedFileServerVRange = mkVersionRange initialXFTPVersion currentXFTPVersion @@ -211,6 +214,8 @@ data XFTPErrorType CMD {cmdErr :: CommandError} | -- | command authorization error - bad signature or non-existing SMP queue AUTH + | -- | command with the entity that was blocked + BLOCKED {blockInfo :: BlockingInfo} | -- | incorrent file size SIZE | -- | storage quota exceeded @@ -231,15 +236,46 @@ data XFTPErrorType INTERNAL | -- | used internally, never returned by the server (to be removed) DUPLICATE_ -- not part of SMP protocol, used internally - deriving (Eq, Read, Show) + deriving (Eq, Show) instance StrEncoding XFTPErrorType where strEncode = \case + BLOCK -> "BLOCK" + SESSION -> "SESSION" + HANDSHAKE -> "HANDSHAKE" CMD e -> "CMD " <> bshow e - e -> bshow e + AUTH -> "AUTH" + BLOCKED info -> "BLOCKED " <> strEncode info + SIZE -> "SIZE" + QUOTA -> "QUOTA" + DIGEST -> "DIGEST" + CRYPTO -> "CRYPTO" + NO_FILE -> "NO_FILE" + HAS_FILE -> "HAS_FILE" + FILE_IO -> "FILE_IO" + TIMEOUT -> "TIMEOUT" + INTERNAL -> "INTERNAL" + DUPLICATE_ -> "DUPLICATE_" + strP = - "CMD " *> (CMD <$> parseRead1) - <|> parseRead1 + A.takeTill (== ' ') >>= \case + "BLOCK" -> pure BLOCK + "SESSION" -> pure SESSION + "HANDSHAKE" -> pure HANDSHAKE + "CMD" -> CMD <$> parseRead1 + "AUTH" -> pure AUTH + "BLOCKED" -> BLOCKED <$> _strP + "SIZE" -> pure SIZE + "QUOTA" -> pure QUOTA + "DIGEST" -> pure DIGEST + "CRYPTO" -> pure CRYPTO + "NO_FILE" -> pure NO_FILE + "HAS_FILE" -> pure HAS_FILE + "FILE_IO" -> pure FILE_IO + "TIMEOUT" -> pure TIMEOUT + "INTERNAL" -> pure INTERNAL + "DUPLICATE_" -> pure DUPLICATE_ + _ -> fail "bad error type" instance Encoding XFTPErrorType where smpEncode = \case @@ -248,6 +284,7 @@ instance Encoding XFTPErrorType where HANDSHAKE -> "HANDSHAKE" CMD err -> "CMD " <> smpEncode err AUTH -> "AUTH" + BLOCKED info -> "BLOCKED " <> smpEncode info SIZE -> "SIZE" QUOTA -> "QUOTA" DIGEST -> "DIGEST" @@ -266,6 +303,7 @@ instance Encoding XFTPErrorType where "HANDSHAKE" -> pure HANDSHAKE "CMD" -> CMD <$> _smpP "AUTH" -> pure AUTH + "BLOCKED" -> BLOCKED <$> _smpP "SIZE" -> pure SIZE "QUOTA" -> pure QUOTA "DIGEST" -> pure DIGEST diff --git a/src/Simplex/Messaging/Parsers.hs b/src/Simplex/Messaging/Parsers.hs index a75efe0ee..008acd1d5 100644 --- a/src/Simplex/Messaging/Parsers.hs +++ b/src/Simplex/Messaging/Parsers.hs @@ -78,6 +78,7 @@ parseRead2 = parseRead $ do wordEnd :: Char -> Bool wordEnd c = c == ' ' || c == '\n' +{-# INLINE wordEnd #-} parseString :: (ByteString -> Either String a) -> (String -> a) parseString p = either error id . p . B.pack diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 03234aad4..2a76faa05 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -70,6 +70,8 @@ module Simplex.Messaging.Protocol CommandError (..), ProxyError (..), BrokerErrorType (..), + BlockingInfo (..), + BlockingReason (..), Transmission, TransmissionAuth (..), SignedTransmission, @@ -1194,6 +1196,8 @@ data ErrorType PROXY {proxyErr :: ProxyError} | -- | command authorization error - bad signature or non-existing SMP queue AUTH + | -- | command with the entity that was blocked + BLOCKED {blockInfo :: BlockingInfo} | -- | encryption/decryption error in proxy protocol CRYPTO | -- | SMP queue capacity is exceeded on the server @@ -1210,17 +1214,41 @@ data ErrorType INTERNAL | -- | used internally, never returned by the server (to be removed) DUPLICATE_ -- not part of SMP protocol, used internally - deriving (Eq, Read, Show) + deriving (Eq, Show) instance StrEncoding ErrorType where strEncode = \case + BLOCK -> "BLOCK" + SESSION -> "SESSION" CMD e -> "CMD " <> bshow e PROXY e -> "PROXY " <> strEncode e - e -> bshow e + AUTH -> "AUTH" + BLOCKED info -> "BLOCKED " <> strEncode info + CRYPTO -> "CRYPTO" + QUOTA -> "QUOTA" + STORE e -> "STORE " <> encodeUtf8 (T.pack e) + NO_MSG -> "NO_MSG" + LARGE_MSG -> "LARGE_MSG" + EXPIRED -> "EXPIRED" + INTERNAL -> "INTERNAL" + DUPLICATE_ -> "DUPLICATE_" strP = - "CMD " *> (CMD <$> parseRead1) - <|> "PROXY " *> (PROXY <$> strP) - <|> parseRead1 + A.choice + [ "BLOCK" $> BLOCK, + "SESSION" $> SESSION, + "CMD " *> (CMD <$> parseRead1), + "PROXY " *> (PROXY <$> strP), + "AUTH" $> AUTH, + "BLOCKED " *> strP, + "CRYPTO" $> CRYPTO, + "QUOTA" $> QUOTA, + "STORE " *> (STORE . T.unpack . safeDecodeUtf8 <$> A.takeByteString), + "NO_MSG" $> NO_MSG, + "LARGE_MSG" $> LARGE_MSG, + "EXPIRED" $> EXPIRED, + "INTERNAL" $> INTERNAL, + "DUPLICATE_" $> DUPLICATE_ + ] -- | SMP command error type. data CommandError @@ -1248,7 +1276,7 @@ data ProxyError BASIC_AUTH | -- no destination server error NO_SESSION - deriving (Eq, Read, Show) + deriving (Eq, Show) -- | SMP server errors. data BrokerErrorType @@ -1266,6 +1294,37 @@ data BrokerErrorType TIMEOUT deriving (Eq, Read, Show, Exception) +data BlockingInfo = BlockingInfo + { reason :: BlockingReason + } + deriving (Eq, Show) + +data BlockingReason = BRSpam | BRContent + deriving (Eq, Show) + +instance StrEncoding BlockingInfo where + strEncode BlockingInfo {reason} = "reason=" <> strEncode reason + strP = do + reason <- "reason=" *> strP + pure BlockingInfo {reason} + +instance Encoding BlockingInfo where + smpEncode = strEncode + smpP = strP + +instance StrEncoding BlockingReason where + strEncode = \case + BRSpam -> "spam" + BRContent -> "content" + strP = "spam" $> BRSpam <|> "content" $> BRContent + +instance ToJSON BlockingReason where + toJSON = strToJSON + toEncoding = strToJEncoding + +instance FromJSON BlockingReason where + parseJSON = strParseJSON "BlockingReason" + -- | SMP transmission parser. transmissionP :: THandleParams v p -> Parser RawTransmission transmissionP THandleParams {sessionId, implySessId} = do @@ -1435,7 +1494,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where | otherwise -> e END_ INFO info -> e (INFO_, ' ', info) OK -> e OK_ - ERR err -> e (ERR_, ' ', err) + ERR err -> case err of + BLOCKED _ | v < blockedEntityErrorSMPVersion -> e (ERR_, ' ', AUTH) + _ -> e (ERR_, ' ', err) PONG -> e PONG_ where e :: Encoding a => a -> ByteString @@ -1513,6 +1574,7 @@ instance Encoding ErrorType where CMD err -> "CMD " <> smpEncode err PROXY err -> "PROXY " <> smpEncode err AUTH -> "AUTH" + BLOCKED info -> "BLOCKED " <> smpEncode info CRYPTO -> "CRYPTO" QUOTA -> "QUOTA" STORE err -> "STORE " <> smpEncode err @@ -1529,6 +1591,7 @@ instance Encoding ErrorType where "CMD" -> CMD <$> _smpP "PROXY" -> PROXY <$> _smpP "AUTH" -> pure AUTH + "BLOCKED" -> BLOCKED <$> _smpP "CRYPTO" -> pure CRYPTO "QUOTA" -> pure QUOTA "STORE" -> STORE <$> _smpP @@ -1759,9 +1822,11 @@ tDecodeParseValidate THandleParams {sessionId, thVersion = v, implySessId} = \ca $(J.deriveJSON defaultJSON ''MsgFlags) -$(J.deriveJSON (sumTypeJSON id) ''CommandError) +$(J.deriveJSON (taggedObjectJSON id) ''CommandError) -$(J.deriveJSON (sumTypeJSON id) ''BrokerErrorType) +$(J.deriveJSON (taggedObjectJSON id) ''BrokerErrorType) + +$(J.deriveJSON defaultJSON ''BlockingInfo) -- run deriveJSON in one TH splice to allow mutual instance $(concat <$> mapM @[] (J.deriveJSON (sumTypeJSON id)) [''ProxyError, ''ErrorType]) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index afec6e332..d31a50e34 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -849,16 +849,41 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT SubPending -> (c1, c2 + 1, c3, c4) SubThread _ -> (c1, c2, c3 + 1, c4) ProhibitSub -> pure (c1, c2, c3, c4 + 1) - CPDelete qId -> withUserRole $ unliftIO u $ do + CPDelete sId -> withUserRole $ unliftIO u $ do AMS _ st <- asks msgStore r <- liftIO $ runExceptT $ do - (q, qr) <- ExceptT (getQueueRec st SSender qId) `catchE` \_ -> ExceptT (getQueueRec st SRecipient qId) + (q, qr) <- ExceptT $ getQueueRec st SSender sId ExceptT $ deleteQueueSize st (recipientId qr) q case r of Left e -> liftIO $ hPutStrLn h $ "error: " <> show e Right (qr, numDeleted) -> do updateDeletedStats qr liftIO $ hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted" + CPStatus sId -> withUserRole $ unliftIO u $ do + AMS _ st <- asks msgStore + q <- liftIO $ getQueueRec st SSender sId + liftIO $ hPutStrLn h $ case q of + Left e -> "error: " <> show e + Right (_, QueueRec {sndSecure, status, updatedAt}) -> + "status: " <> show status <> ", updatedAt: " <> show updatedAt <> ", sndSecure: " <> show sndSecure + CPBlock sId info -> withUserRole $ unliftIO u $ do + AMS _ st <- asks msgStore + r <- liftIO $ runExceptT $ do + q <- ExceptT $ getQueue st SSender sId + ExceptT $ blockQueue st q info + case r of + Left e -> liftIO $ hPutStrLn h $ "error: " <> show e + Right () -> do + incStat . qBlocked =<< asks serverStats + liftIO $ hPutStrLn h "ok" + CPUnblock sId -> withUserRole $ unliftIO u $ do + AMS _ st <- asks msgStore + r <- liftIO $ runExceptT $ do + q <- ExceptT $ getQueue st SSender sId + ExceptT $ unblockQueue st q + liftIO $ hPutStrLn h $ case r of + Left e -> "error: " <> show e + Right () -> "ok" CPSave -> withAdminRole $ withLock' (savingLock srv) "control" $ do hPutStrLn h "saving server state..." unliftIO u $ saveServer False @@ -1225,7 +1250,7 @@ client SKEY sKey -> withQueue $ \q QueueRec {sndSecure} -> (corrId,entId,) <$> if sndSecure then secureQueue_ q sKey else pure $ ERR AUTH - SEND flags msgBody -> withQueue $ sendMessage flags msgBody + SEND flags msgBody -> withQueue_ False $ sendMessage flags msgBody PING -> pure (corrId, NoEntity, PONG) RFWD encBlock -> (corrId, NoEntity,) <$> processForwardedCommand encBlock Cmd SNotifier NSUB -> Just <$> subscribeNotifications @@ -1247,7 +1272,7 @@ client NKEY nKey dhKey -> withQueue $ \q _ -> addQueueNotifier_ q nKey dhKey NDEL -> withQueue $ \q _ -> deleteQueueNotifier_ q OFF -> maybe (pure $ err INTERNAL) suspendQueue_ q_ - DEL -> maybe (pure $ err INTERNAL) delQueueAndMsgs q_ + DEL -> maybe (pure $ err INTERNAL) delQueueAndMsgs q_ QUE -> withQueue $ \q qr -> (corrId,entId,) <$> getQueueInfo q qr where createQueue :: RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> SenderCanSecure -> M (Transmission BrokerMsg) @@ -1264,7 +1289,7 @@ client rcvDhSecret, senderKey = Nothing, notifier = Nothing, - status = QueueActive, + status = EntityActive, sndSecure, updatedAt } @@ -1406,13 +1431,18 @@ client Nothing -> incStat (msgGetNoMsg stats) $> ok withQueue :: (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) - withQueue action = case q_ of + withQueue = withQueue_ True + + withQueue_ :: Bool -> (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) + withQueue_ queueNotBlocked action = case q_ of Nothing -> pure $ err INTERNAL - Just (q, qr@QueueRec {updatedAt}) -> do - t <- liftIO getSystemDate - if updatedAt == Just t - then action q qr - else liftIO (updateQueueTime ms q t) >>= either (pure . err) (action q) + Just (q, qr@QueueRec {status, updatedAt}) -> case status of + EntityBlocked info | queueNotBlocked -> pure $ err $ BLOCKED info + _ -> do + t <- liftIO getSystemDate + if updatedAt == Just t + then action q qr + else liftIO (updateQueueTime ms q t) >>= either (pure . err) (action q) subscribeNotifications :: M (Transmission BrokerMsg) subscribeNotifications = do @@ -1483,10 +1513,13 @@ client | otherwise = do stats <- asks serverStats case status qr of - QueueOff -> do + EntityOff -> do incStat $ msgSentAuth stats pure $ err AUTH - QueueActive -> + EntityBlocked info -> do + incStat $ msgSentBlock stats + pure $ err $ BLOCKED info + EntityActive -> case C.maxLenBS msgBody of Left _ -> pure $ err LARGE_MSG Right body -> do @@ -1734,7 +1767,7 @@ updateDeletedStats q = do let delSel = if isNothing (senderKey q) then qDeletedNew else qDeletedSecured incStat $ delSel stats incStat $ qDeletedAll stats - incStat $ qCount stats + liftIO $ atomicModifyIORef'_ (qCount stats) (subtract 1) incStat :: MonadIO m => IORef Int -> m () incStat r = liftIO $ atomicModifyIORef'_ r (+ 1) diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index e1d1b5d12..318bce1cc 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -5,7 +5,7 @@ module Simplex.Messaging.Server.Control where import qualified Data.Attoparsec.ByteString.Char8 as A import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (BasicAuth, SenderId) +import Simplex.Messaging.Protocol (BasicAuth, BlockingInfo, SenderId) data CPClientRole = CPRNone | CPRUser | CPRAdmin deriving (Eq) @@ -22,6 +22,9 @@ data ControlProtocol | CPSocketThreads | CPServerInfo | CPDelete SenderId + | CPStatus SenderId + | CPBlock SenderId BlockingInfo + | CPUnblock SenderId | CPSave | CPHelp | CPQuit @@ -39,14 +42,17 @@ instance StrEncoding ControlProtocol where CPSockets -> "sockets" CPSocketThreads -> "socket-threads" CPServerInfo -> "server-info" - CPDelete bs -> "delete " <> strEncode bs + CPDelete sId -> "delete " <> strEncode sId + CPStatus sId -> "status " <> strEncode sId + CPBlock sId info -> "block " <> strEncode sId <> " " <> strEncode info + CPUnblock sId -> "unblock " <> strEncode sId CPSave -> "save" CPHelp -> "help" CPQuit -> "quit" CPSkip -> "" strP = A.takeTill (== ' ') >>= \case - "auth" -> CPAuth <$> (A.space *> strP) + "auth" -> CPAuth <$> _strP "suspend" -> pure CPSuspend "resume" -> pure CPResume "clients" -> pure CPClients @@ -56,7 +62,10 @@ instance StrEncoding ControlProtocol where "sockets" -> pure CPSockets "socket-threads" -> pure CPSocketThreads "server-info" -> pure CPServerInfo - "delete" -> CPDelete <$> (A.space *> strP) + "delete" -> CPDelete <$> _strP + "status" -> CPStatus <$> _strP + "block" -> CPBlock <$> _strP <*> _strP + "unblock" -> CPUnblock <$> _strP "save" -> pure CPSave "help" -> pure CPHelp "quit" -> pure CPQuit diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 16bb02888..cb9c68d04 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -56,6 +56,7 @@ prometheusMetrics sm rtm ts = _qDeletedAllB, _qDeletedNew, _qDeletedSecured, + _qBlocked, _qSub, _qSubAllB, _qSubAuth, @@ -74,6 +75,7 @@ prometheusMetrics sm rtm ts = _msgSentAuth, _msgSentQuota, _msgSentLarge, + _msgSentBlock, _msgRecv, _msgRecvGet, _msgGet, @@ -122,6 +124,10 @@ prometheusMetrics sm rtm ts = \simplex_smp_queues_deleted{type=\"new\"} " <> mshow _qDeletedNew <> "\n# qDeletedNew\n\ \simplex_smp_queues_deleted{type=\"secured\"} " <> mshow _qDeletedSecured <> "\n# qDeletedSecured\n\ \\n\ + \# HELP simplex_smp_queues_deleted Deleted queues\n\ + \# TYPE simplex_smp_queues_deleted counter\n\ + \simplex_smp_queues_blocked " <> mshow _qBlocked <> "\n# qBlocked\n\ + \\n\ \# HELP simplex_smp_queues_deleted_batch Batched requests to delete queues\n\ \# TYPE simplex_smp_queues_deleted_batch counter\n\ \simplex_smp_queues_deleted_batch " <> mshow _qDeletedAllB <> "\n# qDeletedAllB\n\ @@ -197,6 +203,7 @@ prometheusMetrics sm rtm ts = \simplex_smp_messages_sent_errors{type=\"auth\"} " <> mshow _msgSentAuth <> "\n# msgSentAuth\n\ \simplex_smp_messages_sent_errors{type=\"quota\"} " <> mshow _msgSentQuota <> "\n# msgSentQuota\n\ \simplex_smp_messages_sent_errors{type=\"large\"} " <> mshow _msgSentLarge <> "\n# msgSentLarge\n\ + \simplex_smp_messages_sent_errors{type=\"block\"} " <> mshow _msgSentBlock <> "\n# msgSentBlock\n\ \\n\ \# HELP simplex_smp_messages_received Received messages.\n\ \# TYPE simplex_smp_messages_received counter\n\ diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 3f7da8d29..a40875680 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -1,11 +1,15 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} module Simplex.Messaging.Server.QueueStore where +import Control.Applicative ((<|>)) +import Data.Functor (($>)) import Data.Int (Int64) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Simplex.Messaging.Encoding.String @@ -19,7 +23,7 @@ data QueueRec = QueueRec senderKey :: !(Maybe SndPublicAuthKey), sndSecure :: !SenderCanSecure, notifier :: !(Maybe NtfCreds), - status :: !ServerQueueStatus, + status :: !ServerEntityStatus, updatedAt :: !(Maybe RoundedSystemTime) } deriving (Show) @@ -37,7 +41,21 @@ instance StrEncoding NtfCreds where (notifierId, notifierKey, rcvNtfDhSecret) <- strP pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show) +data ServerEntityStatus + = EntityActive + | EntityBlocked BlockingInfo + | EntityOff + deriving (Eq, Show) + +instance StrEncoding ServerEntityStatus where + strEncode = \case + EntityActive -> "active" + EntityBlocked info -> "blocked," <> strEncode info + EntityOff -> "off" + strP = + "active" $> EntityActive + <|> "blocked," *> (EntityBlocked <$> strP) + <|> "off" $> EntityOff newtype RoundedSystemTime = RoundedSystemTime Int64 deriving (Eq, Ord, Show) diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 7bf4f3a4a..a073b5500 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -21,6 +21,8 @@ module Simplex.Messaging.Server.QueueStore.STM addQueueNotifier, deleteQueueNotifier, suspendQueue, + blockQueue, + unblockQueue, updateQueueTime, deleteQueue', readQueueStore, @@ -118,7 +120,27 @@ suspendQueue st sq = where qr = queueRec' sq suspend q = do - writeTVar qr $! Just q {status = QueueOff} + writeTVar qr $! Just q {status = EntityOff} + pure $ recipientId q + +blockQueue :: STMQueueStore s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ()) +blockQueue st sq info = + atomically (readQueueRec qr >>= mapM block) + $>>= \rId -> withLog "blockQueue" st (\sl -> logBlockQueue sl rId info) + where + qr = queueRec' sq + block q = do + writeTVar qr $ Just q {status = EntityBlocked info} + pure $ recipientId q + +unblockQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) +unblockQueue st sq = + atomically (readQueueRec qr >>= mapM unblock) + $>>= \rId -> withLog "unblockQueue" st (`logUnblockQueue` rId) + where + qr = queueRec' sq + unblock q = do + writeTVar qr $ Just q {status = EntityActive} pure $ recipientId q updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) @@ -177,6 +199,8 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st + BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info + UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index b384ad9b9..bbab6d8d2 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -34,6 +34,7 @@ data ServerStats = ServerStats qDeletedAllB :: IORef Int, qDeletedNew :: IORef Int, qDeletedSecured :: IORef Int, + qBlocked :: IORef Int, qSub :: IORef Int, -- only includes subscriptions when there were pending messages -- qSubNoMsg :: IORef Int, -- this stat creates too many STM transactions qSubAllB :: IORef Int, -- count of all subscription batches (with and without pending messages) @@ -53,6 +54,7 @@ data ServerStats = ServerStats msgSentAuth :: IORef Int, msgSentQuota :: IORef Int, msgSentLarge :: IORef Int, + msgSentBlock :: IORef Int, msgRecv :: IORef Int, msgRecvGet :: IORef Int, msgGet :: IORef Int, @@ -89,6 +91,7 @@ data ServerStatsData = ServerStatsData _qDeletedAllB :: Int, _qDeletedNew :: Int, _qDeletedSecured :: Int, + _qBlocked :: Int, _qSub :: Int, _qSubAllB :: Int, _qSubAuth :: Int, @@ -107,6 +110,7 @@ data ServerStatsData = ServerStatsData _msgSentAuth :: Int, _msgSentQuota :: Int, _msgSentLarge :: Int, + _msgSentBlock :: Int, _msgRecv :: Int, _msgRecvGet :: Int, _msgGet :: Int, @@ -144,6 +148,7 @@ newServerStats ts = do qDeletedAllB <- newIORef 0 qDeletedNew <- newIORef 0 qDeletedSecured <- newIORef 0 + qBlocked <- newIORef 0 qSub <- newIORef 0 qSubAllB <- newIORef 0 qSubAuth <- newIORef 0 @@ -162,6 +167,7 @@ newServerStats ts = do msgSentAuth <- newIORef 0 msgSentQuota <- newIORef 0 msgSentLarge <- newIORef 0 + msgSentBlock <- newIORef 0 msgRecv <- newIORef 0 msgRecvGet <- newIORef 0 msgGet <- newIORef 0 @@ -196,6 +202,7 @@ newServerStats ts = do qDeletedAllB, qDeletedNew, qDeletedSecured, + qBlocked, qSub, qSubAllB, qSubAuth, @@ -214,6 +221,7 @@ newServerStats ts = do msgSentAuth, msgSentQuota, msgSentLarge, + msgSentBlock, msgRecv, msgRecvGet, msgGet, @@ -250,6 +258,7 @@ getServerStatsData s = do _qDeletedAllB <- readIORef $ qDeletedAllB s _qDeletedNew <- readIORef $ qDeletedNew s _qDeletedSecured <- readIORef $ qDeletedSecured s + _qBlocked <- readIORef $ qBlocked s _qSub <- readIORef $ qSub s _qSubAllB <- readIORef $ qSubAllB s _qSubAuth <- readIORef $ qSubAuth s @@ -268,6 +277,7 @@ getServerStatsData s = do _msgSentAuth <- readIORef $ msgSentAuth s _msgSentQuota <- readIORef $ msgSentQuota s _msgSentLarge <- readIORef $ msgSentLarge s + _msgSentBlock <- readIORef $ msgSentBlock s _msgRecv <- readIORef $ msgRecv s _msgRecvGet <- readIORef $ msgRecvGet s _msgGet <- readIORef $ msgGet s @@ -302,6 +312,7 @@ getServerStatsData s = do _qDeletedAllB, _qDeletedNew, _qDeletedSecured, + _qBlocked, _qSub, _qSubAllB, _qSubAuth, @@ -320,6 +331,7 @@ getServerStatsData s = do _msgSentAuth, _msgSentQuota, _msgSentLarge, + _msgSentBlock, _msgRecv, _msgRecvGet, _msgGet, @@ -357,6 +369,7 @@ setServerStats s d = do writeIORef (qDeletedAllB s) $! _qDeletedAllB d writeIORef (qDeletedNew s) $! _qDeletedNew d writeIORef (qDeletedSecured s) $! _qDeletedSecured d + writeIORef (qBlocked s) $! _qBlocked d writeIORef (qSub s) $! _qSub d writeIORef (qSubAllB s) $! _qSubAllB d writeIORef (qSubAuth s) $! _qSubAuth d @@ -375,6 +388,7 @@ setServerStats s d = do writeIORef (msgSentAuth s) $! _msgSentAuth d writeIORef (msgSentQuota s) $! _msgSentQuota d writeIORef (msgSentLarge s) $! _msgSentLarge d + writeIORef (msgSentBlock s) $! _msgSentBlock d writeIORef (msgRecv s) $! _msgRecv d writeIORef (msgRecvGet s) $! _msgRecvGet d writeIORef (msgGet s) $! _msgGet d @@ -411,6 +425,7 @@ instance StrEncoding ServerStatsData where "qDeletedNew=" <> strEncode (_qDeletedNew d), "qDeletedSecured=" <> strEncode (_qDeletedSecured d), "qDeletedAllB=" <> strEncode (_qDeletedAllB d), + "qBlocked=" <> strEncode (_qBlocked d), "qCount=" <> strEncode (_qCount d), "qSub=" <> strEncode (_qSub d), "qSubAllB=" <> strEncode (_qSubAllB d), @@ -430,6 +445,7 @@ instance StrEncoding ServerStatsData where "msgSentAuth=" <> strEncode (_msgSentAuth d), "msgSentQuota=" <> strEncode (_msgSentQuota d), "msgSentLarge=" <> strEncode (_msgSentLarge d), + "msgSentBlock=" <> strEncode (_msgSentBlock d), "msgRecv=" <> strEncode (_msgRecv d), "msgRecvGet=" <> strEncode (_msgRecvGet d), "msgGet=" <> strEncode (_msgGet d), @@ -467,6 +483,7 @@ instance StrEncoding ServerStatsData where (,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine) <|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine)) _qDeletedAllB <- opt "qDeletedAllB=" + _qBlocked <- opt "qBlocked=" _qCount <- opt "qCount=" _qSub <- opt "qSub=" _qSubNoMsg <- skipInt "qSubNoMsg=" -- skipping it for backward compatibility @@ -487,6 +504,7 @@ instance StrEncoding ServerStatsData where _msgSentAuth <- opt "msgSentAuth=" _msgSentQuota <- opt "msgSentQuota=" _msgSentLarge <- opt "msgSentLarge=" + _msgSentBlock <- opt "msgSentBlock=" _msgRecv <- "msgRecv=" *> strP <* A.endOfLine _msgRecvGet <- opt "msgRecvGet=" _msgGet <- opt "msgGet=" @@ -532,6 +550,7 @@ instance StrEncoding ServerStatsData where _qDeletedAllB, _qDeletedNew, _qDeletedSecured, + _qBlocked, _qSub, _qSubAllB, _qSubAuth, @@ -550,6 +569,7 @@ instance StrEncoding ServerStatsData where _msgSentAuth, _msgSentQuota, _msgSentLarge, + _msgSentBlock, _msgRecv, _msgRecvGet, _msgGet, diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 2da3398f2..8dee31940 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -21,6 +21,8 @@ module Simplex.Messaging.Server.StoreLog logSecureQueue, logAddNotifier, logSuspendQueue, + logBlockQueue, + logUnblockQueue, logDeleteQueue, logDeleteNotifier, logUpdateQueueTime, @@ -33,9 +35,9 @@ import Control.Applicative (optional, (<|>)) import Control.Concurrent.STM import qualified Control.Exception as E import Control.Logger.Simple -import Control.Monad import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import Data.Functor (($>)) import qualified Data.Map.Strict as M import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) @@ -56,6 +58,8 @@ data StoreLogRecord | SecureQueue QueueId SndPublicAuthKey | AddNotifier QueueId NtfCreds | SuspendQueue QueueId + | BlockQueue QueueId BlockingInfo + | UnblockQueue QueueId | DeleteQueue QueueId | DeleteNotifier QueueId | UpdateTime QueueId RoundedSystemTime @@ -66,12 +70,14 @@ data SLRTag | SecureQueue_ | AddNotifier_ | SuspendQueue_ + | BlockQueue_ + | UnblockQueue_ | DeleteQueue_ | DeleteNotifier_ | UpdateTime_ instance StrEncoding QueueRec where - strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} = + strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = B.unwords [ "rid=" <> strEncode recipientId, "rk=" <> strEncode recipientKey, @@ -82,10 +88,14 @@ instance StrEncoding QueueRec where <> sndSecureStr <> maybe "" notifierStr notifier <> maybe "" updatedAtStr updatedAt + <> statusStr where sndSecureStr = if sndSecure then " sndSecure=" <> strEncode sndSecure else "" notifierStr ntfCreds = " notifier=" <> strEncode ntfCreds updatedAtStr t = " updated_at=" <> strEncode t + statusStr = case status of + EntityActive -> "" + _ -> " status=" <> strEncode status strP = do recipientId <- "rid=" *> strP_ @@ -96,7 +106,8 @@ instance StrEncoding QueueRec where sndSecure <- (" sndSecure=" *> strP) <|> pure False notifier <- optional $ " notifier=" *> strP updatedAt <- optional $ " updated_at=" *> strP - pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt} + status <- (" status=" *> strP) <|> pure EntityActive + pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} instance StrEncoding SLRTag where strEncode = \case @@ -104,20 +115,24 @@ instance StrEncoding SLRTag where SecureQueue_ -> "SECURE" AddNotifier_ -> "NOTIFIER" SuspendQueue_ -> "SUSPEND" + BlockQueue_ -> "BLOCK" + UnblockQueue_ -> "UNBLOCK" DeleteQueue_ -> "DELETE" DeleteNotifier_ -> "NDELETE" UpdateTime_ -> "TIME" strP = - A.takeTill (== ' ') >>= \case - "CREATE" -> pure CreateQueue_ - "SECURE" -> pure SecureQueue_ - "NOTIFIER" -> pure AddNotifier_ - "SUSPEND" -> pure SuspendQueue_ - "DELETE" -> pure DeleteQueue_ - "NDELETE" -> pure DeleteNotifier_ - "TIME" -> pure UpdateTime_ - s -> fail $ "invalid log record tag: " <> B.unpack s + A.choice + [ "CREATE" $> CreateQueue_, + "SECURE" $> SecureQueue_, + "NOTIFIER" $> AddNotifier_, + "SUSPEND" $> SuspendQueue_, + "BLOCK" $> BlockQueue_, + "UNBLOCK" $> UnblockQueue_, + "DELETE" $> DeleteQueue_, + "NDELETE" $> DeleteNotifier_, + "TIME" $> UpdateTime_ + ] instance StrEncoding StoreLogRecord where strEncode = \case @@ -125,6 +140,8 @@ instance StrEncoding StoreLogRecord where SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey) AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds) SuspendQueue rId -> strEncode (SuspendQueue_, rId) + BlockQueue rId info -> strEncode (BlockQueue_, rId, info) + UnblockQueue rId -> strEncode (UnblockQueue_, rId) DeleteQueue rId -> strEncode (DeleteQueue_, rId) DeleteNotifier rId -> strEncode (DeleteNotifier_, rId) UpdateTime rId t -> strEncode (UpdateTime_, rId, t) @@ -135,6 +152,8 @@ instance StrEncoding StoreLogRecord where SecureQueue_ -> SecureQueue <$> strP_ <*> strP AddNotifier_ -> AddNotifier <$> strP_ <*> strP SuspendQueue_ -> SuspendQueue <$> strP + BlockQueue_ -> BlockQueue <$> strP_ <*> strP + UnblockQueue_ -> UnblockQueue <$> strP DeleteQueue_ -> DeleteQueue <$> strP DeleteNotifier_ -> DeleteNotifier <$> strP UpdateTime_ -> UpdateTime <$> strP_ <*> strP @@ -179,6 +198,12 @@ logAddNotifier s qId ntfCreds = writeStoreLogRecord s $ AddNotifier qId ntfCreds logSuspendQueue :: StoreLog 'WriteMode -> QueueId -> IO () logSuspendQueue s = writeStoreLogRecord s . SuspendQueue +logBlockQueue :: StoreLog 'WriteMode -> QueueId -> BlockingInfo -> IO () +logBlockQueue s qId info = writeStoreLogRecord s $ BlockQueue qId info + +logUnblockQueue :: StoreLog 'WriteMode -> QueueId -> IO () +logUnblockQueue s = writeStoreLogRecord s . UnblockQueue + logDeleteQueue :: StoreLog 'WriteMode -> QueueId -> IO () logDeleteQueue s = writeStoreLogRecord s . DeleteQueue @@ -228,6 +253,5 @@ writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M. where writeQueue (rId, q) = readTVarIO (queueRec' q) >>= \case - Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them + Just q' -> logCreateQueue s q' Nothing -> atomically $ TM.delete rId $ activeMsgQueues st - active QueueRec {status} = status == QueueActive diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index e64485964..1bb4cff58 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -49,6 +49,7 @@ module Simplex.Messaging.Transport sndAuthKeySMPVersion, deletedEventSMPVersion, encryptedBlockSMPVersion, + blockedEntityErrorSMPVersion, simplexMQVersion, smpBlockSize, TransportConfig (..), @@ -142,6 +143,7 @@ smpBlockSize = 16384 -- 9 - faster handshake: SKEY command for sender to secure queue -- 10 - DELD event to subscriber when queue is deleted via another connnection -- 11 - additional encryption of transport blocks with forward secrecy (9/14/2024) +-- 12 - BLOCKED error for blocked queues (1/11/2025) data SMPVersion @@ -178,14 +180,17 @@ deletedEventSMPVersion = VersionSMP 10 encryptedBlockSMPVersion :: VersionSMP encryptedBlockSMPVersion = VersionSMP 11 +blockedEntityErrorSMPVersion :: VersionSMP +blockedEntityErrorSMPVersion = VersionSMP 12 + currentClientSMPRelayVersion :: VersionSMP -currentClientSMPRelayVersion = VersionSMP 11 +currentClientSMPRelayVersion = VersionSMP 12 legacyServerSMPRelayVersion :: VersionSMP legacyServerSMPRelayVersion = VersionSMP 6 currentServerSMPRelayVersion :: VersionSMP -currentServerSMPRelayVersion = VersionSMP 11 +currentServerSMPRelayVersion = VersionSMP 12 -- Max SMP protocol version to be used in e2e encrypted -- connection between client and server, as defined by SMP proxy. @@ -193,7 +198,7 @@ currentServerSMPRelayVersion = VersionSMP 11 -- to prevent client version fingerprinting by the -- destination relays when clients upgrade at different times. proxiedSMPRelayVersion :: VersionSMP -proxiedSMPRelayVersion = VersionSMP 9 +proxiedSMPRelayVersion = VersionSMP 11 -- minimal supported protocol version is 4 -- TODO remove code that supports sending commands without batching diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 35c27c22e..f9afecf5a 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -112,7 +112,7 @@ testNewQueueRec g sndSecure = do senderKey = Nothing, sndSecure, notifier = Nothing, - status = QueueActive, + status = EntityActive, updatedAt = Nothing } pure (rId, qr) diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index b0bd17dff..9cde80286 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -41,11 +41,12 @@ import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..)) import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore) import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..)) -import Simplex.Messaging.Server.StoreLog (closeStoreLog) +import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..), closeStoreLog) import Simplex.Messaging.Transport import Simplex.Messaging.Util (whenM) import Simplex.Messaging.Version (mkVersionRange) import System.Directory (doesDirectoryExist, doesFileExist, removeDirectoryRecursive, removeFile) +import System.IO (IOMode (..), withFile) import System.TimeIt (timeItT) import System.Timeout import Test.HUnit @@ -78,6 +79,7 @@ serverTests = do testMsgExpireOnSend testMsgExpireOnInterval testMsgNOTExpireOnInterval + describe "Blocking queues" $ testBlockMessageQueue pattern Resp :: CorrId -> QueueId -> BrokerMsg -> SignedTransmission ErrorType BrokerMsg pattern Resp corrId queueId command <- (_, _, (corrId, queueId, Right command)) @@ -688,7 +690,7 @@ testRestoreMessages = logSize testStoreLogFile `shouldReturn` 2 -- logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 74 + logSize testServerStatsBackupFile `shouldReturn` 76 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -706,7 +708,7 @@ testRestoreMessages = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd -- logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 74 + logSize testServerStatsBackupFile `shouldReturn` 76 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -724,7 +726,7 @@ testRestoreMessages = pure () logSize testStoreLogFile `shouldReturn` 1 -- logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 74 + logSize testServerStatsBackupFile `shouldReturn` 76 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5 @@ -996,7 +998,7 @@ testMsgExpireOnInterval = testMsgNOTExpireOnInterval :: SpecWith (ATransport, AMSType) testMsgNOTExpireOnInterval = - it "should NOT expire messages that are not received before messageTTL if expiry interval is large" $ \(ATransport (t :: TProxy c), msType) -> do + it "should block and unblock message queues" $ \(ATransport (t :: TProxy c), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 10000}} @@ -1013,6 +1015,30 @@ testMsgNOTExpireOnInterval = Nothing -> return () Just _ -> error "nothing else should be delivered" +testBlockMessageQueue :: SpecWith (ATransport, AMSType) +testBlockMessageQueue = + it "should return BLOCKED error when queue is blocked" $ \(at@(ATransport (t :: TProxy c)), msType) -> do + g <- C.newRandom + (rId, sId) <- withSmpServerStoreLogOnMS at msType testPort $ runTest t $ \h -> do + (rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g + (dhPub, _dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + Resp "abcd" rId1 (Ids rId sId _srvDh) <- signSendRecv h rKey ("abcd", NoEntity, NEW rPub dhPub Nothing SMSubscribe True) + (rId1, NoEntity) #== "creates queue" + pure (rId, sId) + + withFile testStoreLogFile AppendMode $ \h -> B.hPutStrLn h $ strEncode $ BlockQueue rId $ BlockingInfo BRContent + + withSmpServerStoreLogOnMS at msType testPort $ runTest t $ \h -> do + (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g + Resp "dabc" sId2 (ERR (BLOCKED (BlockingInfo BRContent))) <- signSendRecv h sKey ("dabc", sId, SKEY sPub) + (sId2, sId) #== "same queue ID in response" + where + runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO a) -> ThreadId -> IO a + runTest _ test' server = do + a <- testSMPClient test' + killThread server + pure a + samplePubKey :: C.APublicVerifyKey samplePubKey = C.APublicVerifyKey C.SEd25519 "MCowBQYDK2VwAyEAfAOflyvbJv1fszgzkQ6buiZJVgSpQWsucXq7U6zjMgY=" diff --git a/tests/XFTPServerTests.hs b/tests/XFTPServerTests.hs index cc40ee3f7..5193e56cf 100644 --- a/tests/XFTPServerTests.hs +++ b/tests/XFTPServerTests.hs @@ -289,7 +289,7 @@ testFileLog = do download g c rpKey1 rId1 digest bytes download g c rpKey2 rId2 digest bytes logSize testXFTPLogFile `shouldReturn` 3 - logSize testXFTPStatsBackupFile `shouldReturn` 14 + logSize testXFTPStatsBackupFile `shouldReturn` 15 threadDelay 100000 @@ -316,7 +316,7 @@ testFileLog = do -- recipient 2 can download download g c rpKey2 rId2 digest bytes logSize testXFTPLogFile `shouldReturn` 4 - logSize testXFTPStatsBackupFile `shouldReturn` 14 + logSize testXFTPStatsBackupFile `shouldReturn` 15 threadDelay 100000 @@ -337,13 +337,13 @@ testFileLog = do -- sender can delete - +1 to log deleteXFTPChunk c spKey sId logSize testXFTPLogFile `shouldReturn` 4 - logSize testXFTPStatsBackupFile `shouldReturn` 14 + logSize testXFTPStatsBackupFile `shouldReturn` 15 threadDelay 100000 withXFTPServerStoreLogOn $ \_ -> pure () -- compacts on start logSize testXFTPLogFile `shouldReturn` 0 - logSize testXFTPStatsBackupFile `shouldReturn` 14 + logSize testXFTPStatsBackupFile `shouldReturn` 15 threadDelay 100000