From afc09a6ec4b73e27082d3a8eebe8249cb87884d6 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 26 Apr 2021 20:34:28 +0100 Subject: [PATCH] Store log (#108) * StoreLog (WIP) * add log records to map * revert Protocol change * revert Server change * fix parseLogRecord * optionally save/restore queues to/from store log * refactor * refactor delQueueAndMsgs * move store log to /var/opt/simplex * use ini file --- apps/dog-food/Main.hs | 1 + apps/smp-server/Main.hs | 116 +++++++++++++++-- package.yaml | 2 + src/Simplex/Messaging/Server.hs | 31 ++++- src/Simplex/Messaging/Server/Env/STM.hs | 22 +++- src/Simplex/Messaging/Server/QueueStore.hs | 2 +- src/Simplex/Messaging/Server/StoreLog.hs | 140 +++++++++++++++++++++ src/Simplex/Messaging/Transport.hs | 9 +- tests/SMPClient.hs | 2 + 9 files changed, 302 insertions(+), 23 deletions(-) create mode 100644 src/Simplex/Messaging/Server/StoreLog.hs diff --git a/apps/dog-food/Main.hs b/apps/dog-food/Main.hs index f4100e9a1..bf874285d 100644 --- a/apps/dog-food/Main.hs +++ b/apps/dog-food/Main.hs @@ -109,6 +109,7 @@ serializeChatResponse = \case Connected c -> [ttyContact c <> " connected"] Confirmation c -> [ttyContact c <> " ok"] ReceivedMessage c t -> prependFirst (ttyFromContact c) $ msgPlain t + -- TODO either add command to re-connect or update message below Disconnected c -> ["disconnected from " <> ttyContact c <> " - try \"/chat " <> bPlain (toBs c) <> "\""] YesYes -> ["you got it!"] ContactError e c -> case e of diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index ee7801166..3a4952ace 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -1,20 +1,28 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} module Main where -import Control.Monad (when) +import Control.Monad (unless, when) import qualified Crypto.Store.PKCS8 as S import qualified Data.ByteString.Char8 as B import Data.Char (toLower) +import Data.Functor (($>)) +import Data.Ini (lookupValue, readIniFile) +import qualified Data.Text as T import Data.X509 (PrivKey (PrivKeyRSA)) +import Options.Applicative import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Server (runSMPServer) import Simplex.Messaging.Server.Env.STM +import Simplex.Messaging.Server.StoreLog (StoreLog, openReadStoreLog) import System.Directory (createDirectoryIfMissing, doesFileExist) import System.Exit (exitFailure) import System.FilePath (combine) -import System.IO (hFlush, stdout) +import System.IO (IOMode (..), hFlush, stdout) cfg :: ServerConfig cfg = @@ -23,6 +31,7 @@ cfg = tbqSize = 16, queueIdBytes = 12, msgIdBytes = 6, + storeLog = Nothing, -- key is loaded from the file server_key in /etc/opt/simplex directory serverPrivateKey = undefined } @@ -33,12 +42,58 @@ newKeySize = 2048 `div` 8 cfgDir :: FilePath cfgDir = "/etc/opt/simplex" +logDir :: FilePath +logDir = "/var/opt/simplex" + +defaultStoreLogFile :: FilePath +defaultStoreLogFile = combine logDir "smp-server-store.log" + main :: IO () main = do + opts <- getServerOpts + putStrLn "SMP Server (-h for help)" + ini <- readCreateIni opts + storeLog <- openStoreLog ini pk <- readCreateKey - B.putStrLn $ "SMP transport key hash: " <> publicKeyHash (C.publicKey pk) - putStrLn $ "Listening on port " <> tcpPort cfg - runSMPServer cfg {serverPrivateKey = pk} + B.putStrLn $ "transport key hash: " <> publicKeyHash (C.publicKey pk) + putStrLn $ "listening on port " <> tcpPort cfg + runSMPServer cfg {serverPrivateKey = pk, storeLog} + +data IniOpts = IniOpts + { enableStoreLog :: Bool, + storeLogFile :: FilePath + } + +readCreateIni :: ServerOpts -> IO IniOpts +readCreateIni ServerOpts {configFile} = do + createDirectoryIfMissing True cfgDir + doesFileExist configFile >>= (`unless` createIni) + readIni + where + readIni :: IO IniOpts + readIni = do + ini <- either exitError pure =<< readIniFile configFile + let enableStoreLog = (== Right "on") $ lookupValue "STORE_LOG" "enable" ini + storeLogFile = either (const defaultStoreLogFile) T.unpack $ lookupValue "STORE_LOG" "file" ini + pure IniOpts {enableStoreLog, storeLogFile} + exitError e = do + putStrLn $ "error reading config file " <> configFile <> ": " <> e + exitFailure + createIni :: IO () + createIni = do + confirm $ "Save default ini file to " <> configFile + writeFile + configFile + "[STORE_LOG]\n\ + \# The server uses STM memory to store SMP queues and messages,\n\ + \# that will be lost on restart (e.g., as with redis).\n\ + \# This option enables saving SMP queues to append only log,\n\ + \# and restoring them when the server is started.\n\ + \# Log is compacted on start (deleted queues are removed).\n\ + \# The messages in the queues are not logged.\n\ + \\n\ + \# enable: on\n\ + \# file: /var/opt/simplex/smp-server-store.log\n" readCreateKey :: IO C.FullPrivateKey readCreateKey = do @@ -49,16 +104,10 @@ readCreateKey = do where createKey :: FilePath -> IO C.FullPrivateKey createKey path = do - confirm + confirm "Generate new server key pair" (_, pk) <- C.generateKeyPair newKeySize S.writeKeyFile S.TraditionalFormat path [PrivKeyRSA $ C.rsaPrivateKey pk] pure pk - confirm :: IO () - confirm = do - putStr "Generate new server key pair (y/N): " - hFlush stdout - ok <- getLine - when (map toLower ok /= "y") exitFailure readKey :: FilePath -> IO C.FullPrivateKey readKey path = do S.readKeyFile path >>= \case @@ -70,5 +119,48 @@ readCreateKey = do errorExit :: String -> IO b errorExit e = putStrLn (e <> ": " <> path) >> exitFailure +confirm :: String -> IO () +confirm msg = do + putStr $ msg <> " (y/N): " + hFlush stdout + ok <- getLine + when (map toLower ok /= "y") exitFailure + publicKeyHash :: C.PublicKey -> B.ByteString publicKeyHash = C.serializeKeyHash . C.getKeyHash . C.binaryEncodePubKey + +openStoreLog :: IniOpts -> IO (Maybe (StoreLog 'ReadMode)) +openStoreLog IniOpts {enableStoreLog, storeLogFile = f} + | enableStoreLog = do + createDirectoryIfMissing True logDir + putStrLn ("store log: " <> f) + Just <$> openReadStoreLog f + | otherwise = putStrLn "store log disabled" $> Nothing + +newtype ServerOpts = ServerOpts + { configFile :: FilePath + } + +serverOpts :: Parser ServerOpts +serverOpts = + ServerOpts + <$> strOption + ( long "config" + <> short 'c' + <> metavar "INI_FILE" + <> help ("config file (" <> defaultIniFile <> ")") + <> value defaultIniFile + ) + where + defaultIniFile = combine cfgDir "smp-server.ini" + +getServerOpts :: IO ServerOpts +getServerOpts = execParser opts + where + opts = + info + (serverOpts <**> helper) + ( fullDesc + <> header "Simplex Messaging Protocol (SMP) Server" + <> progDesc "Start server with INI_FILE (created on first run)" + ) diff --git a/package.yaml b/package.yaml index 722d647dc..259d1bc4a 100644 --- a/package.yaml +++ b/package.yaml @@ -51,6 +51,8 @@ executables: main: Main.hs dependencies: - cryptostore == 0.2.* + - ini == 0.4.* + - optparse-applicative == 0.15.* - simplex-messaging ghc-options: - -threaded diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 3b6d6d8a3..1fb81bcff 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -31,6 +31,7 @@ import Simplex.Messaging.Server.MsgStore import Simplex.Messaging.Server.MsgStore.STM (MsgQueue) import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM (QueueStore) +import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Transport import Simplex.Messaging.Util import UnliftIO.Async @@ -147,8 +148,8 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = NEW rKey -> createQueue st rKey SUB -> subscribeQueue queueId ACK -> acknowledgeMsg - KEY sKey -> okResp <$> atomically (secureQueue st queueId sKey) - OFF -> okResp <$> atomically (suspendQueue st queueId) + KEY sKey -> secureQueue_ st sKey + OFF -> suspendQueue_ st DEL -> delQueueAndMsgs st where createQueue :: QueueStore -> RecipientPublicKey -> m Transmission @@ -158,7 +159,9 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = addSubscribe = addQueueRetry 3 >>= \case Left e -> return $ ERR e - Right (rId, sId) -> subscribeQueue rId $> IDS rId sId + Right (rId, sId) -> do + withLog (`logCreateById` rId) + subscribeQueue rId $> IDS rId sId addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId)) addQueueRetry 0 = return $ Left INTERNAL @@ -169,11 +172,27 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = Left e -> return $ Left e Right _ -> return $ Right ids + logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO () + logCreateById s rId = + atomically (getQueue st SRecipient rId) >>= \case + Right q -> logCreateQueue s q + _ -> pure () + getIds :: m (RecipientId, SenderId) getIds = do n <- asks $ queueIdBytes . config liftM2 (,) (randomId n) (randomId n) + secureQueue_ :: QueueStore -> SenderPublicKey -> m Transmission + secureQueue_ st sKey = do + withLog $ \s -> logSecureQueue s queueId sKey + okResp <$> atomically (secureQueue st queueId sKey) + + suspendQueue_ :: QueueStore -> m Transmission + suspendQueue_ st = do + withLog (`logDeleteQueue` queueId) + okResp <$> atomically (suspendQueue st queueId) + subscribeQueue :: RecipientId -> m Transmission subscribeQueue rId = atomically (getSubscription rId) >>= deliverMessage tryPeekMsg rId @@ -260,12 +279,18 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = delQueueAndMsgs :: QueueStore -> m Transmission delQueueAndMsgs st = do + withLog (`logDeleteQueue` queueId) ms <- asks msgStore atomically $ deleteQueue st queueId >>= \case Left e -> return $ err e Right _ -> delMsgQueue ms queueId $> ok + withLog :: (StoreLog 'WriteMode -> IO a) -> m () + withLog action = do + env <- ask + liftIO . mapM_ action $ storeLog (env :: Env) + ok :: Transmission ok = mkResp corrId queueId OK diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 4371fc95f..9a61e0243 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -1,5 +1,7 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Server.Env.STM where @@ -13,7 +15,10 @@ import Numeric.Natural import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.STM +import Simplex.Messaging.Server.QueueStore (QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM +import Simplex.Messaging.Server.StoreLog +import System.IO (IOMode (..)) import UnliftIO.STM data ServerConfig = ServerConfig @@ -21,6 +26,7 @@ data ServerConfig = ServerConfig tbqSize :: Natural, queueIdBytes :: Int, msgIdBytes :: Int, + storeLog :: Maybe (StoreLog 'ReadMode), serverPrivateKey :: C.FullPrivateKey -- serverId :: ByteString } @@ -31,7 +37,8 @@ data Env = Env queueStore :: QueueStore, msgStore :: STMMsgStore, idsDrg :: TVar ChaChaDRG, - serverKeyPair :: C.FullKeyPair + serverKeyPair :: C.FullKeyPair, + storeLog :: Maybe (StoreLog 'WriteMode) } data Server = Server @@ -70,12 +77,21 @@ newSubscription = do delivered <- newEmptyTMVar return Sub {subThread = NoSub, delivered} -newEnv :: (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env +newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env newEnv config = do server <- atomically $ newServer (tbqSize config) queueStore <- atomically newQueueStore msgStore <- atomically newMsgStore idsDrg <- drgNew >>= newTVarIO + s' <- restoreQueues queueStore `mapM` storeLog (config :: ServerConfig) let pk = serverPrivateKey config serverKeyPair = (C.publicKey pk, pk) - return Env {config, server, queueStore, msgStore, idsDrg, serverKeyPair} + return Env {config, server, queueStore, msgStore, idsDrg, serverKeyPair, storeLog = s'} + where + restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode) + restoreQueues queueStore s = do + (queues, s') <- liftIO $ readWriteStoreLog s + atomically $ modifyTVar queueStore $ \d -> d {queues, senders = M.foldr' addSender M.empty queues} + pure s' + addSender :: QueueRec -> Map SenderId RecipientId -> Map SenderId RecipientId + addSender q = M.insert (senderId q) (recipientId q) diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index fd6783106..79eb2daee 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -15,7 +15,7 @@ data QueueRec = QueueRec status :: QueueStatus } -data QueueStatus = QueueActive | QueueOff +data QueueStatus = QueueActive | QueueOff deriving (Eq) class MonadQueueStore s m where addQueue :: s -> RecipientPublicKey -> (RecipientId, SenderId) -> m (Either ErrorType ()) diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs new file mode 100644 index 000000000..5841b23c5 --- /dev/null +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -0,0 +1,140 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} + +module Simplex.Messaging.Server.StoreLog + ( StoreLog, -- constructors are not exported + openWriteStoreLog, + openReadStoreLog, + closeStoreLog, + logCreateQueue, + logSecureQueue, + logDeleteQueue, + readWriteStoreLog, + ) +where + +import Control.Applicative (optional, (<|>)) +import Control.Monad (unless) +import Data.Attoparsec.ByteString.Char8 (Parser) +import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.Bifunctor (first, second) +import Data.ByteString.Base64 (encode) +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB +import Data.Either (partitionEithers) +import Data.Functor (($>)) +import Data.List (foldl') +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as M +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Parsers (base64P, parseAll) +import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.QueueStore (QueueRec (..), QueueStatus (..)) +import Simplex.Messaging.Transport (trimCR) +import System.Directory (doesFileExist) +import System.IO + +-- | opaque container for file handle with a type-safe IOMode +-- constructors are not exported, openWriteStoreLog and openReadStoreLog should be used instead +data StoreLog (a :: IOMode) where + ReadStoreLog :: FilePath -> Handle -> StoreLog 'ReadMode + WriteStoreLog :: FilePath -> Handle -> StoreLog 'WriteMode + +data StoreLogRecord + = CreateQueue QueueRec + | SecureQueue QueueId SenderPublicKey + | DeleteQueue QueueId + +storeLogRecordP :: Parser StoreLogRecord +storeLogRecordP = + "CREATE " *> createQueueP + <|> "SECURE " *> secureQueueP + <|> "DELETE " *> (DeleteQueue <$> base64P) + where + createQueueP = CreateQueue <$> queueRecP + secureQueueP = SecureQueue <$> base64P <* A.space <*> C.pubKeyP + queueRecP = do + recipientId <- "rid=" *> base64P <* A.space + senderId <- "sid=" *> base64P <* A.space + recipientKey <- "rk=" *> C.pubKeyP <* A.space + senderKey <- "sk=" *> optional C.pubKeyP + pure QueueRec {recipientId, senderId, recipientKey, senderKey, status = QueueActive} + +serializeStoreLogRecord :: StoreLogRecord -> ByteString +serializeStoreLogRecord = \case + CreateQueue q -> "CREATE " <> serializeQueue q + SecureQueue rId sKey -> "SECURE " <> encode rId <> " " <> C.serializePubKey sKey + DeleteQueue rId -> "DELETE " <> encode rId + where + serializeQueue QueueRec {recipientId, senderId, recipientKey, senderKey} = + B.unwords + [ "rid=" <> encode recipientId, + "sid=" <> encode senderId, + "rk=" <> C.serializePubKey recipientKey, + "sk=" <> maybe "" C.serializePubKey senderKey + ] + +openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode) +openWriteStoreLog f = WriteStoreLog f <$> openFile f WriteMode + +openReadStoreLog :: FilePath -> IO (StoreLog 'ReadMode) +openReadStoreLog f = do + doesFileExist f >>= (`unless` writeFile f "") + ReadStoreLog f <$> openFile f ReadMode + +closeStoreLog :: StoreLog a -> IO () +closeStoreLog = \case + WriteStoreLog _ h -> hClose h + ReadStoreLog _ h -> hClose h + +writeStoreLogRecord :: StoreLog 'WriteMode -> StoreLogRecord -> IO () +writeStoreLogRecord (WriteStoreLog _ h) r = do + B.hPutStrLn h $ serializeStoreLogRecord r + hFlush h + +logCreateQueue :: StoreLog 'WriteMode -> QueueRec -> IO () +logCreateQueue s = writeStoreLogRecord s . CreateQueue + +logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SenderPublicKey -> IO () +logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey + +logDeleteQueue :: StoreLog 'WriteMode -> QueueId -> IO () +logDeleteQueue s = writeStoreLogRecord s . DeleteQueue + +readWriteStoreLog :: StoreLog 'ReadMode -> IO (Map RecipientId QueueRec, StoreLog 'WriteMode) +readWriteStoreLog s@(ReadStoreLog f _) = do + qs <- readQueues s + closeStoreLog s + s' <- openWriteStoreLog f + writeQueues s' qs + pure (qs, s') + +writeQueues :: StoreLog 'WriteMode -> Map RecipientId QueueRec -> IO () +writeQueues s = mapM_ (writeStoreLogRecord s . CreateQueue) . M.filter active + where + active QueueRec {status} = status == QueueActive + +type LogParsingError = (String, ByteString) + +readQueues :: StoreLog 'ReadMode -> IO (Map RecipientId QueueRec) +readQueues (ReadStoreLog _ h) = LB.hGetContents h >>= returnResult . procStoreLog + where + procStoreLog :: LB.ByteString -> ([LogParsingError], Map RecipientId QueueRec) + procStoreLog = second (foldl' procLogRecord M.empty) . partitionEithers . map parseLogRecord . LB.lines + returnResult :: ([LogParsingError], Map RecipientId QueueRec) -> IO (Map RecipientId QueueRec) + returnResult (errs, res) = mapM_ printError errs $> res + parseLogRecord :: LB.ByteString -> Either LogParsingError StoreLogRecord + parseLogRecord = (\s -> first (,s) $ parseAll storeLogRecordP s) . trimCR . LB.toStrict + procLogRecord :: Map RecipientId QueueRec -> StoreLogRecord -> Map RecipientId QueueRec + procLogRecord m = \case + CreateQueue q -> M.insert (recipientId q) q m + SecureQueue qId sKey -> M.adjust (\q -> q {senderKey = Just sKey}) qId m + DeleteQueue qId -> M.delete qId m + printError :: LogParsingError -> IO () + printError (e, s) = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 5411e3da4..f05731aa4 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -118,10 +118,11 @@ putLn :: Handle -> ByteString -> IO () putLn h = B.hPut h . (<> "\r\n") getLn :: Handle -> IO ByteString -getLn h = trim_cr <$> B.hGetLine h - where - trim_cr "" = "" - trim_cr s = if B.last s == '\r' then B.init s else s +getLn h = trimCR <$> B.hGetLine h + +trimCR :: ByteString -> ByteString +trimCR "" = "" +trimCR s = if B.last s == '\r' then B.init s else s -- * Encrypted transport diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 360771df8..3a92400fd 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} @@ -50,6 +51,7 @@ cfg = tbqSize = 1, queueIdBytes = 12, msgIdBytes = 6, + storeLog = Nothing, serverPrivateKey = -- full RSA private key (only for tests) "MIIFIwIBAAKCAQEArZyrri/NAwt5buvYjwu+B/MQeJUszDBpRgVqNddlI9kNwDXu\