diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 988639f5c..9a7503576 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -423,9 +423,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats AMS _ st <- asks msgStore - let queues = activeMsgQueues st - notifiers = notifiers' st - interval = 1000000 * logInterval + let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -478,8 +476,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT pMsgFwdsOwn' <- getResetProxyStatsData pMsgFwdsOwn pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0 qCount' <- readIORef qCount - qCount'' <- M.size <$> readTVarIO queues - notifierCount' <- M.size <$> readTVarIO notifiers + qCount'' <- readTVarIO $ queueCount' st + ntfrCount' <- readTVarIO $ notifierCount' st msgCount' <- readIORef msgCount ntfCount' <- readIORef ntfCount hPutStrLn h $ @@ -538,7 +536,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT show ntfSub', show ntfSubAuth', show ntfSubDuplicate', - show notifierCount', + show ntfrCount', show qDeletedAllB', show qSubAllB', show qSubEnd', @@ -625,9 +623,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats AMS _ st <- unliftIO u $ asks msgStore - let queues = activeMsgQueues st - notifiers = notifiers' st - getStat :: (ServerStats -> IORef a) -> IO a + let getStat :: (ServerStats -> IORef a) -> IO a getStat var = readIORef (var ss) putStat :: Show a => String -> (ServerStats -> IORef a) -> IO () putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v @@ -664,9 +660,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT putStat "msgNtfsB" msgNtfsB putStat "msgNtfExpired" msgNtfExpired putStat "qCount" qCount - qCount2 <- M.size <$> readTVarIO queues + qCount2 <- readTVarIO $ queueCount' st hPutStrLn h $ "qCount 2: " <> show qCount2 - notifierCount <- M.size <$> readTVarIO notifiers + notifierCount <- readTVarIO $ notifierCount' st hPutStrLn h $ "notifiers: " <> show notifierCount putStat "msgCount" msgCount putStat "ntfCount" ntfCount diff --git a/src/Simplex/Messaging/Server/MsgStore.hs b/src/Simplex/Messaging/Server/MsgStore.hs index 7bc1417c0..93cc1b70d 100644 --- a/src/Simplex/Messaging/Server/MsgStore.hs +++ b/src/Simplex/Messaging/Server/MsgStore.hs @@ -1,4 +1,3 @@ -{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} module Simplex.Messaging.Server.MsgStore where diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 4e5496f66..633abef0f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -11,11 +11,11 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeFamilies #-} module Simplex.Messaging.Server.MsgStore.Journal - ( JournalMsgStore (queues, senders, notifiers, random), + ( JournalMsgStore (queues, random), JournalQueue, JournalMsgQueue (queue, state), JMQueue (queueDirectory, statePath), @@ -62,9 +62,9 @@ import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM +import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>)) import System.Directory import System.Exit @@ -77,9 +77,11 @@ data JournalMsgStore = JournalMsgStore { config :: JournalStoreConfig, random :: TVar StdGen, queueLocks :: TMap RecipientId Lock, - queues :: TMap RecipientId JournalQueue, - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, + queues :: TMap RecipientId (QueueReference JournalQueue), + queueCount :: TVar Int, + notifierCount :: TVar Int, + -- senders :: TMap SenderId RecipientId, + -- notifiers :: TMap NotifierId RecipientId, storeLog :: TVar (Maybe (StoreLog 'WriteMode)) } @@ -220,8 +222,11 @@ newtype StoreIO a = StoreIO {unStoreIO :: IO a} instance STMQueueStore JournalMsgStore where queues' = queues - senders' = senders - notifiers' = notifiers + queueCount' = queueCount + notifierCount' = notifierCount + + -- senders' = senders + -- notifiers' = notifiers storeLog' = storeLog mkQueue st qr = do lock <- getMapLock (queueLocks st) $ recipientId qr @@ -243,17 +248,19 @@ instance MsgStoreClass JournalMsgStore where random <- newTVarIO =<< newStdGen queueLocks <- TM.emptyIO queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO + queueCount <- newTVarIO 0 + notifierCount <- newTVarIO 0 + -- senders <- TM.emptyIO + -- notifiers <- TM.emptyIO storeLog <- newTVarIO Nothing - pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog} + pure JournalMsgStore {config, random, queueLocks, queues, queueCount, notifierCount, storeLog} setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO () setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) closeMsgStore st = do readTVarIO (storeLog st) >>= mapM_ closeStoreLog - readTVarIO (queues st) >>= mapM_ closeMsgQueue + readTVarIO (queues st) >>= mapM_ (\case QRRecipient q -> closeMsgQueue q; _ -> pure ()) activeMsgQueues = queues {-# INLINE activeMsgQueues #-} @@ -309,8 +316,9 @@ instance MsgStoreClass JournalMsgStore where logQueueState q = StoreIO . void $ readTVarIO (msgQueue_ q) - $>>= \mq -> readTVarIO (handles mq) - $>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ()) + $>>= \mq -> + readTVarIO (handles mq) + $>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ()) queueRec' = queueRec {-# INLINE queueRec' #-} @@ -356,24 +364,26 @@ instance MsgStoreClass JournalMsgStore where -- only runs action if queue is not empty withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int) withIdleMsgQueue now ms@JournalMsgStore {config} rId q action = - StoreIO $ readTVarIO (msgQueue_ q) >>= \case - Nothing -> - E.bracket - (unStoreIO $ getPeekMsgQueue ms rId q) - (mapM_ $ \_ -> closeMsgQueue q) - (maybe (pure (Nothing, 0)) (unStoreIO . run)) - where - run (mq, _) = do - r <- action mq - sz <- getQueueSize_ mq - pure (Just r, sz) - Just mq -> do - ts <- readTVarIO $ activeAt q - r <- if now - ts >= idleInterval config - then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q - else pure Nothing - sz <- unStoreIO $ getQueueSize_ mq - pure (r, sz) + StoreIO $ + readTVarIO (msgQueue_ q) >>= \case + Nothing -> + E.bracket + (unStoreIO $ getPeekMsgQueue ms rId q) + (mapM_ $ \_ -> closeMsgQueue q) + (maybe (pure (Nothing, 0)) (unStoreIO . run)) + where + run (mq, _) = do + r <- action mq + sz <- getQueueSize_ mq + pure (Just r, sz) + Just mq -> do + ts <- readTVarIO $ activeAt q + r <- + if now - ts >= idleInterval config + then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q + else pure Nothing + sz <- unStoreIO $ getQueueSize_ mq + pure (r, sz) deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = @@ -382,8 +392,9 @@ instance MsgStoreClass JournalMsgStore where deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int)) deleteQueueSize ms rId q = deleteQueue_ ms rId q >>= mapM (traverse getSize) - -- traverse operates on the second tuple element where + -- traverse operates on the second tuple element + getSize = maybe (pure (-1)) (fmap size . readTVarIO . state) getQueueMessages_ :: Bool -> JournalMsgQueue -> StoreIO [Message] @@ -428,7 +439,9 @@ instance MsgStoreClass JournalMsgStore where !st' = st {writeState = ws', readState = rs', canWrite = canWrt', size = size + 1} hAppend wh (bytePos ws) msgStr updateQueueState q logState hs st' $ - when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen)) + when (size == 0) $ + writeTVar (tipMsg q) $ + Just (Just (msg, msgLen)) where JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q createQueueDir = do @@ -469,12 +482,15 @@ instance MsgStoreClass JournalMsgStore where pure msg tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO () - tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $ - void $ - readTVarIO tipMsg -- if there is no cached tipMsg, do nothing - $>>= (pure . fmap snd) - $>>= \len -> readTVarIO handles - $>>= \hs -> updateReadPos mq logState len hs $> Just () + tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = + StoreIO $ + (`E.finally` when logState (updateActiveAt q)) $ + void $ + readTVarIO tipMsg -- if there is no cached tipMsg, do nothing + $>>= (pure . fmap snd) + $>>= \len -> + readTVarIO handles + $>>= \hs -> updateReadPos mq logState len hs $> Just () isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a isolateQueue rId JournalQueue {queueLock} op = @@ -619,7 +635,8 @@ openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do openJournal JournalState {journalId} = let path = journalFilePath dir journalId in ifM (doesFileExist path) (Right <$> openFile path ReadWriteMode) (pure $ Left path) - -- do that for all append operations + +-- do that for all append operations fixFileSize :: Handle -> Int64 -> IO () fixFileSize h pos = do @@ -723,8 +740,9 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue)) deleteQueue_ ms rId q = - runExceptT $ isolateQueueId "deleteQueue_" ms rId $ - deleteQueue' ms rId q >>= mapM remove + runExceptT $ + isolateQueueId "deleteQueue_" ms rId $ + deleteQueue' ms rId q >>= mapM remove where remove r@(_, mq_) = do mapM_ closeMsgQueueHandles mq_ diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index cbeb75f9c..fc240c53c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -7,8 +7,8 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeFamilies #-} module Simplex.Messaging.Server.MsgStore.STM ( STMMsgStore (..), @@ -28,14 +28,16 @@ import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util ((<$$>), ($>>=)) +import Simplex.Messaging.Util (($>>=), (<$$>)) import System.IO (IOMode (..)) data STMMsgStore = STMMsgStore { storeConfig :: STMStoreConfig, - queues :: TMap RecipientId STMQueue, - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, + queues :: TMap RecipientId (QueueReference STMQueue), + queueCount :: TVar Int, + notifierCount :: TVar Int, + -- senders :: TMap SenderId RecipientId, + -- notifiers :: TMap NotifierId RecipientId, storeLog :: TVar (Maybe (StoreLog 'WriteMode)) } @@ -59,8 +61,11 @@ data STMStoreConfig = STMStoreConfig instance STMQueueStore STMMsgStore where queues' = queues - senders' = senders - notifiers' = notifiers + queueCount' = queueCount + notifierCount' = notifierCount + + -- senders' = senders + -- notifiers' = notifiers storeLog' = storeLog mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing msgQueue_' = msgQueue_ @@ -74,10 +79,12 @@ instance MsgStoreClass STMMsgStore where newMsgStore :: STMStoreConfig -> IO STMMsgStore newMsgStore storeConfig = do queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO + queueCount <- newTVarIO 0 + notifierCount <- newTVarIO 0 + -- senders <- TM.emptyIO + -- notifiers <- TM.emptyIO storeLog <- newTVarIO Nothing - pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog} + pure STMMsgStore {storeConfig, queues, queueCount, notifierCount, storeLog} setStoreLog :: STMMsgStore -> StoreLog 'WriteMode -> IO () setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) @@ -113,20 +120,22 @@ instance MsgStoreClass STMMsgStore where -- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue) withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int) - withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case - Just q -> do - r <- action q - sz <- getQueueSize_ q - pure (Just r, sz) - Nothing -> pure (Nothing, 0) + withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = + readTVar msgQueue_ >>= \case + Just q -> do + r <- action q + sz <- getQueueSize_ q + pure (Just r, sz) + Nothing -> pure (Nothing, 0) deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize) - -- traverse operates on the second tuple element where + -- traverse operates on the second tuple element + getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) getQueueMessages_ :: Bool -> STMMsgQueue -> STM [Message] diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 8754767cd..37cb943c6 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -30,9 +30,12 @@ import Simplex.Messaging.Util ((<$$>)) import System.IO (IOMode (..)) class MsgStoreClass s => STMQueueStore s where - queues' :: s -> TMap RecipientId (StoreQueue s) - senders' :: s -> TMap SenderId RecipientId - notifiers' :: s -> TMap NotifierId RecipientId + queues' :: s -> TMap QueueId (QueueReference (StoreQueue s)) + queueCount' :: s -> TVar Int + notifierCount' :: s -> TVar Int + + -- senders' :: s -> TMap SenderId RecipientId + -- notifiers' :: s -> TMap NotifierId RecipientId storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) mkQueue :: s -> QueueRec -> STM (StoreQueue s) msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s)) @@ -45,7 +48,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where newMsgStore :: MsgStoreConfig s -> IO s setStoreLog :: s -> StoreLog 'WriteMode -> IO () closeMsgStore :: s -> IO () - activeMsgQueues :: s -> TMap RecipientId (StoreQueue s) + activeMsgQueues :: s -> TMap RecipientId (QueueReference (StoreQueue s)) withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () @@ -76,9 +79,10 @@ data AMSType = forall s. AMSType (SMSType s) withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> StoreQueue s -> IO a) -> IO a withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty . M.assocs where - run !acc (k, v) = do + run !acc (k, QRRecipient v) = do r <- f k v pure $! acc <> r + run acc _ = pure acc getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message] getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst) diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 3f7da8d29..7c0df1385 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -11,6 +11,11 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol +data QueueReference q + = QRRecipient q + | QRSender q + | QRNotifier q + data QueueRec = QueueRec { recipientId :: !RecipientId, recipientKey :: !RcvPublicAuthKey, diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 7bf4f3a4a..8c2f8a06b 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -50,25 +50,36 @@ import System.IO import UnliftIO.STM addQueue :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= +addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier} = atomically add $>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr) where - add = ifM hasId (pure $ Left DUPLICATE_) $ do - q <- mkQueue st qr -- STMQueue lock <$> (newTVar $! Just qr) <*> newTVar Nothing - TM.insert rId q $ queues' st - TM.insert sId rId $ senders' st - forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st + add = ifM duplicateIds (pure $ Left DUPLICATE_) $ do + q <- mkQueue st qr + TM.insert rId (QRRecipient q) qs + TM.insert sId (QRSender q) qs + modifyTVar' (queueCount' st) (+ 1) + forM_ notifier $ \NtfCreds {notifierId} -> do + TM.insert notifierId (QRNotifier q) qs + modifyTVar' (notifierCount' st) (+ 1) pure $ Right q - hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] - hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier + duplicateIds + | rId == sId || sameNtf rId || sameNtf sId = pure False + | otherwise = or <$> sequence [TM.member rId qs, TM.member sId qs, hasNotifier] + sameNtf qId = maybe False ((qId ==) . notifierId) notifier + hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId qs) notifier + qs = queues' st getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) -getQueue st party qId = - maybe (Left AUTH) Right <$> case party of - SRecipient -> TM.lookupIO qId $ queues' st - SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st) - SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st) +getQueue st party qId = fmap (maybe (Left AUTH) Right) $ do + q_ <- TM.lookupIO qId qs + pure $ case (q_, party) of + (Just (QRRecipient q), SRecipient) -> Just q + (Just (QRSender q), SSender) -> Just q -- -> TM.lookupIO sId qs $>>= \case QRRecipient q -> pure $ Just q; _ -> pure Nothing + (Just (QRNotifier q), SNotifier) -> Just q -- -> TM.lookupIO nId qs $>>= \case QRRecipient q -> pure $ Just q; _ -> pure Nothing + _ -> Nothing + where + qs = queues' st getQueueRec :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) getQueueRec st party qId = @@ -93,11 +104,13 @@ addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = $>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) where qr = queueRec' sq - add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do - nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId + qs = queues' st + add q@QueueRec {recipientId = rId} = ifM (TM.member nId qs) (pure $ Left DUPLICATE_) $ do + nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId qs $> notifierId let !q' = q {notifier = Just ntfCreds} writeTVar qr $ Just q' - TM.insert nId rId $ notifiers' st + TM.insert nId (QRNotifier sq) qs + modifyTVar' (notifierCount' st) (+ 1) pure $ Right (rId, nId_) deleteQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) @@ -107,8 +120,9 @@ deleteQueueNotifier st sq = where qr = queueRec' sq delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do - TM.delete notifierId $ notifiers' st - writeTVar qr $! Just q {notifier = Nothing} + TM.delete notifierId $ queues' st + modifyTVar' (notifierCount' st) (subtract 1) + writeTVar qr $ Just q {notifier = Nothing} pure notifierId suspendQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) @@ -118,7 +132,7 @@ suspendQueue st sq = where qr = queueRec' sq suspend q = do - writeTVar qr $! Just q {status = QueueOff} + writeTVar qr $ Just q {status = QueueOff} pure $ recipientId q updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) @@ -129,7 +143,7 @@ updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' | updatedAt == Just t = pure (q, False) | otherwise = let !q' = q {updatedAt = Just t} - in (writeTVar qr $! Just q') $> (q', True) + in writeTVar qr (Just q') $> (q', True) log' (q, changed) | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t) | otherwise = pure $ Right q @@ -137,14 +151,19 @@ updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) deleteQueue' st rId sq = atomically (readQueueRec qr >>= mapM delete) - $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId) - >>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing)) + $>>= \q -> + withLog "deleteQueue" st (`logDeleteQueue` rId) + >>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing)) where qr = queueRec' sq + qs = queues' st delete q = do writeTVar qr Nothing - TM.delete (senderId q) $ senders' st - forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st + TM.delete (senderId q) qs + modifyTVar' (queueCount' st) (subtract 1) + forM_ (notifier q) $ \NtfCreds {notifierId} -> do + TM.delete notifierId qs + modifyTVar' (notifierCount' st) (subtract 1) pure q readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec) diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 2da3398f2..c568abc9d 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -127,7 +127,7 @@ instance StrEncoding StoreLogRecord where SuspendQueue rId -> strEncode (SuspendQueue_, rId) DeleteQueue rId -> strEncode (DeleteQueue_, rId) DeleteNotifier rId -> strEncode (DeleteNotifier_, rId) - UpdateTime rId t -> strEncode (UpdateTime_, rId, t) + UpdateTime rId t -> strEncode (UpdateTime_, rId, t) strP = strP_ >>= \case @@ -226,8 +226,9 @@ readWriteStoreLog readStore writeStore f st = writeQueueStore :: STMQueueStore s => StoreLog 'WriteMode -> s -> IO () writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs where - writeQueue (rId, q) = + writeQueue (rId, QRRecipient q) = readTVarIO (queueRec' q) >>= \case Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them - Nothing -> atomically $ TM.delete rId $ activeMsgQueues st + Nothing -> atomically $ TM.delete rId $ queues' st + writeQueue _ = pure () active QueueRec {status} = status == QueueActive diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index e24f9f1ea..de7477bc8 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -1,5 +1,6 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE StandaloneDeriving #-} @@ -109,4 +110,4 @@ testSMPStoreLog testSuite tests = ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile compacted' `shouldBe` compacted storeState :: JournalMsgStore -> IO (M.Map RecipientId QueueRec) - storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (readTVarIO . queueRec')) + storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (\case QRQueue q -> readTVarIO (queueRec' q); _ -> pure Nothing))