Files
simplexmq/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs
2026-03-03 21:16:46 +00:00

82 lines
4.2 KiB
Haskell

{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.Messaging.Server.StoreLog.ReadWrite
( writeQueueStore,
readQueueStore,
) where
import Control.Concurrent.STM
import Control.Logger.Simple
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import qualified Data.ByteString.Char8 as B
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ASubscriberParty (..), ErrorType, RecipientId, SParty (..))
import Simplex.Messaging.Server.QueueStore (QueueRec, ServiceRec (..))
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..), STMService (..))
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (tshow)
import System.IO
writeQueueStore :: forall q. StoreQueueClass q => StoreLog 'WriteMode -> STMQueueStore q -> IO ()
writeQueueStore s st = do
readTVarIO (services st) >>= mapM_ (logNewService s . serviceRec)
withLoadedQueues st $ writeQueue
where
writeQueue :: q -> IO ()
writeQueue q = readTVarIO (queueRec q) >>= mapM_ (logCreateQueue s $ recipientId q)
readQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
readQueueStore tty mkQ f st = readLogLines tty f $ \_ -> processLine
where
processLine :: B.ByteString -> IO ()
processLine s = either printError procLogRecord (strDecode s)
where
procLogRecord :: StoreLogRecord -> IO ()
procLogRecord = \case
CreateQueue rId qr -> addQueue_ st mkQ rId qr >>= qError rId "CreateQueue"
CreateLink rId lnkId d -> withQueue rId "CreateLink" $ \q -> addQueueLinkData st q lnkId d
DeleteLink rId -> withQueue rId "DeleteLink" $ \q -> deleteQueueLinkData st q
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
UpdateKeys rId rKeys -> withQueue rId "UpdateKeys" $ \q -> updateKeys st q rKeys
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info
UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteStoreQueue st
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
NewService sr@ServiceRec {serviceId} -> getCreateService @q st sr >>= \case
Right serviceId'
| serviceId == serviceId' -> pure ()
| otherwise -> logError $ errPfx <> "created with the wrong ID " <> decodeLatin1 (strEncode serviceId')
Left e -> logError $ errPfx <> tshow e
where
errPfx = "STORE: getCreateService, stored service " <> decodeLatin1 (strEncode serviceId) <> ", "
QueueService qId (ASP party) serviceId -> withQueue qId "QueueService" $ \q -> setQueueService st q party serviceId
printError :: String -> IO ()
printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
withQueue :: forall a. RecipientId -> T.Text -> (q -> IO (Either ErrorType a)) -> IO ()
withQueue qId op a = runExceptT go >>= qError qId op
where
go = do
q <- ExceptT $ getQueue_ st (\_ -> mkQ) SRecipient qId
liftIO (readTVarIO $ queueRec q) >>= \case
Nothing -> logWarn $ logPfx qId op <> "already deleted"
Just _ -> void $ ExceptT $ a q
qError qId op = \case
Left e -> logError $ logPfx qId op <> tshow e
Right _ -> pure ()
logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", "