partial implementation

This commit is contained in:
Evgeny Poberezkin
2024-12-12 13:45:18 +00:00
parent 1bdf8bc3e3
commit 8b656d7dc5
4 changed files with 87 additions and 26 deletions

View File

@@ -51,7 +51,7 @@ import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isNothing)
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
@@ -91,11 +91,11 @@ data QueueStore (s :: MSType) where
} -> QueueStore 'MSMemory
-- maps store cached queues
-- Nothing in map indicates that the queue doesn't exist
-- JQStore ::
-- { queues_ :: TMap RecipientId (Maybe (JournalQueue 'MSJournal)),
-- senders_ :: TMap SenderId (Maybe RecipientId),
-- notifiers_ :: TMap NotifierId (Maybe RecipientId)
-- } -> QueueStore 'MSJournal
JQStore ::
{ queues_ :: TMap RecipientId (Maybe (JournalQueue 'MSJournal)),
senders_ :: TMap SenderId (Maybe RecipientId),
notifiers_ :: TMap NotifierId (Maybe RecipientId)
} -> QueueStore 'MSJournal
data JournalStoreConfig s = JournalStoreConfig
{ storePath :: FilePath,
@@ -224,6 +224,9 @@ instance JournalTypeI t => StrEncoding (JournalState t) where
queueLogFileName :: String
queueLogFileName = "queue_state"
queueRecFileName :: String
queueRecFileName = "queue_rec"
msgLogFileName :: String
msgLogFileName = "messages"
@@ -239,12 +242,16 @@ instance STMQueueStore (JournalMsgStore 'MSMemory) where
notifiers' = notifiers . queueStore
storeLog' = storeLog . queueStore
mkQueue st qr = do
lock <- getMapLock (queueLocks st) $ recipientId qr
q <- newTVar $ Just qr
mq <- newTVar Nothing
activeAt <- newTVar 0
isEmpty <- newTVar Nothing
pure $ JournalQueue lock q mq activeAt isEmpty
lock <- atomically $ getMapLock (queueLocks st) $ recipientId qr
makeQueue lock qr
makeQueue :: Lock -> QueueRec -> IO (JournalQueue s)
makeQueue lock qr = do
q <- newTVarIO $ Just qr
mq <- newTVarIO Nothing
activeAt <- newTVarIO 0
isEmpty <- newTVarIO Nothing
pure $ JournalQueue lock q mq activeAt isEmpty
instance MsgStoreClass (JournalMsgStore s) where
type StoreMonad (JournalMsgStore s) = StoreIO s
@@ -264,20 +271,27 @@ instance MsgStoreClass (JournalMsgStore s) where
storeLog <- newTVarIO Nothing
let queueStore = MQStore {queues, senders, notifiers, storeLog}
pure JournalMsgStore {config, random, queueLocks, queueStore}
SMSJournal -> undefined
SMSJournal -> do
queues_ <- TM.emptyIO
senders_ <- TM.emptyIO
notifiers_ <- TM.emptyIO
let queueStore = JQStore {queues_, senders_, notifiers_}
pure JournalMsgStore {config, random, queueLocks, queueStore}
setStoreLog :: JournalMsgStore s -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = case queueStore st of
MQStore {storeLog} -> atomically $ writeTVar storeLog (Just sl)
JQStore {} -> undefined
closeMsgStore st = case queueStore st of
MQStore {queues, storeLog} -> do
readTVarIO storeLog >>= mapM_ closeStoreLog
readTVarIO queues >>= mapM_ closeMsgQueue
JQStore {} -> undefined
activeMsgQueues st = case queueStore st of
MQStore {queues} -> queues
{-# INLINE activeMsgQueues #-}
JQStore {} -> undefined
-- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result.
-- It is used to export storage to a single file and also to expire messages and validate all queues when server is started.
@@ -345,38 +359,80 @@ instance MsgStoreClass (JournalMsgStore s) where
queueCount <- M.size <$> readTVarIO queues
notifierCount <- M.size <$> readTVarIO notifiers
pure QueueCounts {queueCount, notifierCount}
JQStore {} -> undefined
addQueue :: JournalMsgStore s -> QueueRec -> IO (Either ErrorType (JournalQueue s))
addQueue st qr = case queueStore st of
addQueue st@JournalMsgStore {queueLocks = ls} qr@QueueRec {recipientId = rId, senderId = sId, notifier} = case queueStore st of
MQStore {} -> addQueue' st qr
JQStore {queues_, senders_, notifiers_} -> do
lock <- atomically $ getMapLock ls $ recipientId qr
tryStore "addQueue" rId $
withLock' lock "addQueue" $ withLockMap ls sId "addQueueS" $ withNotifierLock $
ifM hasAnyId (pure $ Left DUPLICATE_) $ E.uninterruptibleMask_ $ do
q <- makeQueue lock qr
atomically $ TM.insert rId (Just q) queues_
atomically $ TM.insert sId (Just rId) senders_
storeQueue queuePath qr
saveQueueRef sId rId
forM_ notifier $ \NtfCreds {notifierId} -> do
atomically $ TM.insert notifierId (Just rId) notifiers_
saveQueueRef notifierId rId
pure $ Right q
where
dir = msgQueueDirectory st rId
queuePath = queueRecPath dir $ B.unpack (strEncode rId)
storeQueue _ _ = pure () -- TODO
saveQueueRef _ _ = pure () -- TODO
hasAnyId = foldM (fmap . (||)) False [hasId rId queues_, hasId sId senders_, withNotifier (`hasId` notifiers_), hasDir rId, hasDir sId, withNotifier hasDir]
withNotifier p = maybe (pure False) (\NtfCreds {notifierId} -> p notifierId) notifier
withNotifierLock a = maybe a (\NtfCreds {notifierId} -> withLockMap ls notifierId "addQueueN" a) notifier
hasId :: EntityId -> TMap EntityId (Maybe a) -> IO Bool
hasId qId m = maybe False isJust <$> atomically (TM.lookup qId m)
hasDir qId = doesDirectoryExist $ msgQueueDirectory st qId
getQueue :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s))
getQueue st party qId = case queueStore st of
MQStore {} -> getQueue' st party qId
JQStore {queues_, senders_, notifiers_} -> maybe (Left AUTH) Right <$> case party of
SRecipient -> getQueue_ qId
SSender -> getQueueRef senders_ $>>= getQueue_
SNotifier -> getQueueRef notifiers_ $>>= getQueue_
where
getQueue_ rId = TM.lookupIO rId queues_ >>= maybe (loadQueue rId) pure
getQueueRef :: TMap EntityId (Maybe RecipientId) -> IO (Maybe RecipientId)
getQueueRef m = TM.lookupIO qId m >>= maybe (loadQueueRef m) pure
loadQueue _rId = undefined -- TODO load, cache, return queue
loadQueueRef _m = undefined -- TODO load, cache, return queue ID
getQueueRec :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s, QueueRec))
getQueueRec st party qId = case queueStore st of
MQStore {} -> getQueueRec' st party qId
JQStore {} -> undefined
secureQueue :: JournalMsgStore s -> JournalQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue st sq sKey = case queueStore st of
MQStore {} -> secureQueue' st sq sKey
JQStore {} -> undefined
addQueueNotifier :: JournalMsgStore s -> JournalQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier st sq ntfCreds = case queueStore st of
MQStore {} -> addQueueNotifier' st sq ntfCreds
JQStore {} -> undefined
deleteQueueNotifier :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier st sq = case queueStore st of
MQStore {} -> deleteQueueNotifier' st sq
JQStore {} -> undefined
suspendQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType ())
suspendQueue st sq = case queueStore st of
MQStore {} -> suspendQueue' st sq
JQStore {} -> undefined
updateQueueTime :: JournalMsgStore s -> JournalQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = case queueStore st of
MQStore {} -> updateQueueTime' st sq t
JQStore {} -> undefined
getMsgQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> StoreIO s (JournalMsgQueue s)
getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} =
@@ -386,6 +442,8 @@ instance MsgStoreClass (JournalMsgStore s) where
let dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
queue = JMQueue {queueDirectory = dir, statePath}
-- TODO this should account for the possibility that the folder exists,
-- but queue files do not
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue)
atomically $ writeTVar msgQueue_ $ Just q
pure q
@@ -547,10 +605,10 @@ updateActiveAt :: JournalQueue s -> IO ()
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a
tryStore' op rId = tryStore op rId . fmap Right
tryStore' op rId = ExceptT . tryStore op rId . fmap Right
tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure
tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> IO (Either ErrorType a)
tryStore op rId a = E.mask_ $ E.try a >>= either storeErr pure
where
storeErr :: E.SomeException -> IO (Either ErrorType a)
storeErr e =
@@ -558,7 +616,7 @@ tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure
in logError ("STORE: " <> T.pack e') $> Left (STORE e')
isolateQueueId :: String -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op
isolateQueueId op ms rId = ExceptT . tryStore op rId . withLockMap (queueLocks ms) rId op
openMsgQueue :: JournalMsgStore s -> JMQueue -> IO (JournalMsgQueue s)
openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do
@@ -620,6 +678,9 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP
let (seg, s') = B.splitAt 2 s
in seg : splitSegments (n - 1) s'
queueRecPath :: FilePath -> String -> FilePath
queueRecPath dir queueId = dir </> (queueRecFileName <> "." <> queueId <> logFileExt)
msgQueueStatePath :: FilePath -> String -> FilePath
msgQueueStatePath dir queueId = dir </> (queueLogFileName <> "." <> queueId <> logFileExt)
@@ -788,6 +849,7 @@ deleteQueue_ :: forall s. JournalMsgStore s -> RecipientId -> JournalQueue s ->
deleteQueue_ ms rId q =
runExceptT $ isolateQueueId "deleteQueue_" ms rId $ case queueStore ms of
MQStore {} -> deleteQueue' ms rId q >>= mapM remove
JQStore {} -> undefined
where
remove :: (QueueRec, Maybe (JournalMsgQueue s)) -> IO (QueueRec, Maybe (JournalMsgQueue s))
remove r@(_, mq_) = do

View File

@@ -63,7 +63,7 @@ instance STMQueueStore STMMsgStore where
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing
mkQueue _ qr = STMQueue <$> newTVarIO (Just qr) <*> newTVarIO Nothing
instance MsgStoreClass STMMsgStore where
type StoreMonad STMMsgStore = STM

View File

@@ -34,7 +34,7 @@ class MsgStoreClass s => STMQueueStore s where
senders' :: s -> TMap SenderId RecipientId
notifiers' :: s -> TMap NotifierId RecipientId
storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode))
mkQueue :: s -> QueueRec -> STM (StoreQueue s)
mkQueue :: s -> QueueRec -> IO (StoreQueue s)
class Monad (StoreMonad s) => MsgStoreClass s where
type StoreMonad s = (m :: Type -> Type) | m -> s

View File

@@ -44,17 +44,16 @@ import System.IO
import UnliftIO.STM
addQueue' :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier}=
atomically add
addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier} =
(mkQueue st qr >>= atomically . add)
$>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr)
where
add = ifM hasId (pure $ Left DUPLICATE_) $ do
q <- mkQueue st qr
add q = ifM hasId (pure $ Left DUPLICATE_) $ do
TM.insert rId q $ queues' st
TM.insert sId rId $ senders' st
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st
pure $ Right q
hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
hasId = foldM (fmap . (||)) False [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier
getQueue' :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))