remove recipientId from QueueRec, statePath from queue object

This commit is contained in:
Evgeny Poberezkin
2024-12-13 18:29:23 +00:00
parent f3fc087800
commit dfa77a1efa
9 changed files with 104 additions and 113 deletions
+13 -14
View File
@@ -795,8 +795,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
CPDelete qId -> withUserRole $ unliftIO u $ do
AMS _ st <- asks msgStore
r <- liftIO $ runExceptT $ do
(q, qr) <- ExceptT (getQueueRec st SSender qId) `catchE` \_ -> ExceptT (getQueueRec st SRecipient qId)
ExceptT $ deleteQueueSize st (recipientId qr) q
q <- ExceptT (getQueue st SSender qId) `catchE` \_ -> ExceptT (getQueue st SRecipient qId)
ExceptT $ deleteQueueSize st q
case r of
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
Right (qr, numDeleted) -> do
@@ -1199,10 +1199,9 @@ client
updatedAt <- Just <$> liftIO getSystemDate
let rcvDhSecret = C.dh' dhKey privDhKey
qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure}
qRec (recipientId, senderId) =
qRec senderId =
QueueRec
{ recipientId,
senderId,
{ senderId,
recipientKey,
rcvDhSecret,
senderKey = Nothing,
@@ -1214,12 +1213,12 @@ client
(corrId,entId,) <$> addQueueRetry 3 qik qRec
where
addQueueRetry ::
Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> ((RecipientId, SenderId) -> QueueRec) -> M BrokerMsg
Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> (SenderId -> QueueRec) -> M BrokerMsg
addQueueRetry 0 _ _ = pure $ ERR INTERNAL
addQueueRetry n qik qRec = do
ids <- getIds
let qr = qRec ids
liftIO (addQueue ms qr) >>= \case
ids@(rId, sId) <- getIds
let qr = qRec sId
liftIO (addQueue ms rId qr) >>= \case
Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec
Left e -> pure $ ERR e
Right q -> do
@@ -1296,7 +1295,7 @@ client
incStat $ qSubDuplicate stats
atomically (tryTakeTMVar $ delivered s) >> deliver False s
where
rId = recipientId qr
rId = recipientId' q
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clientId, True)
@@ -1449,10 +1448,10 @@ client
when (notification msgFlags) $ do
mapM_ (`enqueueNotification` msg) (notifier qr)
incStat $ msgSentNtf stats
liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId' q)
incStat $ msgSent stats
incStat $ msgCount stats
liftIO $ updatePeriodStats (activeQueues stats) (recipientId qr)
liftIO $ updatePeriodStats (activeQueues stats) (recipientId' q)
pure ok
where
THandleParams {thVersion} = thParams'
@@ -1481,7 +1480,7 @@ client
whenM (TM.memberIO rId subscribers) $
atomically deliverToSub >>= mapM_ forkDeliver
where
rId = recipientId qr
rId = recipientId' q
deliverToSub =
-- lookup has ot be in the same transaction,
-- so that if subscription ends, it re-evalutates
@@ -1625,7 +1624,7 @@ client
delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M (Transmission BrokerMsg)
delQueueAndMsgs (q, _) = do
liftIO (deleteQueue ms entId q) >>= \case
liftIO (deleteQueue ms q) >>= \case
Right qr -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ do
@@ -236,29 +236,27 @@ instance STMQueueStore (JournalMsgStore 'MSMemory) where
senders' = senders . queueStore
notifiers' = notifiers . queueStore
storeLog' = storeLog . queueStore
mkQueue st qr@QueueRec {recipientId} = do
lock <- atomically $ getMapLock (queueLocks st) recipientId
makeQueue st lock qr
mkQueue st rId qr = do
lock <- atomically $ getMapLock (queueLocks st) rId
makeQueue st lock rId qr
makeQueue :: JournalMsgStore s -> Lock -> QueueRec -> IO (JournalQueue s)
makeQueue st queueLock qr@QueueRec {recipientId} = do
makeQueue :: JournalMsgStore s -> Lock -> RecipientId -> QueueRec -> IO (JournalQueue s)
makeQueue st queueLock rId qr = do
queueRec <- newTVarIO $ Just qr
msgQueue_ <- newTVarIO Nothing
activeAt <- newTVarIO 0
isEmpty <- newTVarIO Nothing
pure
JournalQueue
{ recipientId,
{ recipientId = rId,
queueLock,
queueRec,
msgQueue_,
activeAt,
isEmpty,
queueDirectory = msgQueueDirectory st recipientId
queueDirectory = msgQueueDirectory st rId
}
-- statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId)
instance MsgStoreClass (JournalMsgStore s) where
type StoreMonad (JournalMsgStore s) = StoreIO s
type StoreQueue (JournalMsgStore s) = JournalQueue s
@@ -371,15 +369,15 @@ instance MsgStoreClass (JournalMsgStore s) where
pure QueueCounts {queueCount, notifierCount}
JQStore {} -> undefined
addQueue :: JournalMsgStore s -> QueueRec -> IO (Either ErrorType (JournalQueue s))
addQueue st@JournalMsgStore {queueLocks = ls} qr@QueueRec {recipientId = rId, senderId = sId, notifier} = case queueStore st of
MQStore {} -> addQueue' st qr
addQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (Either ErrorType (JournalQueue s))
addQueue st@JournalMsgStore {queueLocks = ls} rId qr@QueueRec {senderId = sId, notifier} = case queueStore st of
MQStore {} -> addQueue' st rId qr
JQStore {queues_, senders_, notifiers_} -> do
lock <- atomically $ getMapLock ls rId
tryStore "addQueue" rId $
withLock' lock "addQueue" $ withLockMap ls sId "addQueueS" $ withNotifierLock $
ifM hasAnyId (pure $ Left DUPLICATE_) $ E.uninterruptibleMask_ $ do
q <- makeQueue st lock qr
q <- makeQueue st lock rId qr
storeQueue_ q qr
atomically $ TM.insert rId (Just q) queues_
saveQueueRef st sId rId senders_
@@ -421,7 +419,7 @@ instance MsgStoreClass (JournalMsgStore s) where
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = case queueStore st of
MQStore {} -> addQueueNotifier' st sq ntfCreds
JQStore {notifiers_} ->
isolateQueueRec sq "addQueueNotifier" $ \q@QueueRec {recipientId = rId} ->
isolateQueueRec sq "addQueueNotifier" $ \q ->
withLockMap (queueLocks st) nId "addQueueNotifierN" $
ifM hasNotifierId (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId = nId'} ->
@@ -430,7 +428,7 @@ instance MsgStoreClass (JournalMsgStore s) where
atomically $ TM.delete nId' notifiers_
pure nId'
storeQueue sq q {notifier = Just ntfCreds}
saveQueueRef st nId rId notifiers_
saveQueueRef st nId (recipientId sq) notifiers_
pure $ Right nId_
where
hasNotifierId = anyM [hasId, hasDir]
@@ -518,13 +516,12 @@ instance MsgStoreClass (JournalMsgStore s) where
sz <- unStoreIO $ getQueueSize_ mq
pure (r, sz)
deleteQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
fst <$$> deleteQueue_ ms rId q
deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fst <$$> deleteQueue_ ms q
deleteQueueSize :: JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms rId q =
deleteQueue_ ms rId q >>= mapM (traverse getSize)
deleteQueueSize :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q =
deleteQueue_ ms q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
@@ -893,12 +890,13 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
&& msgPos ws == msgCount ws
&& bytePos ws == byteCount ws
deleteQueue_ :: forall s. JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
deleteQueue_ ms rId q =
deleteQueue_ :: forall s. JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
deleteQueue_ ms q =
isolateQueueId "deleteQueue_" ms rId $ case queueStore ms of
MQStore {} -> deleteQueue' ms rId q >>= mapM remove
MQStore {} -> deleteQueue' ms q >>= mapM remove
JQStore {} -> undefined
where
rId = recipientId q
remove :: (QueueRec, Maybe (JournalMsgQueue s)) -> IO (QueueRec, Maybe (JournalMsgQueue s))
remove r@(_, mq_) = do
mapM_ closeMsgQueueHandles mq_
+5 -6
View File
@@ -64,8 +64,7 @@ instance STMQueueStore STMMsgStore where
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue _ qr@QueueRec {recipientId} =
STMQueue recipientId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
instance MsgStoreClass STMMsgStore where
type StoreMonad STMMsgStore = STM
@@ -156,11 +155,11 @@ instance MsgStoreClass STMMsgStore where
pure (Just r, sz)
Nothing -> pure (Nothing, 0)
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fst <$$> deleteQueue' ms q
deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize)
deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q = deleteQueue' ms q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
@@ -33,7 +33,7 @@ class MsgStoreClass s => STMQueueStore s where
senders' :: s -> TMap SenderId RecipientId
notifiers' :: s -> TMap NotifierId RecipientId
storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode))
mkQueue :: s -> QueueRec -> IO (StoreQueue s)
mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s)
class Monad (StoreMonad s) => MsgStoreClass s where
type StoreMonad s = (m :: Type -> Type) | m -> s
@@ -52,7 +52,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s))
queueCounts :: s -> IO QueueCounts
addQueue :: s -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue :: s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
getQueue :: DirectParty p => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
secureQueue :: s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
addQueueNotifier :: s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
@@ -65,8 +65,8 @@ class Monad (StoreMonad s) => MsgStoreClass s where
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue s -> StoreMonad s [Message]
writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
+1 -2
View File
@@ -12,8 +12,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
data QueueRec = QueueRec
{ recipientId :: !RecipientId,
recipientKey :: !RcvPublicAuthKey,
{ recipientKey :: !RcvPublicAuthKey,
rcvDhSecret :: !RcvDhSecret,
senderId :: !SenderId,
senderKey :: !(Maybe SndPublicAuthKey),
+20 -21
View File
@@ -43,10 +43,10 @@ import Simplex.Messaging.Util (anyM, ifM, ($>>=), (<$$))
import System.IO
import UnliftIO.STM
addQueue' :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier} =
(mkQueue st qr >>= atomically . add)
$>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr)
addQueue' :: STMQueueStore s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue' st rId qr@QueueRec {senderId = sId, notifier} =
(mkQueue st rId qr >>= atomically . add)
$>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr)
where
add q = ifM hasId (pure $ Left DUPLICATE_) $ do
TM.insert rId q $ queues' st
@@ -66,35 +66,36 @@ getQueue' st party qId =
secureQueue' :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue' st sq sKey =
atomically (readQueueRec qr $>>= secure)
$>>= \rId -> withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey
$>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey
where
qr = queueRec' sq
secure q@QueueRec {recipientId = rId} = case senderKey q of
Just k -> pure $ if sKey == k then Right rId else Left AUTH
secure q = case senderKey q of
Just k -> pure $ if sKey == k then Right () else Left AUTH
Nothing -> do
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right rId
pure $ Right ()
addQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier' st sq ntfCreds@NtfCreds {notifierId = nId} =
atomically (readQueueRec qr $>>= add)
$>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
$>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
where
qr = queueRec' sq
add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
rId = recipientId' sq
add q = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $ Just q'
TM.insert nId rId $ notifiers' st
pure $ Right (rId, nId_)
pure $ Right nId_
deleteQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier' st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \(rId, nId_) -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId)
$>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq)
where
qr = queueRec' sq
delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do
delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers' st
writeTVar qr $! Just q {notifier = Nothing}
pure notifierId
@@ -102,12 +103,10 @@ deleteQueueNotifier' st sq =
suspendQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ())
suspendQueue' st sq =
atomically (readQueueRec qr >>= mapM suspend)
$>>= \rId -> withLog "suspendQueue" st (`logSuspendQueue` rId)
$>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq)
where
qr = queueRec' sq
suspend q = do
writeTVar qr $! Just q {status = QueueOff}
pure $ recipientId q
suspend q = writeTVar qr $! Just q {status = QueueOff}
updateQueueTime' :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
@@ -119,13 +118,13 @@ updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log
let !q' = q {updatedAt = Just t}
in (writeTVar qr $! Just q') $> (q', True)
log' (q, changed)
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t)
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t)
| otherwise = pure $ Right q
deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' st rId sq =
deleteQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId)
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq)
>>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing))
where
qr = queueRec' sq
+11 -13
View File
@@ -57,7 +57,7 @@ import System.Directory (doesFileExist, renameFile)
import System.IO
data StoreLogRecord
= CreateQueue QueueRec
= CreateQueue RecipientId QueueRec
| SecureQueue QueueId SndPublicAuthKey
| AddNotifier QueueId NtfCreds
| SuspendQueue QueueId
@@ -76,10 +76,9 @@ data SLRTag
| UpdateTime_
instance StrEncoding QueueRec where
strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} =
strEncode QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} =
B.unwords
[ "rid=" <> strEncode recipientId,
"rk=" <> strEncode recipientKey,
[ "rk=" <> strEncode recipientKey,
"rdh=" <> strEncode rcvDhSecret,
"sid=" <> strEncode senderId,
"sk=" <> strEncode senderKey
@@ -93,7 +92,6 @@ instance StrEncoding QueueRec where
updatedAtStr t = " updated_at=" <> strEncode t
strP = do
recipientId <- "rid=" *> strP_
recipientKey <- "rk=" *> strP_
rcvDhSecret <- "rdh=" *> strP_
senderId <- "sid=" *> strP_
@@ -101,7 +99,7 @@ instance StrEncoding QueueRec where
sndSecure <- (" sndSecure=" *> strP) <|> pure False
notifier <- optional $ " notifier=" *> strP
updatedAt <- optional $ " updated_at=" *> strP
pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt}
pure QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt}
instance StrEncoding SLRTag where
strEncode = \case
@@ -126,7 +124,7 @@ instance StrEncoding SLRTag where
instance StrEncoding StoreLogRecord where
strEncode = \case
CreateQueue q -> strEncode (CreateQueue_, q)
CreateQueue rId q -> strEncode (CreateQueue_, Str $ "rid=" <> strEncode rId, q)
SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey)
AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds)
SuspendQueue rId -> strEncode (SuspendQueue_, rId)
@@ -136,7 +134,7 @@ instance StrEncoding StoreLogRecord where
strP =
strP_ >>= \case
CreateQueue_ -> CreateQueue <$> strP
CreateQueue_ -> CreateQueue <$> ("rid=" *> strP_) <*> strP
SecureQueue_ -> SecureQueue <$> strP_ <*> strP
AddNotifier_ -> AddNotifier <$> strP_ <*> strP
SuspendQueue_ -> SuspendQueue <$> strP
@@ -172,8 +170,8 @@ writeStoreLogRecord (WriteStoreLog _ h) r = E.uninterruptibleMask_ $ do
B.hPut h $ strEncode r `B.snoc` '\n' -- hPutStrLn makes write non-atomic for length > 1024
hFlush h
logCreateQueue :: StoreLog 'WriteMode -> QueueRec -> IO ()
logCreateQueue s = writeStoreLogRecord s . CreateQueue
logCreateQueue :: StoreLog 'WriteMode -> RecipientId -> QueueRec -> IO ()
logCreateQueue s rId q = writeStoreLogRecord s $ CreateQueue rId q
logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SndPublicAuthKey -> IO ()
logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey
@@ -233,7 +231,7 @@ writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.
where
writeQueue (rId, q) =
readTVarIO (queueRec' q) >>= \case
Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them
Just q' -> when (active q') $ logCreateQueue s rId q' -- TODO we should log suspended queues when we use them
Nothing -> atomically $ TM.delete rId $ activeMsgQueues st
active QueueRec {status} = status == QueueActive
@@ -246,11 +244,11 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin
s = LB.toStrict s'
procLogRecord :: StoreLogRecord -> IO ()
procLogRecord = \case
CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue"
CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue"
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
printError :: String -> IO ()
+17 -18
View File
@@ -105,8 +105,7 @@ testNewQueueRec g sndSecure = do
(k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g
let qr =
QueueRec
{ recipientId = rId,
recipientKey,
{ recipientKey,
rcvDhSecret = C.dh' k pk,
senderId,
senderKey = Nothing,
@@ -122,7 +121,7 @@ testGetQueue ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
runRight_ $ do
q <- ExceptT $ addQueue ms qr
q <- ExceptT $ addQueue ms rId qr
let write s = writeMsg ms q True =<< mkMessage s
Just (Message {msgId = mId1}, True) <- write "message 1"
Just (Message {msgId = mId2}, False) <- write "message 2"
@@ -157,14 +156,14 @@ testGetQueue ms = do
(Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7
(Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8
(Nothing, Nothing) <- tryDelPeekMsg ms q mId8
void $ ExceptT $ deleteQueue ms rId q
void $ ExceptT $ deleteQueue ms q
testChangeReadJournal :: STMQueueStore s => s -> IO ()
testChangeReadJournal ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
runRight_ $ do
q <- ExceptT $ addQueue ms qr
q <- ExceptT $ addQueue ms rId qr
let write s = writeMsg ms q True =<< mkMessage s
Just (Message {msgId = mId1}, True) <- write "message 1"
(Msg "message 1", Nothing) <- tryDelPeekMsg ms q mId1
@@ -176,7 +175,7 @@ testChangeReadJournal ms = do
(Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4
Just (Message {msgId = mId5}, True) <- write "message 5"
(Msg "message 5", Nothing) <- tryDelPeekMsg ms q mId5
void $ ExceptT $ deleteQueue ms rId q
void $ ExceptT $ deleteQueue ms q
testExportImportStore :: JournalMsgStore s -> IO ()
testExportImportStore ms = do
@@ -186,12 +185,12 @@ testExportImportStore ms = do
sl <- readWriteQueueStore testStoreLogFile ms
runRight_ $ do
let write q s = writeMsg ms q True =<< mkMessage s
q1 <- ExceptT $ addQueue ms qr1
liftIO $ logCreateQueue sl qr1
q1 <- ExceptT $ addQueue ms rId1 qr1
liftIO $ logCreateQueue sl rId1 qr1
Just (Message {}, True) <- write q1 "message 1"
Just (Message {}, False) <- write q1 "message 2"
q2 <- ExceptT $ addQueue ms qr2
liftIO $ logCreateQueue sl qr2
q2 <- ExceptT $ addQueue ms rId2 qr2
liftIO $ logCreateQueue sl rId2 qr2
Just (Message {msgId = mId3}, True) <- write q2 "message 3"
Just (Message {msgId = mId4}, False) <- write q2 "message 4"
(Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q2 mId3
@@ -300,7 +299,7 @@ testMessageState ms = do
write q s = writeMsg ms q True =<< mkMessage s
mId1 <- runRight $ do
q <- ExceptT $ addQueue ms qr
q <- ExceptT $ addQueue ms rId qr
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {}, False) <- write q "message 2"
liftIO $ closeMsgQueue q
@@ -322,7 +321,7 @@ testReadFileMissing ms = do
(rId, qr) <- testNewQueueRec g True
let write q s = writeMsg ms q True =<< mkMessage s
q <- runRight $ do
q <- ExceptT $ addQueue ms qr
q <- ExceptT $ addQueue ms rId qr
Just (Message {}, True) <- write q "message 1"
Msg "message 1" <- tryPeekMsg ms q
pure q
@@ -344,7 +343,7 @@ testReadFileMissingSwitch :: JournalMsgStore s -> IO ()
testReadFileMissingSwitch ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
q <- writeMessages ms qr
q <- writeMessages ms rId qr
mq <- fromJust <$> readTVarIO (msgQueue_' q)
MsgQueueState {readState = rs} <- readTVarIO $ state mq
@@ -362,7 +361,7 @@ testWriteFileMissing :: JournalMsgStore s -> IO ()
testWriteFileMissing ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
q <- writeMessages ms qr
q <- writeMessages ms rId qr
mq <- fromJust <$> readTVarIO (msgQueue_' q)
MsgQueueState {writeState = ws} <- readTVarIO $ state mq
@@ -385,7 +384,7 @@ testReadAndWriteFilesMissing :: JournalMsgStore s -> IO ()
testReadAndWriteFilesMissing ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
q <- writeMessages ms qr
q <- writeMessages ms rId qr
mq <- fromJust <$> readTVarIO (msgQueue_' q)
MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq
@@ -400,9 +399,9 @@ testReadAndWriteFilesMissing ms = do
Msg "message 6" <- tryPeekMsg ms q'
pure ()
writeMessages :: JournalMsgStore s -> QueueRec -> IO (JournalQueue s)
writeMessages ms qr = runRight $ do
q <- ExceptT $ addQueue ms qr
writeMessages :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s)
writeMessages ms rId qr = runRight $ do
q <- ExceptT $ addQueue ms rId qr
let write s = writeMsg ms q True =<< mkMessage s
Just (Message {msgId = mId1}, True) <- write "message 1"
Just (Message {msgId = mId2}, False) <- write "message 2"
+11 -11
View File
@@ -57,38 +57,38 @@ storeLogTests =
testSMPStoreLog ("SMP server store log, sndSecure = " <> show sndSecure)
[ SLTC
{ name = "create new queue",
saved = [CreateQueue qr],
compacted = [CreateQueue qr],
saved = [CreateQueue rId qr],
compacted = [CreateQueue rId qr],
state = M.fromList [(rId, qr)]
},
SLTC
{ name = "secure queue",
saved = [CreateQueue qr, SecureQueue rId testPublicAuthKey],
compacted = [CreateQueue qr {senderKey = Just testPublicAuthKey}],
saved = [CreateQueue rId qr, SecureQueue rId testPublicAuthKey],
compacted = [CreateQueue rId qr {senderKey = Just testPublicAuthKey}],
state = M.fromList [(rId, qr {senderKey = Just testPublicAuthKey})]
},
SLTC
{ name = "create and delete queue",
saved = [CreateQueue qr, DeleteQueue rId],
saved = [CreateQueue rId qr, DeleteQueue rId],
compacted = [],
state = M.fromList []
},
SLTC
{ name = "create queue and add notifier",
saved = [CreateQueue qr, AddNotifier rId ntfCreds],
compacted = [CreateQueue $ qr {notifier = Just ntfCreds}],
saved = [CreateQueue rId qr, AddNotifier rId ntfCreds],
compacted = [CreateQueue rId qr {notifier = Just ntfCreds}],
state = M.fromList [(rId, qr {notifier = Just ntfCreds})]
},
SLTC
{ name = "delete notifier",
saved = [CreateQueue qr, AddNotifier rId ntfCreds, DeleteNotifier rId],
compacted = [CreateQueue qr],
saved = [CreateQueue rId qr, AddNotifier rId ntfCreds, DeleteNotifier rId],
compacted = [CreateQueue rId qr],
state = M.fromList [(rId, qr)]
},
SLTC
{ name = "update time",
saved = [CreateQueue qr, UpdateTime rId date],
compacted = [CreateQueue qr {updatedAt = Just date}],
saved = [CreateQueue rId qr, UpdateTime rId date],
compacted = [CreateQueue rId qr {updatedAt = Just date}],
state = M.fromList [(rId, qr {updatedAt = Just date})]
}
]