mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 14:06:30 +00:00
Store log (#108)
* StoreLog (WIP) * add log records to map * revert Protocol change * revert Server change * fix parseLogRecord * optionally save/restore queues to/from store log * refactor * refactor delQueueAndMsgs * move store log to /var/opt/simplex * use ini file
This commit is contained in:
committed by
GitHub
parent
816703527a
commit
afc09a6ec4
@@ -31,6 +31,7 @@ import Simplex.Messaging.Server.MsgStore
|
||||
import Simplex.Messaging.Server.MsgStore.STM (MsgQueue)
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
import Simplex.Messaging.Server.QueueStore.STM (QueueStore)
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util
|
||||
import UnliftIO.Async
|
||||
@@ -147,8 +148,8 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
NEW rKey -> createQueue st rKey
|
||||
SUB -> subscribeQueue queueId
|
||||
ACK -> acknowledgeMsg
|
||||
KEY sKey -> okResp <$> atomically (secureQueue st queueId sKey)
|
||||
OFF -> okResp <$> atomically (suspendQueue st queueId)
|
||||
KEY sKey -> secureQueue_ st sKey
|
||||
OFF -> suspendQueue_ st
|
||||
DEL -> delQueueAndMsgs st
|
||||
where
|
||||
createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
|
||||
@@ -158,7 +159,9 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
addSubscribe =
|
||||
addQueueRetry 3 >>= \case
|
||||
Left e -> return $ ERR e
|
||||
Right (rId, sId) -> subscribeQueue rId $> IDS rId sId
|
||||
Right (rId, sId) -> do
|
||||
withLog (`logCreateById` rId)
|
||||
subscribeQueue rId $> IDS rId sId
|
||||
|
||||
addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId))
|
||||
addQueueRetry 0 = return $ Left INTERNAL
|
||||
@@ -169,11 +172,27 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
Left e -> return $ Left e
|
||||
Right _ -> return $ Right ids
|
||||
|
||||
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
|
||||
logCreateById s rId =
|
||||
atomically (getQueue st SRecipient rId) >>= \case
|
||||
Right q -> logCreateQueue s q
|
||||
_ -> pure ()
|
||||
|
||||
getIds :: m (RecipientId, SenderId)
|
||||
getIds = do
|
||||
n <- asks $ queueIdBytes . config
|
||||
liftM2 (,) (randomId n) (randomId n)
|
||||
|
||||
secureQueue_ :: QueueStore -> SenderPublicKey -> m Transmission
|
||||
secureQueue_ st sKey = do
|
||||
withLog $ \s -> logSecureQueue s queueId sKey
|
||||
okResp <$> atomically (secureQueue st queueId sKey)
|
||||
|
||||
suspendQueue_ :: QueueStore -> m Transmission
|
||||
suspendQueue_ st = do
|
||||
withLog (`logDeleteQueue` queueId)
|
||||
okResp <$> atomically (suspendQueue st queueId)
|
||||
|
||||
subscribeQueue :: RecipientId -> m Transmission
|
||||
subscribeQueue rId =
|
||||
atomically (getSubscription rId) >>= deliverMessage tryPeekMsg rId
|
||||
@@ -260,12 +279,18 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
|
||||
delQueueAndMsgs :: QueueStore -> m Transmission
|
||||
delQueueAndMsgs st = do
|
||||
withLog (`logDeleteQueue` queueId)
|
||||
ms <- asks msgStore
|
||||
atomically $
|
||||
deleteQueue st queueId >>= \case
|
||||
Left e -> return $ err e
|
||||
Right _ -> delMsgQueue ms queueId $> ok
|
||||
|
||||
withLog :: (StoreLog 'WriteMode -> IO a) -> m ()
|
||||
withLog action = do
|
||||
env <- ask
|
||||
liftIO . mapM_ action $ storeLog (env :: Env)
|
||||
|
||||
ok :: Transmission
|
||||
ok = mkResp corrId queueId OK
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Server.Env.STM where
|
||||
|
||||
@@ -13,7 +15,10 @@ import Numeric.Natural
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.MsgStore.STM
|
||||
import Simplex.Messaging.Server.QueueStore (QueueRec (..))
|
||||
import Simplex.Messaging.Server.QueueStore.STM
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import System.IO (IOMode (..))
|
||||
import UnliftIO.STM
|
||||
|
||||
data ServerConfig = ServerConfig
|
||||
@@ -21,6 +26,7 @@ data ServerConfig = ServerConfig
|
||||
tbqSize :: Natural,
|
||||
queueIdBytes :: Int,
|
||||
msgIdBytes :: Int,
|
||||
storeLog :: Maybe (StoreLog 'ReadMode),
|
||||
serverPrivateKey :: C.FullPrivateKey
|
||||
-- serverId :: ByteString
|
||||
}
|
||||
@@ -31,7 +37,8 @@ data Env = Env
|
||||
queueStore :: QueueStore,
|
||||
msgStore :: STMMsgStore,
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
serverKeyPair :: C.FullKeyPair
|
||||
serverKeyPair :: C.FullKeyPair,
|
||||
storeLog :: Maybe (StoreLog 'WriteMode)
|
||||
}
|
||||
|
||||
data Server = Server
|
||||
@@ -70,12 +77,21 @@ newSubscription = do
|
||||
delivered <- newEmptyTMVar
|
||||
return Sub {subThread = NoSub, delivered}
|
||||
|
||||
newEnv :: (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
|
||||
newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
|
||||
newEnv config = do
|
||||
server <- atomically $ newServer (tbqSize config)
|
||||
queueStore <- atomically newQueueStore
|
||||
msgStore <- atomically newMsgStore
|
||||
idsDrg <- drgNew >>= newTVarIO
|
||||
s' <- restoreQueues queueStore `mapM` storeLog (config :: ServerConfig)
|
||||
let pk = serverPrivateKey config
|
||||
serverKeyPair = (C.publicKey pk, pk)
|
||||
return Env {config, server, queueStore, msgStore, idsDrg, serverKeyPair}
|
||||
return Env {config, server, queueStore, msgStore, idsDrg, serverKeyPair, storeLog = s'}
|
||||
where
|
||||
restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode)
|
||||
restoreQueues queueStore s = do
|
||||
(queues, s') <- liftIO $ readWriteStoreLog s
|
||||
atomically $ modifyTVar queueStore $ \d -> d {queues, senders = M.foldr' addSender M.empty queues}
|
||||
pure s'
|
||||
addSender :: QueueRec -> Map SenderId RecipientId -> Map SenderId RecipientId
|
||||
addSender q = M.insert (senderId q) (recipientId q)
|
||||
|
||||
@@ -15,7 +15,7 @@ data QueueRec = QueueRec
|
||||
status :: QueueStatus
|
||||
}
|
||||
|
||||
data QueueStatus = QueueActive | QueueOff
|
||||
data QueueStatus = QueueActive | QueueOff deriving (Eq)
|
||||
|
||||
class MonadQueueStore s m where
|
||||
addQueue :: s -> RecipientPublicKey -> (RecipientId, SenderId) -> m (Either ErrorType ())
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Server.StoreLog
|
||||
( StoreLog, -- constructors are not exported
|
||||
openWriteStoreLog,
|
||||
openReadStoreLog,
|
||||
closeStoreLog,
|
||||
logCreateQueue,
|
||||
logSecureQueue,
|
||||
logDeleteQueue,
|
||||
readWriteStoreLog,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Monad (unless)
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (first, second)
|
||||
import Data.ByteString.Base64 (encode)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Either (partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.List (foldl')
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Parsers (base64P, parseAll)
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.QueueStore (QueueRec (..), QueueStatus (..))
|
||||
import Simplex.Messaging.Transport (trimCR)
|
||||
import System.Directory (doesFileExist)
|
||||
import System.IO
|
||||
|
||||
-- | opaque container for file handle with a type-safe IOMode
|
||||
-- constructors are not exported, openWriteStoreLog and openReadStoreLog should be used instead
|
||||
data StoreLog (a :: IOMode) where
|
||||
ReadStoreLog :: FilePath -> Handle -> StoreLog 'ReadMode
|
||||
WriteStoreLog :: FilePath -> Handle -> StoreLog 'WriteMode
|
||||
|
||||
data StoreLogRecord
|
||||
= CreateQueue QueueRec
|
||||
| SecureQueue QueueId SenderPublicKey
|
||||
| DeleteQueue QueueId
|
||||
|
||||
storeLogRecordP :: Parser StoreLogRecord
|
||||
storeLogRecordP =
|
||||
"CREATE " *> createQueueP
|
||||
<|> "SECURE " *> secureQueueP
|
||||
<|> "DELETE " *> (DeleteQueue <$> base64P)
|
||||
where
|
||||
createQueueP = CreateQueue <$> queueRecP
|
||||
secureQueueP = SecureQueue <$> base64P <* A.space <*> C.pubKeyP
|
||||
queueRecP = do
|
||||
recipientId <- "rid=" *> base64P <* A.space
|
||||
senderId <- "sid=" *> base64P <* A.space
|
||||
recipientKey <- "rk=" *> C.pubKeyP <* A.space
|
||||
senderKey <- "sk=" *> optional C.pubKeyP
|
||||
pure QueueRec {recipientId, senderId, recipientKey, senderKey, status = QueueActive}
|
||||
|
||||
serializeStoreLogRecord :: StoreLogRecord -> ByteString
|
||||
serializeStoreLogRecord = \case
|
||||
CreateQueue q -> "CREATE " <> serializeQueue q
|
||||
SecureQueue rId sKey -> "SECURE " <> encode rId <> " " <> C.serializePubKey sKey
|
||||
DeleteQueue rId -> "DELETE " <> encode rId
|
||||
where
|
||||
serializeQueue QueueRec {recipientId, senderId, recipientKey, senderKey} =
|
||||
B.unwords
|
||||
[ "rid=" <> encode recipientId,
|
||||
"sid=" <> encode senderId,
|
||||
"rk=" <> C.serializePubKey recipientKey,
|
||||
"sk=" <> maybe "" C.serializePubKey senderKey
|
||||
]
|
||||
|
||||
openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode)
|
||||
openWriteStoreLog f = WriteStoreLog f <$> openFile f WriteMode
|
||||
|
||||
openReadStoreLog :: FilePath -> IO (StoreLog 'ReadMode)
|
||||
openReadStoreLog f = do
|
||||
doesFileExist f >>= (`unless` writeFile f "")
|
||||
ReadStoreLog f <$> openFile f ReadMode
|
||||
|
||||
closeStoreLog :: StoreLog a -> IO ()
|
||||
closeStoreLog = \case
|
||||
WriteStoreLog _ h -> hClose h
|
||||
ReadStoreLog _ h -> hClose h
|
||||
|
||||
writeStoreLogRecord :: StoreLog 'WriteMode -> StoreLogRecord -> IO ()
|
||||
writeStoreLogRecord (WriteStoreLog _ h) r = do
|
||||
B.hPutStrLn h $ serializeStoreLogRecord r
|
||||
hFlush h
|
||||
|
||||
logCreateQueue :: StoreLog 'WriteMode -> QueueRec -> IO ()
|
||||
logCreateQueue s = writeStoreLogRecord s . CreateQueue
|
||||
|
||||
logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SenderPublicKey -> IO ()
|
||||
logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey
|
||||
|
||||
logDeleteQueue :: StoreLog 'WriteMode -> QueueId -> IO ()
|
||||
logDeleteQueue s = writeStoreLogRecord s . DeleteQueue
|
||||
|
||||
readWriteStoreLog :: StoreLog 'ReadMode -> IO (Map RecipientId QueueRec, StoreLog 'WriteMode)
|
||||
readWriteStoreLog s@(ReadStoreLog f _) = do
|
||||
qs <- readQueues s
|
||||
closeStoreLog s
|
||||
s' <- openWriteStoreLog f
|
||||
writeQueues s' qs
|
||||
pure (qs, s')
|
||||
|
||||
writeQueues :: StoreLog 'WriteMode -> Map RecipientId QueueRec -> IO ()
|
||||
writeQueues s = mapM_ (writeStoreLogRecord s . CreateQueue) . M.filter active
|
||||
where
|
||||
active QueueRec {status} = status == QueueActive
|
||||
|
||||
type LogParsingError = (String, ByteString)
|
||||
|
||||
readQueues :: StoreLog 'ReadMode -> IO (Map RecipientId QueueRec)
|
||||
readQueues (ReadStoreLog _ h) = LB.hGetContents h >>= returnResult . procStoreLog
|
||||
where
|
||||
procStoreLog :: LB.ByteString -> ([LogParsingError], Map RecipientId QueueRec)
|
||||
procStoreLog = second (foldl' procLogRecord M.empty) . partitionEithers . map parseLogRecord . LB.lines
|
||||
returnResult :: ([LogParsingError], Map RecipientId QueueRec) -> IO (Map RecipientId QueueRec)
|
||||
returnResult (errs, res) = mapM_ printError errs $> res
|
||||
parseLogRecord :: LB.ByteString -> Either LogParsingError StoreLogRecord
|
||||
parseLogRecord = (\s -> first (,s) $ parseAll storeLogRecordP s) . trimCR . LB.toStrict
|
||||
procLogRecord :: Map RecipientId QueueRec -> StoreLogRecord -> Map RecipientId QueueRec
|
||||
procLogRecord m = \case
|
||||
CreateQueue q -> M.insert (recipientId q) q m
|
||||
SecureQueue qId sKey -> M.adjust (\q -> q {senderKey = Just sKey}) qId m
|
||||
DeleteQueue qId -> M.delete qId m
|
||||
printError :: LogParsingError -> IO ()
|
||||
printError (e, s) = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
|
||||
@@ -118,10 +118,11 @@ putLn :: Handle -> ByteString -> IO ()
|
||||
putLn h = B.hPut h . (<> "\r\n")
|
||||
|
||||
getLn :: Handle -> IO ByteString
|
||||
getLn h = trim_cr <$> B.hGetLine h
|
||||
where
|
||||
trim_cr "" = ""
|
||||
trim_cr s = if B.last s == '\r' then B.init s else s
|
||||
getLn h = trimCR <$> B.hGetLine h
|
||||
|
||||
trimCR :: ByteString -> ByteString
|
||||
trimCR "" = ""
|
||||
trimCR s = if B.last s == '\r' then B.init s else s
|
||||
|
||||
-- * Encrypted transport
|
||||
|
||||
|
||||
Reference in New Issue
Block a user