diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 0fda31b00..ac9aa52f1 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -795,8 +795,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT CPDelete qId -> withUserRole $ unliftIO u $ do AMS _ st <- asks msgStore r <- liftIO $ runExceptT $ do - (q, qr) <- ExceptT (getQueueRec st SSender qId) `catchE` \_ -> ExceptT (getQueueRec st SRecipient qId) - ExceptT $ deleteQueueSize st (recipientId qr) q + q <- ExceptT (getQueue st SSender qId) `catchE` \_ -> ExceptT (getQueue st SRecipient qId) + ExceptT $ deleteQueueSize st q case r of Left e -> liftIO $ hPutStrLn h $ "error: " <> show e Right (qr, numDeleted) -> do @@ -1199,10 +1199,9 @@ client updatedAt <- Just <$> liftIO getSystemDate let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure} - qRec (recipientId, senderId) = + qRec senderId = QueueRec - { recipientId, - senderId, + { senderId, recipientKey, rcvDhSecret, senderKey = Nothing, @@ -1214,12 +1213,12 @@ client (corrId,entId,) <$> addQueueRetry 3 qik qRec where addQueueRetry :: - Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> ((RecipientId, SenderId) -> QueueRec) -> M BrokerMsg + Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> (SenderId -> QueueRec) -> M BrokerMsg addQueueRetry 0 _ _ = pure $ ERR INTERNAL addQueueRetry n qik qRec = do - ids <- getIds - let qr = qRec ids - liftIO (addQueue ms qr) >>= \case + ids@(rId, sId) <- getIds + let qr = qRec sId + liftIO (addQueue ms rId qr) >>= \case Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec Left e -> pure $ ERR e Right q -> do @@ -1296,7 +1295,7 @@ client incStat $ qSubDuplicate stats atomically (tryTakeTMVar $ delivered s) >> deliver False s where - rId = recipientId qr + rId = recipientId' q newSub :: M Sub newSub = time "SUB newSub" . atomically $ do writeTQueue subscribedQ (rId, clientId, True) @@ -1449,10 +1448,10 @@ client when (notification msgFlags) $ do mapM_ (`enqueueNotification` msg) (notifier qr) incStat $ msgSentNtf stats - liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) + liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId' q) incStat $ msgSent stats incStat $ msgCount stats - liftIO $ updatePeriodStats (activeQueues stats) (recipientId qr) + liftIO $ updatePeriodStats (activeQueues stats) (recipientId' q) pure ok where THandleParams {thVersion} = thParams' @@ -1481,7 +1480,7 @@ client whenM (TM.memberIO rId subscribers) $ atomically deliverToSub >>= mapM_ forkDeliver where - rId = recipientId qr + rId = recipientId' q deliverToSub = -- lookup has ot be in the same transaction, -- so that if subscription ends, it re-evalutates @@ -1625,7 +1624,7 @@ client delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M (Transmission BrokerMsg) delQueueAndMsgs (q, _) = do - liftIO (deleteQueue ms entId q) >>= \case + liftIO (deleteQueue ms q) >>= \case Right qr -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ do diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 4db18eb4a..1c013ce71 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -236,29 +236,27 @@ instance STMQueueStore (JournalMsgStore 'MSMemory) where senders' = senders . queueStore notifiers' = notifiers . queueStore storeLog' = storeLog . queueStore - mkQueue st qr@QueueRec {recipientId} = do - lock <- atomically $ getMapLock (queueLocks st) recipientId - makeQueue st lock qr + mkQueue st rId qr = do + lock <- atomically $ getMapLock (queueLocks st) rId + makeQueue st lock rId qr -makeQueue :: JournalMsgStore s -> Lock -> QueueRec -> IO (JournalQueue s) -makeQueue st queueLock qr@QueueRec {recipientId} = do +makeQueue :: JournalMsgStore s -> Lock -> RecipientId -> QueueRec -> IO (JournalQueue s) +makeQueue st queueLock rId qr = do queueRec <- newTVarIO $ Just qr msgQueue_ <- newTVarIO Nothing activeAt <- newTVarIO 0 isEmpty <- newTVarIO Nothing pure JournalQueue - { recipientId, + { recipientId = rId, queueLock, queueRec, msgQueue_, activeAt, isEmpty, - queueDirectory = msgQueueDirectory st recipientId + queueDirectory = msgQueueDirectory st rId } --- statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId) - instance MsgStoreClass (JournalMsgStore s) where type StoreMonad (JournalMsgStore s) = StoreIO s type StoreQueue (JournalMsgStore s) = JournalQueue s @@ -371,15 +369,15 @@ instance MsgStoreClass (JournalMsgStore s) where pure QueueCounts {queueCount, notifierCount} JQStore {} -> undefined - addQueue :: JournalMsgStore s -> QueueRec -> IO (Either ErrorType (JournalQueue s)) - addQueue st@JournalMsgStore {queueLocks = ls} qr@QueueRec {recipientId = rId, senderId = sId, notifier} = case queueStore st of - MQStore {} -> addQueue' st qr + addQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (Either ErrorType (JournalQueue s)) + addQueue st@JournalMsgStore {queueLocks = ls} rId qr@QueueRec {senderId = sId, notifier} = case queueStore st of + MQStore {} -> addQueue' st rId qr JQStore {queues_, senders_, notifiers_} -> do lock <- atomically $ getMapLock ls rId tryStore "addQueue" rId $ withLock' lock "addQueue" $ withLockMap ls sId "addQueueS" $ withNotifierLock $ ifM hasAnyId (pure $ Left DUPLICATE_) $ E.uninterruptibleMask_ $ do - q <- makeQueue st lock qr + q <- makeQueue st lock rId qr storeQueue_ q qr atomically $ TM.insert rId (Just q) queues_ saveQueueRef st sId rId senders_ @@ -421,7 +419,7 @@ instance MsgStoreClass (JournalMsgStore s) where addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = case queueStore st of MQStore {} -> addQueueNotifier' st sq ntfCreds JQStore {notifiers_} -> - isolateQueueRec sq "addQueueNotifier" $ \q@QueueRec {recipientId = rId} -> + isolateQueueRec sq "addQueueNotifier" $ \q -> withLockMap (queueLocks st) nId "addQueueNotifierN" $ ifM hasNotifierId (pure $ Left DUPLICATE_) $ do nId_ <- forM (notifier q) $ \NtfCreds {notifierId = nId'} -> @@ -430,7 +428,7 @@ instance MsgStoreClass (JournalMsgStore s) where atomically $ TM.delete nId' notifiers_ pure nId' storeQueue sq q {notifier = Just ntfCreds} - saveQueueRef st nId rId notifiers_ + saveQueueRef st nId (recipientId sq) notifiers_ pure $ Right nId_ where hasNotifierId = anyM [hasId, hasDir] @@ -518,13 +516,12 @@ instance MsgStoreClass (JournalMsgStore s) where sz <- unStoreIO $ getQueueSize_ mq pure (r, sz) - deleteQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType QueueRec) - deleteQueue ms rId q = - fst <$$> deleteQueue_ ms rId q + deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec) + deleteQueue ms q = fst <$$> deleteQueue_ ms q - deleteQueueSize :: JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int)) - deleteQueueSize ms rId q = - deleteQueue_ ms rId q >>= mapM (traverse getSize) + deleteQueueSize :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms q = + deleteQueue_ ms q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure (-1)) (fmap size . readTVarIO . state) @@ -893,12 +890,13 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws -deleteQueue_ :: forall s. JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) -deleteQueue_ ms rId q = +deleteQueue_ :: forall s. JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) +deleteQueue_ ms q = isolateQueueId "deleteQueue_" ms rId $ case queueStore ms of - MQStore {} -> deleteQueue' ms rId q >>= mapM remove + MQStore {} -> deleteQueue' ms q >>= mapM remove JQStore {} -> undefined where + rId = recipientId q remove :: (QueueRec, Maybe (JournalMsgQueue s)) -> IO (QueueRec, Maybe (JournalMsgQueue s)) remove r@(_, mq_) = do mapM_ closeMsgQueueHandles mq_ diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 9378edf9f..a3a895415 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -64,8 +64,7 @@ instance STMQueueStore STMMsgStore where senders' = senders notifiers' = notifiers storeLog' = storeLog - mkQueue _ qr@QueueRec {recipientId} = - STMQueue recipientId <$> newTVarIO (Just qr) <*> newTVarIO Nothing + mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing instance MsgStoreClass STMMsgStore where type StoreMonad STMMsgStore = STM @@ -156,11 +155,11 @@ instance MsgStoreClass STMMsgStore where pure (Just r, sz) Nothing -> pure (Nothing, 0) - deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec) - deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q + deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec) + deleteQueue ms q = fst <$$> deleteQueue' ms q - deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) - deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize) + deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms q = deleteQueue' ms q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 22015da4b..d349e6eba 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -33,7 +33,7 @@ class MsgStoreClass s => STMQueueStore s where senders' :: s -> TMap SenderId RecipientId notifiers' :: s -> TMap NotifierId RecipientId storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) - mkQueue :: s -> QueueRec -> IO (StoreQueue s) + mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s) class Monad (StoreMonad s) => MsgStoreClass s where type StoreMonad s = (m :: Type -> Type) | m -> s @@ -52,7 +52,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s)) queueCounts :: s -> IO QueueCounts - addQueue :: s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) + addQueue :: s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) getQueue :: DirectParty p => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) secureQueue :: s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) addQueueNotifier :: s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) @@ -65,8 +65,8 @@ class Monad (StoreMonad s) => MsgStoreClass s where -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) - deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec) - deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) + deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec) + deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) getQueueMessages_ :: Bool -> StoreQueue s -> MsgQueue s -> StoreMonad s [Message] writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 3f7da8d29..3f40e0f3e 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -12,8 +12,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol data QueueRec = QueueRec - { recipientId :: !RecipientId, - recipientKey :: !RcvPublicAuthKey, + { recipientKey :: !RcvPublicAuthKey, rcvDhSecret :: !RcvDhSecret, senderId :: !SenderId, senderKey :: !(Maybe SndPublicAuthKey), diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index ca2b70319..8836468b1 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -43,10 +43,10 @@ import Simplex.Messaging.Util (anyM, ifM, ($>>=), (<$$)) import System.IO import UnliftIO.STM -addQueue' :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier} = - (mkQueue st qr >>= atomically . add) - $>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr) +addQueue' :: STMQueueStore s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) +addQueue' st rId qr@QueueRec {senderId = sId, notifier} = + (mkQueue st rId qr >>= atomically . add) + $>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr) where add q = ifM hasId (pure $ Left DUPLICATE_) $ do TM.insert rId q $ queues' st @@ -66,35 +66,36 @@ getQueue' st party qId = secureQueue' :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue' st sq sKey = atomically (readQueueRec qr $>>= secure) - $>>= \rId -> withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey + $>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey where qr = queueRec' sq - secure q@QueueRec {recipientId = rId} = case senderKey q of - Just k -> pure $ if sKey == k then Right rId else Left AUTH + secure q = case senderKey q of + Just k -> pure $ if sKey == k then Right () else Left AUTH Nothing -> do writeTVar qr $ Just q {senderKey = Just sKey} - pure $ Right rId + pure $ Right () addQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) addQueueNotifier' st sq ntfCreds@NtfCreds {notifierId = nId} = atomically (readQueueRec qr $>>= add) - $>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) + $>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) where qr = queueRec' sq - add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do + rId = recipientId' sq + add q = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId let !q' = q {notifier = Just ntfCreds} writeTVar qr $ Just q' TM.insert nId rId $ notifiers' st - pure $ Right (rId, nId_) + pure $ Right nId_ deleteQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier' st sq = atomically (readQueueRec qr >>= mapM delete) - $>>= \(rId, nId_) -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId) + $>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq) where qr = queueRec' sq - delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do + delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do TM.delete notifierId $ notifiers' st writeTVar qr $! Just q {notifier = Nothing} pure notifierId @@ -102,12 +103,10 @@ deleteQueueNotifier' st sq = suspendQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) suspendQueue' st sq = atomically (readQueueRec qr >>= mapM suspend) - $>>= \rId -> withLog "suspendQueue" st (`logSuspendQueue` rId) + $>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq) where qr = queueRec' sq - suspend q = do - writeTVar qr $! Just q {status = QueueOff} - pure $ recipientId q + suspend q = writeTVar qr $! Just q {status = QueueOff} updateQueueTime' :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' @@ -119,13 +118,13 @@ updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log let !q' = q {updatedAt = Just t} in (writeTVar qr $! Just q') $> (q', True) log' (q, changed) - | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t) + | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t) | otherwise = pure $ Right q -deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) -deleteQueue' st rId sq = +deleteQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) +deleteQueue' st sq = atomically (readQueueRec qr >>= mapM delete) - $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId) + $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq) >>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing)) where qr = queueRec' sq diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 889cb6046..4cc55e978 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -57,7 +57,7 @@ import System.Directory (doesFileExist, renameFile) import System.IO data StoreLogRecord - = CreateQueue QueueRec + = CreateQueue RecipientId QueueRec | SecureQueue QueueId SndPublicAuthKey | AddNotifier QueueId NtfCreds | SuspendQueue QueueId @@ -76,10 +76,9 @@ data SLRTag | UpdateTime_ instance StrEncoding QueueRec where - strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} = + strEncode QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} = B.unwords - [ "rid=" <> strEncode recipientId, - "rk=" <> strEncode recipientKey, + [ "rk=" <> strEncode recipientKey, "rdh=" <> strEncode rcvDhSecret, "sid=" <> strEncode senderId, "sk=" <> strEncode senderKey @@ -93,7 +92,6 @@ instance StrEncoding QueueRec where updatedAtStr t = " updated_at=" <> strEncode t strP = do - recipientId <- "rid=" *> strP_ recipientKey <- "rk=" *> strP_ rcvDhSecret <- "rdh=" *> strP_ senderId <- "sid=" *> strP_ @@ -101,7 +99,7 @@ instance StrEncoding QueueRec where sndSecure <- (" sndSecure=" *> strP) <|> pure False notifier <- optional $ " notifier=" *> strP updatedAt <- optional $ " updated_at=" *> strP - pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt} + pure QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt} instance StrEncoding SLRTag where strEncode = \case @@ -126,7 +124,7 @@ instance StrEncoding SLRTag where instance StrEncoding StoreLogRecord where strEncode = \case - CreateQueue q -> strEncode (CreateQueue_, q) + CreateQueue rId q -> strEncode (CreateQueue_, Str $ "rid=" <> strEncode rId, q) SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey) AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds) SuspendQueue rId -> strEncode (SuspendQueue_, rId) @@ -136,7 +134,7 @@ instance StrEncoding StoreLogRecord where strP = strP_ >>= \case - CreateQueue_ -> CreateQueue <$> strP + CreateQueue_ -> CreateQueue <$> ("rid=" *> strP_) <*> strP SecureQueue_ -> SecureQueue <$> strP_ <*> strP AddNotifier_ -> AddNotifier <$> strP_ <*> strP SuspendQueue_ -> SuspendQueue <$> strP @@ -172,8 +170,8 @@ writeStoreLogRecord (WriteStoreLog _ h) r = E.uninterruptibleMask_ $ do B.hPut h $ strEncode r `B.snoc` '\n' -- hPutStrLn makes write non-atomic for length > 1024 hFlush h -logCreateQueue :: StoreLog 'WriteMode -> QueueRec -> IO () -logCreateQueue s = writeStoreLogRecord s . CreateQueue +logCreateQueue :: StoreLog 'WriteMode -> RecipientId -> QueueRec -> IO () +logCreateQueue s rId q = writeStoreLogRecord s $ CreateQueue rId q logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SndPublicAuthKey -> IO () logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey @@ -233,7 +231,7 @@ writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M. where writeQueue (rId, q) = readTVarIO (queueRec' q) >>= \case - Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them + Just q' -> when (active q') $ logCreateQueue s rId q' -- TODO we should log suspended queues when we use them Nothing -> atomically $ TM.delete rId $ activeMsgQueues st active QueueRec {status} = status == QueueActive @@ -246,11 +244,11 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin s = LB.toStrict s' procLogRecord :: StoreLogRecord -> IO () procLogRecord = \case - CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue" + CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue" SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st - DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId + DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t printError :: String -> IO () diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index c1fc2a708..0ce2fd1c3 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -105,8 +105,7 @@ testNewQueueRec g sndSecure = do (k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g let qr = QueueRec - { recipientId = rId, - recipientKey, + { recipientKey, rcvDhSecret = C.dh' k pk, senderId, senderKey = Nothing, @@ -122,7 +121,7 @@ testGetQueue ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True runRight_ $ do - q <- ExceptT $ addQueue ms qr + q <- ExceptT $ addQueue ms rId qr let write s = writeMsg ms q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" @@ -157,14 +156,14 @@ testGetQueue ms = do (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7 (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8 (Nothing, Nothing) <- tryDelPeekMsg ms q mId8 - void $ ExceptT $ deleteQueue ms rId q + void $ ExceptT $ deleteQueue ms q testChangeReadJournal :: STMQueueStore s => s -> IO () testChangeReadJournal ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True runRight_ $ do - q <- ExceptT $ addQueue ms qr + q <- ExceptT $ addQueue ms rId qr let write s = writeMsg ms q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" (Msg "message 1", Nothing) <- tryDelPeekMsg ms q mId1 @@ -176,7 +175,7 @@ testChangeReadJournal ms = do (Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4 Just (Message {msgId = mId5}, True) <- write "message 5" (Msg "message 5", Nothing) <- tryDelPeekMsg ms q mId5 - void $ ExceptT $ deleteQueue ms rId q + void $ ExceptT $ deleteQueue ms q testExportImportStore :: JournalMsgStore s -> IO () testExportImportStore ms = do @@ -186,12 +185,12 @@ testExportImportStore ms = do sl <- readWriteQueueStore testStoreLogFile ms runRight_ $ do let write q s = writeMsg ms q True =<< mkMessage s - q1 <- ExceptT $ addQueue ms qr1 - liftIO $ logCreateQueue sl qr1 + q1 <- ExceptT $ addQueue ms rId1 qr1 + liftIO $ logCreateQueue sl rId1 qr1 Just (Message {}, True) <- write q1 "message 1" Just (Message {}, False) <- write q1 "message 2" - q2 <- ExceptT $ addQueue ms qr2 - liftIO $ logCreateQueue sl qr2 + q2 <- ExceptT $ addQueue ms rId2 qr2 + liftIO $ logCreateQueue sl rId2 qr2 Just (Message {msgId = mId3}, True) <- write q2 "message 3" Just (Message {msgId = mId4}, False) <- write q2 "message 4" (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q2 mId3 @@ -300,7 +299,7 @@ testMessageState ms = do write q s = writeMsg ms q True =<< mkMessage s mId1 <- runRight $ do - q <- ExceptT $ addQueue ms qr + q <- ExceptT $ addQueue ms rId qr Just (Message {msgId = mId1}, True) <- write q "message 1" Just (Message {}, False) <- write q "message 2" liftIO $ closeMsgQueue q @@ -322,7 +321,7 @@ testReadFileMissing ms = do (rId, qr) <- testNewQueueRec g True let write q s = writeMsg ms q True =<< mkMessage s q <- runRight $ do - q <- ExceptT $ addQueue ms qr + q <- ExceptT $ addQueue ms rId qr Just (Message {}, True) <- write q "message 1" Msg "message 1" <- tryPeekMsg ms q pure q @@ -344,7 +343,7 @@ testReadFileMissingSwitch :: JournalMsgStore s -> IO () testReadFileMissingSwitch ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True - q <- writeMessages ms qr + q <- writeMessages ms rId qr mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {readState = rs} <- readTVarIO $ state mq @@ -362,7 +361,7 @@ testWriteFileMissing :: JournalMsgStore s -> IO () testWriteFileMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True - q <- writeMessages ms qr + q <- writeMessages ms rId qr mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {writeState = ws} <- readTVarIO $ state mq @@ -385,7 +384,7 @@ testReadAndWriteFilesMissing :: JournalMsgStore s -> IO () testReadAndWriteFilesMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True - q <- writeMessages ms qr + q <- writeMessages ms rId qr mq <- fromJust <$> readTVarIO (msgQueue_' q) MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq @@ -400,9 +399,9 @@ testReadAndWriteFilesMissing ms = do Msg "message 6" <- tryPeekMsg ms q' pure () -writeMessages :: JournalMsgStore s -> QueueRec -> IO (JournalQueue s) -writeMessages ms qr = runRight $ do - q <- ExceptT $ addQueue ms qr +writeMessages :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s) +writeMessages ms rId qr = runRight $ do + q <- ExceptT $ addQueue ms rId qr let write s = writeMsg ms q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 5de40f0ef..b95a4494b 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -57,38 +57,38 @@ storeLogTests = testSMPStoreLog ("SMP server store log, sndSecure = " <> show sndSecure) [ SLTC { name = "create new queue", - saved = [CreateQueue qr], - compacted = [CreateQueue qr], + saved = [CreateQueue rId qr], + compacted = [CreateQueue rId qr], state = M.fromList [(rId, qr)] }, SLTC { name = "secure queue", - saved = [CreateQueue qr, SecureQueue rId testPublicAuthKey], - compacted = [CreateQueue qr {senderKey = Just testPublicAuthKey}], + saved = [CreateQueue rId qr, SecureQueue rId testPublicAuthKey], + compacted = [CreateQueue rId qr {senderKey = Just testPublicAuthKey}], state = M.fromList [(rId, qr {senderKey = Just testPublicAuthKey})] }, SLTC { name = "create and delete queue", - saved = [CreateQueue qr, DeleteQueue rId], + saved = [CreateQueue rId qr, DeleteQueue rId], compacted = [], state = M.fromList [] }, SLTC { name = "create queue and add notifier", - saved = [CreateQueue qr, AddNotifier rId ntfCreds], - compacted = [CreateQueue $ qr {notifier = Just ntfCreds}], + saved = [CreateQueue rId qr, AddNotifier rId ntfCreds], + compacted = [CreateQueue rId qr {notifier = Just ntfCreds}], state = M.fromList [(rId, qr {notifier = Just ntfCreds})] }, SLTC { name = "delete notifier", - saved = [CreateQueue qr, AddNotifier rId ntfCreds, DeleteNotifier rId], - compacted = [CreateQueue qr], + saved = [CreateQueue rId qr, AddNotifier rId ntfCreds, DeleteNotifier rId], + compacted = [CreateQueue rId qr], state = M.fromList [(rId, qr)] }, SLTC { name = "update time", - saved = [CreateQueue qr, UpdateTime rId date], - compacted = [CreateQueue qr {updatedAt = Just date}], + saved = [CreateQueue rId qr, UpdateTime rId date], + compacted = [CreateQueue rId qr {updatedAt = Just date}], state = M.fromList [(rId, qr {updatedAt = Just date})] } ]