diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 4fecc4bef..429903b70 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -69,6 +69,7 @@ import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) import Data.Semigroup (Sum (..)) +import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import qualified Data.Text.IO as T @@ -104,6 +105,7 @@ import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Server.StoreLog (foldLogLines) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport @@ -111,7 +113,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import Simplex.Messaging.Version -import System.Exit (exitFailure) +import System.Exit (exitFailure, exitSuccess) import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) @@ -162,14 +164,18 @@ newMessageStats :: MessageStats newMessageStats = MessageStats 0 0 0 smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M () -smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do +smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOptions} attachHTTP_ = do s <- asks server pa <- asks proxyAgent - msgStats_ <- processServerMessages + msgStats_ <- processServerMessages startOptions ntfStats <- restoreServerNtfs liftIO $ mapM_ (printMessageStats "messages") msgStats_ liftIO $ printMessageStats "notifications" ntfStats restoreServerStats msgStats_ ntfStats + when (maintenance startOptions) $ do + liftIO $ putStrLn "Server started in 'maintenance' mode, exiting" + stopServer s + liftIO $ exitSuccess raceAny_ ( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ()) @@ -1816,8 +1822,8 @@ exportMessages tty ms f drainMsgs = do exitFailure encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') -processServerMessages :: M (Maybe MessageStats) -processServerMessages = do +processServerMessages :: StartOptions -> M (Maybe MessageStats) +processServerMessages StartOptions {skipWarnings} = do old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch) expire <- asks $ expireMessagesOnStart . config asks msgStore >>= liftIO . processMessages old_ expire @@ -1825,7 +1831,7 @@ processServerMessages = do processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO (Maybe MessageStats) processMessages old_ expire = \case AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of - Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_) (pure Nothing) + Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing) Nothing -> pure Nothing AMS SMSJournal ms | expire -> Just <$> case old_ of @@ -1858,44 +1864,56 @@ processServerMessages = do logError $ "STORE: processValidateQueue, failed opening message queue, " <> tshow e exitFailure --- TODO this function should be called after importing queues from store log -importMessages :: forall s. STMStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats -importMessages tty ms f old_ = do +importMessages :: forall s. STMStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> Bool -> IO MessageStats +importMessages tty ms f old_ skipWarnings = do logInfo $ "restoring messages from file " <> T.pack f - LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case - Left e -> do - when tty $ putStrLn "" - logError . T.pack $ "error restoring messages: " <> e - liftIO exitFailure - Right (lineCount, _, (storedMsgsCount, expiredMsgsCount, overQuota)) -> do - putStrLn $ progress lineCount - renameFile f $ f <> ".bak" - mapM_ setOverQuota_ overQuota - logQueueStates ms - storedQueues <- M.size <$> readTVarIO (queues $ stmQueueStore ms) - pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} + (lineCount, _, (storedMsgsCount, expiredMsgsCount, overQuota)) <- + foldLogLines tty f restoreMsg (0, Nothing, (0, 0, M.empty)) + putStrLn $ progress lineCount + renameFile f $ f <> ".bak" + mapM_ setOverQuota_ overQuota + logQueueStates ms + storedQueues <- M.size <$> readTVarIO (queues $ stmQueueStore ms) + pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} where progress i = "Processed " <> show i <> " lines" - restoreMsg :: (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) -> LB.ByteString -> ExceptT String IO (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) - restoreMsg (!i, q_, (!stored, !expired, !overQuota)) s' = do - when (tty && i `mod` 1000 == 0) $ liftIO $ putStr (progress i <> "\r") >> hFlush stdout - MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s - liftError show $ addToMsgQueue rId msg + restoreMsg :: (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) -> Bool -> ByteString -> IO (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) + restoreMsg (!i, q_, counts@(!stored, !expired, !overQuota)) eof s = do + when (tty && i `mod` 1000 == 0) $ putStr (progress i <> "\r") >> hFlush stdout + case strDecode s of + Right (MLRv3 rId msg) -> runExceptT (addToMsgQueue rId msg) >>= either (exitErr . tshow) pure + Left e + | eof -> warnOrExit (parsingErr e) $> (i + 1, q_, counts) + | otherwise -> exitErr $ parsingErr e where - s = LB.toStrict s' + exitErr e = do + when tty $ putStrLn "" + logError $ "error restoring messages: " <> e + liftIO exitFailure + parsingErr :: String -> Text + parsingErr e = "parsing error (" <> T.pack e <> "): " <> safeDecodeUtf8 (B.take 100 s) addToMsgQueue rId msg = do - q <- case q_ of + qOrErr <- case q_ of -- to avoid lookup when restoring the next message to the same queue - Just (rId', q') | rId' == rId -> pure q' - _ -> ExceptT $ getQueue ms SRecipient rId + Just (rId', q') | rId' == rId -> pure $ Right q' + _ -> liftIO $ getQueue ms SRecipient rId + case qOrErr of + Right q -> addToQueue_ q rId msg + Left AUTH -> liftIO $ do + when tty $ putStrLn "" + warnOrExit $ "queue " <> safeDecodeUtf8 (encode $ unEntityId rId) <> " does not exist" + pure (i + 1, Nothing, counts) + Left e -> throwE e + addToQueue_ q rId msg = (i + 1,Just (rId, q),) <$> case msg of Message {msgTs} | maybe True (systemSeconds msgTs >=) old_ -> do writeMsg ms q False msg >>= \case Just _ -> pure (stored + 1, expired, overQuota) - Nothing -> do + Nothing -> liftIO $ do + when tty $ putStrLn "" logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) - pure (stored, expired, overQuota) + pure counts | otherwise -> pure (stored, expired + 1, overQuota) MessageQuota {} -> -- queue was over quota at some point, @@ -1907,8 +1925,13 @@ importMessages tty ms f old_ = do withPeekMsgQueue ms q "mergeQuotaMsgs" $ maybe (pure ()) $ \case (mq, MessageQuota {}) -> tryDeleteMsg_ q mq False _ -> pure () - msgErr :: Show e => String -> e -> String - msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s) + warnOrExit e + | skipWarnings = logWarn e' + | otherwise = do + logWarn $ e' <> ", start with --skip-warnings option to ignore this error" + exitFailure + where + e' = "warning restoring messages: " <> e printMessageStats :: T.Text -> MessageStats -> IO () printMessageStats name MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} = diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 58d57a4c5..005ce6d9b 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -116,7 +116,13 @@ data ServerConfig = ServerConfig allowSMPProxy :: Bool, -- auth is the same with `newQueueBasicAuth` serverClientConcurrency :: Int, -- | server public information - information :: Maybe ServerPublicInfo + information :: Maybe ServerPublicInfo, + startOptions :: StartOptions + } + +data StartOptions = StartOptions + { maintenance :: Bool, + skipWarnings :: Bool } defMsgExpirationDays :: Int64 diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index a03aaa68d..e35803171 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -73,7 +73,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = True -> exitError $ "Error: server is already initialized (" <> iniFile <> " exists).\nRun `" <> executableName <> " start`." _ -> initializeServer opts OnlineCert certOpts -> withIniFile $ \_ -> genOnline cfgPath certOpts - Start -> withIniFile runServer + Start opts -> withIniFile $ runServer opts Delete -> do confirmOrExit "WARNING: deleting the server will make all queues inaccessible, because the server identity (certificate fingerprint) will change.\nTHIS CANNOT BE UNDONE!" @@ -107,7 +107,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = "Messages not imported" ms <- newJournalMsgStore readQueueStore storeLogFile ms - msgStats <- importMessages True ms storeMsgsFilePath Nothing -- no expiration + msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration putStrLn "Import completed" printMessageStats "Messages" msgStats putStrLn $ case readMsgStoreType ini of @@ -322,7 +322,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = <> (webDisabled <> "key: " <> T.pack httpsKeyFile <> "\n") where webDisabled = if disableWeb then "# " else "" - runServer ini = do + runServer startOptions ini = do hSetBuffering stdout LineBuffering hSetBuffering stderr LineBuffering fp <- checkSavedFingerprint cfgPath defaultX509Config @@ -463,7 +463,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = }, allowSMPProxy = True, serverClientConcurrency = readIniDefault defaultProxyClientConcurrency "PROXY" "client_concurrency" ini, - information = serverPublicInfo ini + information = serverPublicInfo ini, + startOptions } textToOwnServers :: Text -> [ByteString] textToOwnServers = map encodeUtf8 . T.words @@ -635,7 +636,7 @@ printSourceCode = \case data CliCommand = Init InitOptions | OnlineCert CertOptions - | Start + | Start StartOptions | Delete | Journal JournalCmd @@ -669,7 +670,7 @@ cliCommandP cfgPath logPath iniFile = hsubparser ( command "init" (info (Init <$> initP) (progDesc $ "Initialize server - creates " <> cfgPath <> " and " <> logPath <> " directories and configuration files")) <> command "cert" (info (OnlineCert <$> certOptionsP) (progDesc $ "Generate new online TLS server credentials (configuration: " <> iniFile <> ")")) - <> command "start" (info (pure Start) (progDesc $ "Start server (configuration: " <> iniFile <> ")")) + <> command "start" (info (Start <$> startOptionsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")")) <> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files")) <> command "journal" (info (Journal <$> journalCmdP) (progDesc "Import/export messages to/from journal storage")) ) @@ -811,6 +812,18 @@ cliCommandP cfgPath logPath iniFile = disableWeb, scripted } + startOptionsP = do + maintenance <- + switch + ( long "maintenance" + <> help "Do not start the server, only perform start and stop tasks" + ) + skipWarnings <- + switch + ( long "skip-warnings" + <> help "Start the server with non-critical start warnings" + ) + pure StartOptions {maintenance, skipWarnings} journalCmdP = hsubparser ( command "import" (info (pure JCImport) (progDesc "Import message log file into a new journal storage")) diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index f1347533a..2fd3e9912 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -38,7 +38,6 @@ import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Bitraversable (bimapM) import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) @@ -48,7 +47,8 @@ import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.StoreLog import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$)) +import Simplex.Messaging.Util (ifM, safeDecodeUtf8, tshow, ($>>=), (<$$)) +import System.Exit (exitFailure) import System.IO import UnliftIO.STM @@ -196,12 +196,11 @@ withLog :: STMStoreClass s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> I withLog name = withLog' name . storeLog . stmQueueStore readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO () -readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines +readQueueStore f st = readLogLines False f processLine where - processLine :: LB.ByteString -> IO () - processLine s' = either printError procLogRecord (strDecode s) + processLine :: Bool -> B.ByteString -> IO () + processLine eof s = either printError procLogRecord (strDecode s) where - s = LB.toStrict s' procLogRecord :: StoreLogRecord -> IO () procLogRecord = \case CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue" @@ -214,7 +213,11 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t printError :: String -> IO () - printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s + printError e + | eof = logWarn err + | otherwise = logError err >> exitFailure + where + err = "Error parsing log: " <> T.pack e <> " - " <> safeDecodeUtf8 s withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO () withQueue qId op a = runExceptT go >>= qError qId op where diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index e676770f7..2d82014f7 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -28,6 +28,8 @@ module Simplex.Messaging.Server.StoreLog logUpdateQueueTime, readWriteStoreLog, writeQueueStore, + readLogLines, + foldLogLines, ) where @@ -35,6 +37,7 @@ import Control.Applicative (optional, (<|>)) import Control.Concurrent.STM import qualified Control.Exception as E import Control.Logger.Simple +import Control.Monad (when) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) @@ -254,3 +257,21 @@ writeQueueStore s st = readTVarIO qs >>= mapM_ writeQueue . M.assocs readTVarIO (queueRec' q) >>= \case Just q' -> logCreateQueue s rId q' Nothing -> atomically $ TM.delete rId qs + +readLogLines :: Bool -> FilePath -> (Bool -> B.ByteString -> IO ()) -> IO () +readLogLines tty f action = foldLogLines tty f (const action) () + +foldLogLines :: Bool -> FilePath -> (a -> Bool -> B.ByteString -> IO a) -> a -> IO a +foldLogLines tty f action initValue = do + (count :: Int, acc) <- withFile f ReadMode $ \h -> ifM (hIsEOF h) (pure (0, initValue)) (loop h 0 initValue) + putStrLn $ progress count + pure acc + where + loop h i acc = do + s <- B.hGetLine h + eof <- hIsEOF h + acc' <- action acc eof s + let i' = i + 1 + when (tty && i' `mod` 100000 == 0) $ putStr (progress i' <> "\r") >> hFlush stdout + if eof then pure (i', acc') else loop h i' acc' + progress i = "Processed: " <> show i <> " lines" diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 72599f193..3484fceb4 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -222,7 +222,7 @@ testExportImportStore ms = do ms' <- newMsgStore cfg readWriteQueueStore testStoreLogFile ms' >>= closeStoreLog stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- - importMessages False ms' testStoreMsgsFile Nothing + importMessages False ms' testStoreMsgsFile Nothing False printMessageStats "Messages" stats length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 4 -- state file is backed up, 2 message files @@ -231,7 +231,7 @@ testExportImportStore ms = do stmStore <- newMsgStore testSMTStoreConfig readWriteQueueStore testStoreLogFile stmStore >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- - importMessages False stmStore testStoreMsgsFile2 Nothing + importMessages False stmStore testStoreMsgsFile2 Nothing False exportMessages False stmStore testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 5ce0eb7f6..3c732b7a5 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -163,7 +163,8 @@ cfgMS msType = smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 1}, -- seconds allowSMPProxy = False, serverClientConcurrency = 2, - information = Nothing + information = Nothing, + startOptions = StartOptions {maintenance = False, skipWarnings = False} } cfgV7 :: ServerConfig