mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 05:25:07 +00:00
smp server: combine all queue IDs to one map
This commit is contained in:
@@ -423,9 +423,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
|
||||
<- asks serverStats
|
||||
AMS _ st <- asks msgStore
|
||||
let queues = activeMsgQueues st
|
||||
notifiers = notifiers' st
|
||||
interval = 1000000 * logInterval
|
||||
let interval = 1000000 * logInterval
|
||||
forever $ do
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
hSetBuffering h LineBuffering
|
||||
@@ -478,8 +476,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
pMsgFwdsOwn' <- getResetProxyStatsData pMsgFwdsOwn
|
||||
pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0
|
||||
qCount' <- readIORef qCount
|
||||
qCount'' <- M.size <$> readTVarIO queues
|
||||
notifierCount' <- M.size <$> readTVarIO notifiers
|
||||
qCount'' <- readTVarIO $ queueCount' st
|
||||
ntfrCount' <- readTVarIO $ notifierCount' st
|
||||
msgCount' <- readIORef msgCount
|
||||
ntfCount' <- readIORef ntfCount
|
||||
hPutStrLn h $
|
||||
@@ -538,7 +536,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
show ntfSub',
|
||||
show ntfSubAuth',
|
||||
show ntfSubDuplicate',
|
||||
show notifierCount',
|
||||
show ntfrCount',
|
||||
show qDeletedAllB',
|
||||
show qSubAllB',
|
||||
show qSubEnd',
|
||||
@@ -625,9 +623,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
CPStats -> withUserRole $ do
|
||||
ss <- unliftIO u $ asks serverStats
|
||||
AMS _ st <- unliftIO u $ asks msgStore
|
||||
let queues = activeMsgQueues st
|
||||
notifiers = notifiers' st
|
||||
getStat :: (ServerStats -> IORef a) -> IO a
|
||||
let getStat :: (ServerStats -> IORef a) -> IO a
|
||||
getStat var = readIORef (var ss)
|
||||
putStat :: Show a => String -> (ServerStats -> IORef a) -> IO ()
|
||||
putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v
|
||||
@@ -664,9 +660,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
putStat "msgNtfsB" msgNtfsB
|
||||
putStat "msgNtfExpired" msgNtfExpired
|
||||
putStat "qCount" qCount
|
||||
qCount2 <- M.size <$> readTVarIO queues
|
||||
qCount2 <- readTVarIO $ queueCount' st
|
||||
hPutStrLn h $ "qCount 2: " <> show qCount2
|
||||
notifierCount <- M.size <$> readTVarIO notifiers
|
||||
notifierCount <- readTVarIO $ notifierCount' st
|
||||
hPutStrLn h $ "notifiers: " <> show notifierCount
|
||||
putStat "msgCount" msgCount
|
||||
putStat "ntfCount" ntfCount
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore where
|
||||
|
||||
@@ -11,11 +11,11 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore.Journal
|
||||
( JournalMsgStore (queues, senders, notifiers, random),
|
||||
( JournalMsgStore (queues, random),
|
||||
JournalQueue,
|
||||
JournalMsgQueue (queue, state),
|
||||
JMQueue (queueDirectory, statePath),
|
||||
@@ -62,9 +62,9 @@ import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
import Simplex.Messaging.Server.QueueStore.STM
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>))
|
||||
import System.Directory
|
||||
import System.Exit
|
||||
@@ -77,9 +77,11 @@ data JournalMsgStore = JournalMsgStore
|
||||
{ config :: JournalStoreConfig,
|
||||
random :: TVar StdGen,
|
||||
queueLocks :: TMap RecipientId Lock,
|
||||
queues :: TMap RecipientId JournalQueue,
|
||||
senders :: TMap SenderId RecipientId,
|
||||
notifiers :: TMap NotifierId RecipientId,
|
||||
queues :: TMap RecipientId (QueueReference JournalQueue),
|
||||
queueCount :: TVar Int,
|
||||
notifierCount :: TVar Int,
|
||||
-- senders :: TMap SenderId RecipientId,
|
||||
-- notifiers :: TMap NotifierId RecipientId,
|
||||
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
|
||||
}
|
||||
|
||||
@@ -220,8 +222,11 @@ newtype StoreIO a = StoreIO {unStoreIO :: IO a}
|
||||
|
||||
instance STMQueueStore JournalMsgStore where
|
||||
queues' = queues
|
||||
senders' = senders
|
||||
notifiers' = notifiers
|
||||
queueCount' = queueCount
|
||||
notifierCount' = notifierCount
|
||||
|
||||
-- senders' = senders
|
||||
-- notifiers' = notifiers
|
||||
storeLog' = storeLog
|
||||
mkQueue st qr = do
|
||||
lock <- getMapLock (queueLocks st) $ recipientId qr
|
||||
@@ -243,17 +248,19 @@ instance MsgStoreClass JournalMsgStore where
|
||||
random <- newTVarIO =<< newStdGen
|
||||
queueLocks <- TM.emptyIO
|
||||
queues <- TM.emptyIO
|
||||
senders <- TM.emptyIO
|
||||
notifiers <- TM.emptyIO
|
||||
queueCount <- newTVarIO 0
|
||||
notifierCount <- newTVarIO 0
|
||||
-- senders <- TM.emptyIO
|
||||
-- notifiers <- TM.emptyIO
|
||||
storeLog <- newTVarIO Nothing
|
||||
pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog}
|
||||
pure JournalMsgStore {config, random, queueLocks, queues, queueCount, notifierCount, storeLog}
|
||||
|
||||
setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO ()
|
||||
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
|
||||
|
||||
closeMsgStore st = do
|
||||
readTVarIO (storeLog st) >>= mapM_ closeStoreLog
|
||||
readTVarIO (queues st) >>= mapM_ closeMsgQueue
|
||||
readTVarIO (queues st) >>= mapM_ (\case QRRecipient q -> closeMsgQueue q; _ -> pure ())
|
||||
|
||||
activeMsgQueues = queues
|
||||
{-# INLINE activeMsgQueues #-}
|
||||
@@ -309,8 +316,9 @@ instance MsgStoreClass JournalMsgStore where
|
||||
logQueueState q =
|
||||
StoreIO . void $
|
||||
readTVarIO (msgQueue_ q)
|
||||
$>>= \mq -> readTVarIO (handles mq)
|
||||
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
|
||||
$>>= \mq ->
|
||||
readTVarIO (handles mq)
|
||||
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
|
||||
|
||||
queueRec' = queueRec
|
||||
{-# INLINE queueRec' #-}
|
||||
@@ -356,24 +364,26 @@ instance MsgStoreClass JournalMsgStore where
|
||||
-- only runs action if queue is not empty
|
||||
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
|
||||
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
|
||||
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
|
||||
Nothing ->
|
||||
E.bracket
|
||||
(unStoreIO $ getPeekMsgQueue ms rId q)
|
||||
(mapM_ $ \_ -> closeMsgQueue q)
|
||||
(maybe (pure (Nothing, 0)) (unStoreIO . run))
|
||||
where
|
||||
run (mq, _) = do
|
||||
r <- action mq
|
||||
sz <- getQueueSize_ mq
|
||||
pure (Just r, sz)
|
||||
Just mq -> do
|
||||
ts <- readTVarIO $ activeAt q
|
||||
r <- if now - ts >= idleInterval config
|
||||
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
|
||||
else pure Nothing
|
||||
sz <- unStoreIO $ getQueueSize_ mq
|
||||
pure (r, sz)
|
||||
StoreIO $
|
||||
readTVarIO (msgQueue_ q) >>= \case
|
||||
Nothing ->
|
||||
E.bracket
|
||||
(unStoreIO $ getPeekMsgQueue ms rId q)
|
||||
(mapM_ $ \_ -> closeMsgQueue q)
|
||||
(maybe (pure (Nothing, 0)) (unStoreIO . run))
|
||||
where
|
||||
run (mq, _) = do
|
||||
r <- action mq
|
||||
sz <- getQueueSize_ mq
|
||||
pure (Just r, sz)
|
||||
Just mq -> do
|
||||
ts <- readTVarIO $ activeAt q
|
||||
r <-
|
||||
if now - ts >= idleInterval config
|
||||
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
|
||||
else pure Nothing
|
||||
sz <- unStoreIO $ getQueueSize_ mq
|
||||
pure (r, sz)
|
||||
|
||||
deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
|
||||
deleteQueue ms rId q =
|
||||
@@ -382,8 +392,9 @@ instance MsgStoreClass JournalMsgStore where
|
||||
deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int))
|
||||
deleteQueueSize ms rId q =
|
||||
deleteQueue_ ms rId q >>= mapM (traverse getSize)
|
||||
-- traverse operates on the second tuple element
|
||||
where
|
||||
-- traverse operates on the second tuple element
|
||||
|
||||
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
|
||||
|
||||
getQueueMessages_ :: Bool -> JournalMsgQueue -> StoreIO [Message]
|
||||
@@ -428,7 +439,9 @@ instance MsgStoreClass JournalMsgStore where
|
||||
!st' = st {writeState = ws', readState = rs', canWrite = canWrt', size = size + 1}
|
||||
hAppend wh (bytePos ws) msgStr
|
||||
updateQueueState q logState hs st' $
|
||||
when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen))
|
||||
when (size == 0) $
|
||||
writeTVar (tipMsg q) $
|
||||
Just (Just (msg, msgLen))
|
||||
where
|
||||
JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q
|
||||
createQueueDir = do
|
||||
@@ -469,12 +482,15 @@ instance MsgStoreClass JournalMsgStore where
|
||||
pure msg
|
||||
|
||||
tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
|
||||
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
|
||||
void $
|
||||
readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
|
||||
$>>= (pure . fmap snd)
|
||||
$>>= \len -> readTVarIO handles
|
||||
$>>= \hs -> updateReadPos mq logState len hs $> Just ()
|
||||
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState =
|
||||
StoreIO $
|
||||
(`E.finally` when logState (updateActiveAt q)) $
|
||||
void $
|
||||
readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
|
||||
$>>= (pure . fmap snd)
|
||||
$>>= \len ->
|
||||
readTVarIO handles
|
||||
$>>= \hs -> updateReadPos mq logState len hs $> Just ()
|
||||
|
||||
isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
|
||||
isolateQueue rId JournalQueue {queueLock} op =
|
||||
@@ -619,7 +635,8 @@ openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do
|
||||
openJournal JournalState {journalId} =
|
||||
let path = journalFilePath dir journalId
|
||||
in ifM (doesFileExist path) (Right <$> openFile path ReadWriteMode) (pure $ Left path)
|
||||
-- do that for all append operations
|
||||
|
||||
-- do that for all append operations
|
||||
|
||||
fixFileSize :: Handle -> Int64 -> IO ()
|
||||
fixFileSize h pos = do
|
||||
@@ -723,8 +740,9 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
|
||||
|
||||
deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue))
|
||||
deleteQueue_ ms rId q =
|
||||
runExceptT $ isolateQueueId "deleteQueue_" ms rId $
|
||||
deleteQueue' ms rId q >>= mapM remove
|
||||
runExceptT $
|
||||
isolateQueueId "deleteQueue_" ms rId $
|
||||
deleteQueue' ms rId q >>= mapM remove
|
||||
where
|
||||
remove r@(_, mq_) = do
|
||||
mapM_ closeMsgQueueHandles mq_
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore.STM
|
||||
( STMMsgStore (..),
|
||||
@@ -28,14 +28,16 @@ import Simplex.Messaging.Server.QueueStore.STM
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util ((<$$>), ($>>=))
|
||||
import Simplex.Messaging.Util (($>>=), (<$$>))
|
||||
import System.IO (IOMode (..))
|
||||
|
||||
data STMMsgStore = STMMsgStore
|
||||
{ storeConfig :: STMStoreConfig,
|
||||
queues :: TMap RecipientId STMQueue,
|
||||
senders :: TMap SenderId RecipientId,
|
||||
notifiers :: TMap NotifierId RecipientId,
|
||||
queues :: TMap RecipientId (QueueReference STMQueue),
|
||||
queueCount :: TVar Int,
|
||||
notifierCount :: TVar Int,
|
||||
-- senders :: TMap SenderId RecipientId,
|
||||
-- notifiers :: TMap NotifierId RecipientId,
|
||||
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
|
||||
}
|
||||
|
||||
@@ -59,8 +61,11 @@ data STMStoreConfig = STMStoreConfig
|
||||
|
||||
instance STMQueueStore STMMsgStore where
|
||||
queues' = queues
|
||||
senders' = senders
|
||||
notifiers' = notifiers
|
||||
queueCount' = queueCount
|
||||
notifierCount' = notifierCount
|
||||
|
||||
-- senders' = senders
|
||||
-- notifiers' = notifiers
|
||||
storeLog' = storeLog
|
||||
mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing
|
||||
msgQueue_' = msgQueue_
|
||||
@@ -74,10 +79,12 @@ instance MsgStoreClass STMMsgStore where
|
||||
newMsgStore :: STMStoreConfig -> IO STMMsgStore
|
||||
newMsgStore storeConfig = do
|
||||
queues <- TM.emptyIO
|
||||
senders <- TM.emptyIO
|
||||
notifiers <- TM.emptyIO
|
||||
queueCount <- newTVarIO 0
|
||||
notifierCount <- newTVarIO 0
|
||||
-- senders <- TM.emptyIO
|
||||
-- notifiers <- TM.emptyIO
|
||||
storeLog <- newTVarIO Nothing
|
||||
pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog}
|
||||
pure STMMsgStore {storeConfig, queues, queueCount, notifierCount, storeLog}
|
||||
|
||||
setStoreLog :: STMMsgStore -> StoreLog 'WriteMode -> IO ()
|
||||
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
|
||||
@@ -113,20 +120,22 @@ instance MsgStoreClass STMMsgStore where
|
||||
|
||||
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
|
||||
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
|
||||
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
|
||||
Just q -> do
|
||||
r <- action q
|
||||
sz <- getQueueSize_ q
|
||||
pure (Just r, sz)
|
||||
Nothing -> pure (Nothing, 0)
|
||||
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action =
|
||||
readTVar msgQueue_ >>= \case
|
||||
Just q -> do
|
||||
r <- action q
|
||||
sz <- getQueueSize_ q
|
||||
pure (Just r, sz)
|
||||
Nothing -> pure (Nothing, 0)
|
||||
|
||||
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
|
||||
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
|
||||
|
||||
deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
|
||||
deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize)
|
||||
-- traverse operates on the second tuple element
|
||||
where
|
||||
-- traverse operates on the second tuple element
|
||||
|
||||
getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
|
||||
|
||||
getQueueMessages_ :: Bool -> STMMsgQueue -> STM [Message]
|
||||
|
||||
@@ -30,9 +30,12 @@ import Simplex.Messaging.Util ((<$$>))
|
||||
import System.IO (IOMode (..))
|
||||
|
||||
class MsgStoreClass s => STMQueueStore s where
|
||||
queues' :: s -> TMap RecipientId (StoreQueue s)
|
||||
senders' :: s -> TMap SenderId RecipientId
|
||||
notifiers' :: s -> TMap NotifierId RecipientId
|
||||
queues' :: s -> TMap QueueId (QueueReference (StoreQueue s))
|
||||
queueCount' :: s -> TVar Int
|
||||
notifierCount' :: s -> TVar Int
|
||||
|
||||
-- senders' :: s -> TMap SenderId RecipientId
|
||||
-- notifiers' :: s -> TMap NotifierId RecipientId
|
||||
storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode))
|
||||
mkQueue :: s -> QueueRec -> STM (StoreQueue s)
|
||||
msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s))
|
||||
@@ -45,7 +48,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
|
||||
newMsgStore :: MsgStoreConfig s -> IO s
|
||||
setStoreLog :: s -> StoreLog 'WriteMode -> IO ()
|
||||
closeMsgStore :: s -> IO ()
|
||||
activeMsgQueues :: s -> TMap RecipientId (StoreQueue s)
|
||||
activeMsgQueues :: s -> TMap RecipientId (QueueReference (StoreQueue s))
|
||||
withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
|
||||
logQueueStates :: s -> IO ()
|
||||
logQueueState :: StoreQueue s -> StoreMonad s ()
|
||||
@@ -76,9 +79,10 @@ data AMSType = forall s. AMSType (SMSType s)
|
||||
withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
|
||||
withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty . M.assocs
|
||||
where
|
||||
run !acc (k, v) = do
|
||||
run !acc (k, QRRecipient v) = do
|
||||
r <- f k v
|
||||
pure $! acc <> r
|
||||
run acc _ = pure acc
|
||||
|
||||
getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
|
||||
getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
|
||||
|
||||
@@ -11,6 +11,11 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
|
||||
data QueueReference q
|
||||
= QRRecipient q
|
||||
| QRSender q
|
||||
| QRNotifier q
|
||||
|
||||
data QueueRec = QueueRec
|
||||
{ recipientId :: !RecipientId,
|
||||
recipientKey :: !RcvPublicAuthKey,
|
||||
|
||||
@@ -50,25 +50,36 @@ import System.IO
|
||||
import UnliftIO.STM
|
||||
|
||||
addQueue :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s))
|
||||
addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}=
|
||||
addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier} =
|
||||
atomically add
|
||||
$>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr)
|
||||
where
|
||||
add = ifM hasId (pure $ Left DUPLICATE_) $ do
|
||||
q <- mkQueue st qr -- STMQueue lock <$> (newTVar $! Just qr) <*> newTVar Nothing
|
||||
TM.insert rId q $ queues' st
|
||||
TM.insert sId rId $ senders' st
|
||||
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st
|
||||
add = ifM duplicateIds (pure $ Left DUPLICATE_) $ do
|
||||
q <- mkQueue st qr
|
||||
TM.insert rId (QRRecipient q) qs
|
||||
TM.insert sId (QRSender q) qs
|
||||
modifyTVar' (queueCount' st) (+ 1)
|
||||
forM_ notifier $ \NtfCreds {notifierId} -> do
|
||||
TM.insert notifierId (QRNotifier q) qs
|
||||
modifyTVar' (notifierCount' st) (+ 1)
|
||||
pure $ Right q
|
||||
hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
|
||||
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier
|
||||
duplicateIds
|
||||
| rId == sId || sameNtf rId || sameNtf sId = pure False
|
||||
| otherwise = or <$> sequence [TM.member rId qs, TM.member sId qs, hasNotifier]
|
||||
sameNtf qId = maybe False ((qId ==) . notifierId) notifier
|
||||
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId qs) notifier
|
||||
qs = queues' st
|
||||
|
||||
getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
|
||||
getQueue st party qId =
|
||||
maybe (Left AUTH) Right <$> case party of
|
||||
SRecipient -> TM.lookupIO qId $ queues' st
|
||||
SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st)
|
||||
SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st)
|
||||
getQueue st party qId = fmap (maybe (Left AUTH) Right) $ do
|
||||
q_ <- TM.lookupIO qId qs
|
||||
pure $ case (q_, party) of
|
||||
(Just (QRRecipient q), SRecipient) -> Just q
|
||||
(Just (QRSender q), SSender) -> Just q -- -> TM.lookupIO sId qs $>>= \case QRRecipient q -> pure $ Just q; _ -> pure Nothing
|
||||
(Just (QRNotifier q), SNotifier) -> Just q -- -> TM.lookupIO nId qs $>>= \case QRRecipient q -> pure $ Just q; _ -> pure Nothing
|
||||
_ -> Nothing
|
||||
where
|
||||
qs = queues' st
|
||||
|
||||
getQueueRec :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec))
|
||||
getQueueRec st party qId =
|
||||
@@ -93,11 +104,13 @@ addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} =
|
||||
$>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
|
||||
where
|
||||
qr = queueRec' sq
|
||||
add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
|
||||
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
|
||||
qs = queues' st
|
||||
add q@QueueRec {recipientId = rId} = ifM (TM.member nId qs) (pure $ Left DUPLICATE_) $ do
|
||||
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId qs $> notifierId
|
||||
let !q' = q {notifier = Just ntfCreds}
|
||||
writeTVar qr $ Just q'
|
||||
TM.insert nId rId $ notifiers' st
|
||||
TM.insert nId (QRNotifier sq) qs
|
||||
modifyTVar' (notifierCount' st) (+ 1)
|
||||
pure $ Right (rId, nId_)
|
||||
|
||||
deleteQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
|
||||
@@ -107,8 +120,9 @@ deleteQueueNotifier st sq =
|
||||
where
|
||||
qr = queueRec' sq
|
||||
delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do
|
||||
TM.delete notifierId $ notifiers' st
|
||||
writeTVar qr $! Just q {notifier = Nothing}
|
||||
TM.delete notifierId $ queues' st
|
||||
modifyTVar' (notifierCount' st) (subtract 1)
|
||||
writeTVar qr $ Just q {notifier = Nothing}
|
||||
pure notifierId
|
||||
|
||||
suspendQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ())
|
||||
@@ -118,7 +132,7 @@ suspendQueue st sq =
|
||||
where
|
||||
qr = queueRec' sq
|
||||
suspend q = do
|
||||
writeTVar qr $! Just q {status = QueueOff}
|
||||
writeTVar qr $ Just q {status = QueueOff}
|
||||
pure $ recipientId q
|
||||
|
||||
updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
|
||||
@@ -129,7 +143,7 @@ updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
|
||||
| updatedAt == Just t = pure (q, False)
|
||||
| otherwise =
|
||||
let !q' = q {updatedAt = Just t}
|
||||
in (writeTVar qr $! Just q') $> (q', True)
|
||||
in writeTVar qr (Just q') $> (q', True)
|
||||
log' (q, changed)
|
||||
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t)
|
||||
| otherwise = pure $ Right q
|
||||
@@ -137,14 +151,19 @@ updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
|
||||
deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
|
||||
deleteQueue' st rId sq =
|
||||
atomically (readQueueRec qr >>= mapM delete)
|
||||
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId)
|
||||
>>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing))
|
||||
$>>= \q ->
|
||||
withLog "deleteQueue" st (`logDeleteQueue` rId)
|
||||
>>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing))
|
||||
where
|
||||
qr = queueRec' sq
|
||||
qs = queues' st
|
||||
delete q = do
|
||||
writeTVar qr Nothing
|
||||
TM.delete (senderId q) $ senders' st
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st
|
||||
TM.delete (senderId q) qs
|
||||
modifyTVar' (queueCount' st) (subtract 1)
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
|
||||
TM.delete notifierId qs
|
||||
modifyTVar' (notifierCount' st) (subtract 1)
|
||||
pure q
|
||||
|
||||
readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec)
|
||||
|
||||
@@ -127,7 +127,7 @@ instance StrEncoding StoreLogRecord where
|
||||
SuspendQueue rId -> strEncode (SuspendQueue_, rId)
|
||||
DeleteQueue rId -> strEncode (DeleteQueue_, rId)
|
||||
DeleteNotifier rId -> strEncode (DeleteNotifier_, rId)
|
||||
UpdateTime rId t -> strEncode (UpdateTime_, rId, t)
|
||||
UpdateTime rId t -> strEncode (UpdateTime_, rId, t)
|
||||
|
||||
strP =
|
||||
strP_ >>= \case
|
||||
@@ -226,8 +226,9 @@ readWriteStoreLog readStore writeStore f st =
|
||||
writeQueueStore :: STMQueueStore s => StoreLog 'WriteMode -> s -> IO ()
|
||||
writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs
|
||||
where
|
||||
writeQueue (rId, q) =
|
||||
writeQueue (rId, QRRecipient q) =
|
||||
readTVarIO (queueRec' q) >>= \case
|
||||
Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them
|
||||
Nothing -> atomically $ TM.delete rId $ activeMsgQueues st
|
||||
Nothing -> atomically $ TM.delete rId $ queues' st
|
||||
writeQueue _ = pure ()
|
||||
active QueueRec {status} = status == QueueActive
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
@@ -109,4 +110,4 @@ testSMPStoreLog testSuite tests =
|
||||
([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile
|
||||
compacted' `shouldBe` compacted
|
||||
storeState :: JournalMsgStore -> IO (M.Map RecipientId QueueRec)
|
||||
storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (readTVarIO . queueRec'))
|
||||
storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (\case QRQueue q -> readTVarIO (queueRec' q); _ -> pure Nothing))
|
||||
|
||||
Reference in New Issue
Block a user