diff --git a/simplexmq.cabal b/simplexmq.cabal index 309b67b83..9dd60595b 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.2.0.1 +version: 6.2.0.101 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and @@ -200,6 +200,7 @@ library Simplex.Messaging.Server.QueueStore.STM Simplex.Messaging.Server.Stats Simplex.Messaging.Server.StoreLog + Simplex.Messaging.Server.StoreLog.Types Simplex.Messaging.Notifications.Server Simplex.Messaging.Notifications.Server.Control Simplex.Messaging.Notifications.Server.Env diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index 4635034fc..29cc5bf6a 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -181,6 +181,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira stopServer = do withFileLog closeStoreLog saveServerStats + logInfo "Server stopped" expireFilesThread_ :: XFTPServerConfig -> [M ()] expireFilesThread_ XFTPServerConfig {fileExpiration = Just fileExp} = [expireFiles fileExp] diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 81094e1a7..ca24a2318 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -115,10 +115,12 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg} started = do stopServer :: M () stopServer = do + logInfo "Saving server state..." saveServer NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber liftIO $ readTVarIO smpSubscribers >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (deRefWeak >=> mapM_ killThread)) liftIO $ closeSMPClientAgent smpAgent + logInfo "Server stopped" saveServer :: M () saveServer = withNtfLog closeStoreLog >> saveServerLastNtfs >> saveServerStats diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index d5d816bec..03234aad4 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -538,11 +538,13 @@ messageId :: Message -> MsgId messageId = \case Message {msgId} -> msgId MessageQuota {msgId} -> msgId +{-# INLINE messageId #-} messageTs :: Message -> SystemTime messageTs = \case Message {msgTs} -> msgTs MessageQuota {msgTs} -> msgTs +{-# INLINE messageTs #-} newtype EncRcvMsgBody = EncRcvMsgBody ByteString deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index ba91e50cf..b0e499d3f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -94,15 +94,14 @@ import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM as Env import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueueHandles) +import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue, closeMsgQueue) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo -import Simplex.Messaging.Server.QueueStore.STM as QS +import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport @@ -223,8 +222,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT saveServer :: Bool -> M () saveServer drainMsgs = do - withLog closeStoreLog - saveServerMessages drainMsgs + ams@(AMS _ ms) <- asks msgStore + liftIO $ saveServerMessages drainMsgs ams >> closeMsgStore ms saveServerNtfs saveServerStats @@ -236,10 +235,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT Server -> String -> (Server -> TQueue (QueueId, ClientId, Subscribed)) -> - (Server -> TMap QueueId (TVar Client)) -> - (Server -> TVar (IM.IntMap Client)) -> + (Server -> TMap QueueId (TVar AClient)) -> + (Server -> TVar (IM.IntMap AClient)) -> (Server -> TVar (IM.IntMap (NonEmpty (QueueId, Subscribed)))) -> - (Client -> TMap QueueId s) -> + (forall st. Client st -> TMap QueueId s) -> (s -> IO ()) -> M () serverThread s label subQ subs subClnts pendingEvts clientSubs unsub = do @@ -250,7 +249,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT $>>= endPreviousSubscriptions >>= mapM_ unsub where - updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), Client)) + updateSubscribers :: TVar (IM.IntMap (Maybe AClient)) -> (QueueId, ClientId, Subscribed) -> STM (Maybe ((QueueId, Subscribed), AClient)) updateSubscribers cls (qId, clntId, subscribed) = -- Client lookup by ID is in the same STM transaction. -- In case client disconnects during the transaction, @@ -273,17 +272,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT -- This case catches Just Nothing - it cannot happen here. -- Nothing is there only before client thread is started. _ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client - clientToBeNotified c' + clientToBeNotified ac@(AClient _ c') | clntId == clientId c' = pure Nothing - | otherwise = (\yes -> if yes then Just ((qId, subscribed), c') else Nothing) <$> readTVar (connected c') - endPreviousSubscriptions :: ((QueueId, Subscribed), Client) -> IO (Maybe s) - endPreviousSubscriptions (qEvt@(qId, _), c) = do + | otherwise = (\yes -> if yes then Just ((qId, subscribed), ac) else Nothing) <$> readTVar (connected c') + endPreviousSubscriptions :: ((QueueId, Subscribed), AClient) -> IO (Maybe s) + endPreviousSubscriptions (qEvt@(qId, _), ac@(AClient _ c)) = do atomically $ modifyTVar' (pendingEvts s) $ IM.alter (Just . maybe [qEvt] (qEvt <|)) (clientId c) atomically $ do sub <- TM.lookupDelete qId (clientSubs c) - removeWhenNoSubs c $> sub + removeWhenNoSubs ac $> sub -- remove client from server's subscribed cients - removeWhenNoSubs c = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c) + removeWhenNoSubs (AClient _ c) = whenM (null <$> readTVar (clientSubs c)) $ modifyTVar' (subClnts s) $ IM.delete (clientId c) deliverNtfsThread :: Server -> M () deliverNtfsThread Server {ntfSubClients} = do @@ -294,7 +293,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT threadDelay ntfInt readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats) where - deliverNtfs ns stats Client {clientId, ntfSubscriptions, sndQ, connected} = + deliverNtfs ns stats (AClient _ Client {clientId, ntfSubscriptions, sndQ, connected}) = whenM (currentClient readTVarIO) $ do subs <- readTVarIO ntfSubscriptions logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs" @@ -342,7 +341,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT ends <- atomically $ swapTVar ref IM.empty unless (null ends) $ forM_ (IM.assocs ends) $ \(cId, qEvts) -> mapM_ (queueEvts qEvts) . join . IM.lookup cId =<< readTVarIO cls - queueEvts qEvts c@Client {connected, sndQ = q} = + queueEvts qEvts (AClient _ c@Client {connected, sndQ = q}) = whenM (readTVarIO connected) $ do sent <- atomically $ ifM (isFullTBQueue q) (pure False) (writeTBQueue q ts $> True) if sent @@ -387,11 +386,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT liftIO $ forever $ do threadDelay' interval old <- expireBeforeEpoch expCfg - Sum deleted <- withActiveMsgQueues ms $ \_ -> expireQueueMsgs stats old + Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs stats old logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages" where - expireQueueMsgs stats old q = - runExceptT (deleteExpiredMsgs q True old) >>= \case + expireQueueMsgs stats old rId q = + runExceptT (deleteExpiredMsgs rId q True old) >>= \case Right deleted -> Sum deleted <$ atomicModifyIORef'_ (msgExpired stats) (+ deleted) Left _ -> pure 0 @@ -422,8 +421,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats - QueueStore {queues, notifiers} <- asks queueStore - let interval = 1000000 * logInterval + AMS _ st <- asks msgStore + let queues = activeMsgQueues st + notifiers = notifiers' st + interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -612,7 +613,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT CPClients -> withAdminRole $ do active <- unliftIO u (asks clients) >>= readTVarIO hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions" - forM_ (IM.toList active) $ \(cid, cl) -> forM_ cl $ \Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions} -> do + forM_ (IM.toList active) $ \(cid, cl) -> forM_ cl $ \(AClient _ Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do connected' <- bshow <$> readTVarIO connected rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt sndActiveAt' <- strEncode <$> readTVarIO sndActiveAt @@ -622,8 +623,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions'] CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats - QueueStore {queues, notifiers} <- unliftIO u $ asks queueStore - let getStat :: (ServerStats -> IORef a) -> IO a + AMS _ st <- unliftIO u $ asks msgStore + let queues = activeMsgQueues st + notifiers = notifiers' st + getStat :: (ServerStats -> IORef a) -> IO a getStat var = readIORef (var ss) putStat :: Show a => String -> (ServerStats -> IORef a) -> IO () putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v @@ -745,25 +748,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT putSubscribedClients "SMP" subClients False putSubscribedClients "Ntf" ntfSubClients True where - putActiveClientsInfo :: String -> TMap QueueId (TVar Client) -> Bool -> IO () + putActiveClientsInfo :: String -> TMap QueueId (TVar AClient) -> Bool -> IO () putActiveClientsInfo protoName clients showIds = do activeSubs <- readTVarIO clients hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs) clnts <- countSubClients activeSubs hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "") where - countSubClients :: M.Map QueueId (TVar Client) -> IO IS.IntSet - countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId <$> readTVarIO c) IS.empty - putSubscribedClients :: String -> TVar (IM.IntMap Client) -> Bool -> IO () + countSubClients :: M.Map QueueId (TVar AClient) -> IO IS.IntSet + countSubClients = foldM (\ !s c -> (`IS.insert` s) . clientId' <$> readTVarIO c) IS.empty + putSubscribedClients :: String -> TVar (IM.IntMap AClient) -> Bool -> IO () putSubscribedClients protoName subClnts showIds = do clnts <- readTVarIO subClnts hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IM.size clnts) <> (if showIds then " " <> show (IM.keys clnts) else "") - countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe Client) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) + countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap (Maybe AClient) -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0)) where - addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Maybe Client -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) + addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Maybe AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) addSubs acc Nothing = pure acc - addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) (Just cl) = do + addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) (Just acl@(AClient _ cl)) = do subs <- readTVarIO $ subSel cl cnts' <- case countSubs_ of Nothing -> pure cnts @@ -772,11 +775,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT pure (c1 + c1', c2 + c2', c3 + c3', c4 + c4') let cnt = M.size subs clCnt' = if cnt == 0 then clCnt else clCnt + 1 - qs' <- if cnt == 0 then pure qs else addQueueLengths qs cl + qs' <- if cnt == 0 then pure qs else addQueueLengths qs acl pure (subCnt + cnt, cnts', clCnt', qs') - clientTBQueueLengths' :: Foldable t => t (Maybe Client) -> IO (Natural, Natural, Natural) + clientTBQueueLengths' :: Foldable t => t (Maybe AClient) -> IO (Natural, Natural, Natural) clientTBQueueLengths' = foldM (\acc -> maybe (pure acc) (addQueueLengths acc)) (0, 0, 0) - addQueueLengths (!rl, !sl, !ml) cl = do + addQueueLengths (!rl, !sl, !ml) (AClient _ cl) = do (rl', sl', ml') <- queueLengths cl pure (rl + rl', sl + sl', ml + ml') queueLengths Client {rcvQ, sndQ, msgQ} = do @@ -795,21 +798,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT SubPending -> (c1, c2 + 1, c3, c4) SubThread _ -> (c1, c2, c3 + 1, c4) ProhibitSub -> pure (c1, c2, c3, c4 + 1) - CPDelete queueId' -> withUserRole $ unliftIO u $ do - st <- asks queueStore - AMS _ ms <- asks msgStore - queueId <- liftIO (getQueue st SSender queueId') >>= \case - Left _ -> pure queueId' -- fallback to using as recipientId directly - Right QueueRec {recipientId} -> pure recipientId - r <- liftIO $ - deleteQueue st queueId $>>= \q -> - Right . (q,) <$> delMsgQueueSize ms queueId + CPDelete qId -> withUserRole $ unliftIO u $ do + AMS _ st <- asks msgStore + r <- liftIO $ runExceptT $ do + (q, qr) <- ExceptT (getQueueRec st SSender qId) `catchE` \_ -> ExceptT (getQueueRec st SRecipient qId) + ExceptT $ deleteQueueSize st (recipientId qr) q case r of - Left e -> liftIO . hPutStrLn h $ "error: " <> show e - Right (q, numDeleted) -> do - withLog (`logDeleteQueue` queueId) - updateDeletedStats q - liftIO . hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted" + Left e -> liftIO $ hPutStrLn h $ "error: " <> show e + Right (qr, numDeleted) -> do + updateDeletedStats qr + liftIO $ hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted" CPSave -> withAdminRole $ withLock' (savingLock srv) "control" $ do hPutStrLn h "saving server state..." unliftIO u $ saveServer False @@ -838,16 +836,19 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio nextClientId <- asks clientSeq clientId <- atomically $ stateTVar nextClientId $ \next -> (next, next + 1) atomically $ modifyTVar' active $ IM.insert clientId Nothing - c <- liftIO $ newClient clientId q thVersion sessionId ts - runClientThreads active c clientId `finally` clientDisconnected c + AMS msType ms <- asks msgStore + c <- liftIO $ newClient msType clientId q thVersion sessionId ts + runClientThreads msType ms active c clientId `finally` clientDisconnected c where - runClientThreads active c clientId = do - atomically $ modifyTVar' active $ IM.insert clientId $ Just c + runClientThreads :: STMQueueStore (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M () + runClientThreads msType ms active c clientId = do + atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient msType c) s <- asks server expCfg <- asks $ inactiveClientExpiration . config th <- newMVar h -- put TH under a fair lock to interleave messages and command responses labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c s expCfg + raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams s ms c, receive h ms c] <> disconnectThread_ c s expCfg + disconnectThread_ :: Client s -> Server -> Maybe ExpirationConfig -> [M ()] disconnectThread_ c s (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c s)] disconnectThread_ _ _ _ = [] noSubscriptions Client {clientId} s = do @@ -856,7 +857,7 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio then pure False else not . IM.member clientId <$> readTVarIO (ntfSubClients s) -clientDisconnected :: Client -> M () +clientDisconnected :: Client s -> M () clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connected, sessionId, endThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disc" -- these can be in separate transactions, @@ -875,7 +876,7 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where - updateSubscribers :: M.Map QueueId a -> TMap QueueId (TVar Client) -> IO () + updateSubscribers :: M.Map QueueId a -> TMap QueueId (TVar AClient) -> IO () updateSubscribers subs srvSubs = forM_ (M.keys subs) $ \qId -> -- lookup of the subscribed client TVar can be in separate transaction, @@ -884,8 +885,8 @@ clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, connecte TM.lookupIO qId srvSubs >>= mapM_ (\c' -> atomically $ whenM (sameClientId c <$> readTVar c') $ TM.delete qId srvSubs) -sameClientId :: Client -> Client -> Bool -sameClientId Client {clientId} Client {clientId = cId'} = clientId == cId' +sameClientId :: Client s -> AClient -> Bool +sameClientId Client {clientId} (AClient _ Client {clientId = cId'}) = clientId == cId' cancelSub :: Sub -> IO () cancelSub s = case subThread s of @@ -895,8 +896,8 @@ cancelSub s = case subThread s of _ -> pure () ProhibitSub -> pure () -receive :: Transport c => THandleSMP c 'TServer -> Client -> M () -receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do +receive :: forall c s. (Transport c, STMQueueStore s) => THandleSMP c 'TServer -> s -> Client s -> M () +receive h@THandle {params = THandleParams {thAuth}} ms Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" sa <- asks serverActive forever $ do @@ -909,7 +910,7 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv write sndQ errs write rcvQ cmds where - updateBatchStats :: ServerStats -> [(Maybe QueueRec, Transmission Cmd)] -> M () + updateBatchStats :: ServerStats -> [(Maybe (StoreQueue s, QueueRec), Transmission Cmd)] -> M () updateBatchStats stats = \case (_, (_, _, (Cmd _ cmd))) : _ -> do let sel_ = case cmd of @@ -920,14 +921,14 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv _ -> Nothing mapM_ (\sel -> incStat $ sel stats) sel_ [] -> pure () - cmdAction :: ServerStats -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) + cmdAction :: ServerStats -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe (StoreQueue s, QueueRec), Transmission Cmd)) cmdAction stats (tAuth, authorized, (corrId, entId, cmdOrError)) = case cmdOrError of Left e -> pure $ Left (corrId, entId, ERR e) - Right cmd -> verified =<< verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd + Right cmd -> verified =<< verifyTransmission ms ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd where verified = \case - VRVerified qr -> pure $ Right (qr, (corrId, entId, cmd)) + VRVerified q -> pure $ Right (q, (corrId, entId, cmd)) VRFailed -> do case cmd of Cmd _ SEND {} -> incStat $ msgSentAuth stats @@ -938,7 +939,7 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv pure $ Left (corrId, entId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty -send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () +send :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> IO () send th c@Client {sndQ, msgQ, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" forever $ atomically (readTBQueue sndQ) >>= sendTransmissions @@ -963,12 +964,12 @@ send th c@Client {sndQ, msgQ, sessionId} = do MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK)) _ -> (msgs, t) -sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () +sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> IO () sendMsg th c@Client {msgQ, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg" forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend th c [t]) -tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO () +tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> NonEmpty (Transmission BrokerMsg) -> IO () tSend th Client {sndActiveAt} ts = do withMVar th $ \h@THandle {params} -> void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts @@ -987,7 +988,7 @@ disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcv ts <- max <$> readTVarIO rcvActiveAt <*> readTVarIO sndActiveAt if systemSeconds ts < old then closeConnection connection else loop -data VerificationResult = VRVerified (Maybe QueueRec) | VRFailed +data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFailed -- This function verifies queue command authorization, with the objective to have constant time between the three AUTH error scenarios: -- - the queue and party key exist, and the provided authorization has type matching queue key, but it is made with the different key. @@ -995,30 +996,29 @@ data VerificationResult = VRVerified (Maybe QueueRec) | VRFailed -- - the queue or party key do not exist. -- In all cases, the time of the verification should depend only on the provided authorization type, -- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result. -verifyTransmission :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M VerificationResult -verifyTransmission auth_ tAuth authorized queueId cmd = +verifyTransmission :: forall s. STMQueueStore s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s) +verifyTransmission ms auth_ tAuth authorized queueId cmd = case cmd of Cmd SRecipient (NEW k _ _ _ _) -> pure $ Nothing `verifiedWith` k - Cmd SRecipient _ -> verifyQueue (\q -> Just q `verifiedWith` recipientKey q) <$> get SRecipient + Cmd SRecipient _ -> verifyQueue (\q -> Just q `verifiedWith` recipientKey (snd q)) <$> get SRecipient -- SEND will be accepted without authorization before the queue is secured with KEY or SKEY command - Cmd SSender (SKEY k) -> verifyQueue (\q -> if maybe True (k ==) (senderKey q) then Just q `verifiedWith` k else dummyVerify) <$> get SSender - Cmd SSender SEND {} -> verifyQueue (\q -> Just q `verified` maybe (isNothing tAuth) verify (senderKey q)) <$> get SSender + Cmd SSender (SKEY k) -> verifyQueue (\q -> if maybe True (k ==) (senderKey $ snd q) then Just q `verifiedWith` k else dummyVerify) <$> get SSender + Cmd SSender SEND {} -> verifyQueue (\q -> Just q `verified` maybe (isNothing tAuth) verify (senderKey $ snd q)) <$> get SSender Cmd SSender PING -> pure $ VRVerified Nothing Cmd SSender RFWD {} -> pure $ VRVerified Nothing -- NSUB will not be accepted without authorization - Cmd SNotifier NSUB -> verifyQueue (\q -> maybe dummyVerify (\n -> Just q `verifiedWith` notifierKey n) (notifier q)) <$> get SNotifier + Cmd SNotifier NSUB -> verifyQueue (\q -> maybe dummyVerify (\n -> Just q `verifiedWith` notifierKey n) (notifier $ snd q)) <$> get SNotifier Cmd SProxiedClient _ -> pure $ VRVerified Nothing where verify = verifyCmdAuthorization auth_ tAuth authorized dummyVerify = verify (dummyAuthKey tAuth) `seq` VRFailed - verifyQueue :: (QueueRec -> VerificationResult) -> Either ErrorType QueueRec -> VerificationResult + verifyQueue :: ((StoreQueue s, QueueRec) -> VerificationResult s) -> Either ErrorType (StoreQueue s, QueueRec) -> VerificationResult s verifyQueue = either (const dummyVerify) verified q cond = if cond then VRVerified q else VRFailed + verifiedWith :: Maybe (StoreQueue s, QueueRec) -> C.APublicAuthKey -> VerificationResult s verifiedWith q k = q `verified` verify k - get :: DirectParty p => SParty p -> M (Either ErrorType QueueRec) - get party = do - st <- asks queueStore - liftIO $ getQueue st party queueId + get :: DirectParty p => SParty p -> M (Either ErrorType (StoreQueue s, QueueRec)) + get party = liftIO $ getQueueRec ms party queueId verifyCmdAuthorization :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> C.APublicAuthKey -> Bool verifyCmdAuthorization auth_ tAuth authorized key = maybe False (verify key) tAuth @@ -1065,7 +1065,7 @@ dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/Xo dummyKeyX25519 :: C.PublicKey 'C.X25519 dummyKeyX25519 = "MCowBQYDK2VuAyEA4JGSMYht18H4mas/jHeBwfcM7jLwNYJNOAhi2/g4RXg=" -forkClient :: Client -> String -> M () -> M () +forkClient :: Client s -> String -> M () -> M () forkClient Client {endThreads, endThreadSeq} label action = do tId <- atomically $ stateTVar endThreadSeq $ \next -> (next, next + 1) t <- forkIO $ do @@ -1073,13 +1073,17 @@ forkClient Client {endThreads, endThreadSeq} label action = do action `finally` atomically (modifyTVar' endThreads $ IM.delete tId) mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId -client :: THandleParams SMPVersion 'TServer -> Client -> Server -> M () -client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, ntfSubscribedQ, subscribers} = do - labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" - forever $ - atomically (readTBQueue rcvQ) - >>= mapM processCommand - >>= mapM_ reply . L.nonEmpty . catMaybes . L.toList +client :: forall s. STMQueueStore s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () +client + thParams' + Server {subscribedQ, ntfSubscribedQ, subscribers} + ms + clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} = do + labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" + forever $ + atomically (readTBQueue rcvQ) + >>= mapM processCommand + >>= mapM_ reply . L.nonEmpty . catMaybes . L.toList where reply :: MonadIO m => NonEmpty (Transmission BrokerMsg) -> m () reply = atomically . writeTBQueue sndQ @@ -1163,43 +1167,40 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Maybe (Transmission BrokerMsg)) - processCommand (qr_, (corrId, entId, cmd)) = case cmd of + processCommand :: (Maybe (StoreQueue s, QueueRec), Transmission Cmd) -> M (Maybe (Transmission BrokerMsg)) + processCommand (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> Just <$> case command of SKEY sKey -> - withQueue $ \QueueRec {sndSecure, recipientId} -> - (corrId,entId,) <$> if sndSecure then secureQueue_ "SKEY" recipientId sKey else pure $ ERR AUTH - SEND flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody + withQueue $ \q QueueRec {sndSecure} -> + (corrId,entId,) <$> if sndSecure then secureQueue_ q sKey else pure $ ERR AUTH + SEND flags msgBody -> withQueue $ sendMessage flags msgBody PING -> pure (corrId, NoEntity, PONG) RFWD encBlock -> (corrId, NoEntity,) <$> processForwardedCommand encBlock Cmd SNotifier NSUB -> Just <$> subscribeNotifications - Cmd SRecipient command -> do - st <- asks queueStore + Cmd SRecipient command -> Just <$> case command of NEW rKey dhKey auth subMode sndSecure -> ifM allowNew - (createQueue st rKey dhKey subMode sndSecure) + (createQueue rKey dhKey subMode sndSecure) (pure (corrId, entId, ERR AUTH)) where allowNew = do ServerConfig {allowNewQueues, newQueueBasicAuth} <- asks config pure $ allowNewQueues && maybe True ((== auth) . Just) newQueueBasicAuth - SUB -> withQueue (`subscribeQueue` entId) + SUB -> withQueue subscribeQueue GET -> withQueue getMessage - ACK msgId -> withQueue (`acknowledgeMsg` msgId) - KEY sKey -> - withQueue $ \QueueRec {recipientId} -> - (corrId,entId,) <$> secureQueue_ "KEY" recipientId sKey - NKEY nKey dhKey -> addQueueNotifier_ st nKey dhKey - NDEL -> deleteQueueNotifier_ st - OFF -> suspendQueue_ st - DEL -> delQueueAndMsgs st - QUE -> withQueue $ fmap (corrId,entId,) . getQueueInfo + ACK msgId -> withQueue $ acknowledgeMsg msgId + KEY sKey -> withQueue $ \q _ -> (corrId,entId,) <$> secureQueue_ q sKey + NKEY nKey dhKey -> withQueue $ \q _ -> addQueueNotifier_ q nKey dhKey + NDEL -> withQueue $ \q _ -> deleteQueueNotifier_ q + OFF -> maybe (pure $ err INTERNAL) suspendQueue_ q_ + DEL -> maybe (pure $ err INTERNAL) delQueueAndMsgs q_ + QUE -> withQueue $ \q qr -> (corrId,entId,) <$> getQueueInfo q qr where - createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> SenderCanSecure -> M (Transmission BrokerMsg) - createQueue st recipientKey dhKey subMode sndSecure = time "NEW" $ do + createQueue :: RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> SenderCanSecure -> M (Transmission BrokerMsg) + createQueue recipientKey dhKey subMode sndSecure = time "NEW" $ do (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random updatedAt <- Just <$> liftIO getSystemDate let rcvDhSecret = C.dh' dhKey privDhKey @@ -1222,20 +1223,18 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> ((RecipientId, SenderId) -> QueueRec) -> M BrokerMsg addQueueRetry 0 _ _ = pure $ ERR INTERNAL addQueueRetry n qik qRec = do - ids@(rId, _) <- getIds - -- create QueueRec record with these ids and keys + ids <- getIds let qr = qRec ids - liftIO (addQueue st qr) >>= \case + liftIO (addQueue ms qr) >>= \case Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec Left e -> pure $ ERR e - Right () -> do - withLog (`logCreateQueue` qr) + Right q -> do stats <- asks serverStats incStat $ qCreated stats incStat $ qCount stats case subMode of SMOnlyCreate -> pure () - SMSubscribe -> void $ subscribeQueue qr rId + SMSubscribe -> void $ subscribeQueue q qr pure $ IDS (qik ids) getIds :: M (RecipientId, SenderId) @@ -1243,16 +1242,17 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s n <- asks $ queueIdBytes . config liftM2 (,) (randomId n) (randomId n) - secureQueue_ :: T.Text -> RecipientId -> SndPublicAuthKey -> M BrokerMsg - secureQueue_ name rId sKey = time name $ do - withLog $ \s -> logSecureQueue s rId sKey - st <- asks queueStore - stats <- asks serverStats - incStat $ qSecured stats - liftIO $ either ERR (const OK) <$> secureQueue st rId sKey + secureQueue_ :: StoreQueue s -> SndPublicAuthKey -> M BrokerMsg + secureQueue_ q sKey = do + liftIO (secureQueue ms q sKey) >>= \case + Left e -> pure $ ERR e + Right () -> do + stats <- asks serverStats + incStat $ qSecured stats + pure OK - addQueueNotifier_ :: QueueStore -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg) - addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do + addQueueNotifier_ :: StoreQueue s -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg) + addQueueNotifier_ q notifierKey dhKey = time "NKEY" $ do (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random let rcvNtfDhSecret = C.dh' dhKey privDhKey (corrId,entId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret @@ -1262,19 +1262,17 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s addNotifierRetry n rcvPublicDhKey rcvNtfDhSecret = do notifierId <- randomId =<< asks (queueIdBytes . config) let ntfCreds = NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} - liftIO (addQueueNotifier st entId ntfCreds) >>= \case + liftIO (addQueueNotifier ms q ntfCreds) >>= \case Left DUPLICATE_ -> addNotifierRetry (n - 1) rcvPublicDhKey rcvNtfDhSecret Left e -> pure $ ERR e Right nId_ -> do - withLog $ \s -> logAddNotifier s entId ntfCreds incStat . ntfCreated =<< asks serverStats forM_ nId_ $ \nId -> atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) pure $ NID notifierId rcvPublicDhKey - deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg) - deleteQueueNotifier_ st = do - withLog (`logDeleteNotifier` entId) - liftIO (deleteQueueNotifier st entId) >>= \case + deleteQueueNotifier_ :: StoreQueue s -> M (Transmission BrokerMsg) + deleteQueueNotifier_ q = + liftIO (deleteQueueNotifier ms q) >>= \case Right (Just nId) -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it stats <- asks serverStats @@ -1286,13 +1284,11 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Right Nothing -> pure ok Left e -> pure $ err e - suspendQueue_ :: QueueStore -> M (Transmission BrokerMsg) - suspendQueue_ st = do - withLog (`logSuspendQueue` entId) - okResp <$> liftIO (suspendQueue st entId) + suspendQueue_ :: (StoreQueue s, QueueRec) -> M (Transmission BrokerMsg) + suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue ms q - subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg) - subscribeQueue qr rId = + subscribeQueue :: StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) + subscribeQueue q qr = atomically (TM.lookup rId subscriptions) >>= \case Nothing -> newSub >>= deliver True Just s@Sub {subThread} -> do @@ -1306,6 +1302,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s incStat $ qSubDuplicate stats atomically (tryTakeTMVar $ delivered s) >> deliver False s where + rId = recipientId qr newSub :: M Sub newSub = time "SUB newSub" . atomically $ do writeTQueue subscribedQ (rId, clientId, True) @@ -1314,17 +1311,15 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s pure sub deliver :: Bool -> Sub -> M (Transmission BrokerMsg) deliver inc sub = do - AMS _ ms <- asks msgStore stats <- asks serverStats fmap (either (\e -> (corrId, rId, ERR e)) id) $ liftIO $ runExceptT $ do - q <- getMsgQueue ms rId - msg_ <- tryPeekMsg q + msg_ <- tryPeekMsg ms rId q liftIO $ when (inc && isJust msg_) $ incStat (qSub stats) liftIO $ deliverMessage "SUB" qr rId sub msg_ -- clients that use GET are not added to server subscribers - getMessage :: QueueRec -> M (Transmission BrokerMsg) - getMessage qr = time "GET" $ do + getMessage :: StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) + getMessage q qr = time "GET" $ do atomically (TM.lookup entId subscriptions) >>= \case Nothing -> atomically newSub >>= (`getMessage_` Nothing) @@ -1350,29 +1345,23 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s pure s getMessage_ :: Sub -> Maybe MsgId -> M (Transmission BrokerMsg) getMessage_ s delivered_ = do - AMS _ ms <- asks msgStore stats <- asks serverStats - fmap (either err id) $ liftIO $ runExceptT $ do - q <- getMsgQueue ms entId - tryPeekMsg q >>= \case + fmap (either err id) $ liftIO $ runExceptT $ + tryPeekMsg ms (recipientId qr) q >>= \case Just msg -> do let encMsg = encryptMsg qr msg incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg) Nothing -> incStat (msgGetNoMsg stats) $> ok - withQueue :: (QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) - withQueue action = case qr_ of - Just qr -> updateQueueDate qr >> action qr + withQueue :: (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) + withQueue action = case q_ of Nothing -> pure $ err INTERNAL - - updateQueueDate :: QueueRec -> M () - updateQueueDate QueueRec {updatedAt, recipientId = rId} = do - t <- liftIO getSystemDate - when (Just t /= updatedAt) $ do - withLog $ \s -> logUpdateQueueTime s rId t - st <- asks queueStore - liftIO $ updateQueueTime st rId t + Just (q, qr@QueueRec {updatedAt}) -> do + t <- liftIO getSystemDate + if updatedAt == Just t + then action q qr + else liftIO (updateQueueTime ms q t) >>= either (pure . err) (action q) subscribeNotifications :: M (Transmission BrokerMsg) subscribeNotifications = do @@ -1389,24 +1378,22 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s writeTQueue ntfSubscribedQ (entId, clientId, True) TM.insert entId () ntfSubscriptions - acknowledgeMsg :: QueueRec -> MsgId -> M (Transmission BrokerMsg) - acknowledgeMsg qr msgId = time "ACK" $ do + acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) + acknowledgeMsg msgId q qr = time "ACK" $ do liftIO (TM.lookupIO entId subscriptions) >>= \case Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case Just st -> do - AMS _ ms <- asks msgStore stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ do - q <- getMsgQueue ms entId case st of ProhibitSub -> do - deletedMsg_ <- tryDelMsg q msgId + deletedMsg_ <- tryDelMsg ms (recipientId qr) q msgId liftIO $ mapM_ (updateStats stats True) deletedMsg_ pure ok _ -> do - (deletedMsg_, msg_) <- tryDelPeekMsg q msgId + (deletedMsg_, msg_) <- tryDelPeekMsg ms (recipientId qr) q msgId liftIO $ mapM_ (updateStats stats False) deletedMsg_ liftIO $ deliverMessage "ACK" qr entId sub msg_ _ -> pure $ err NO_MSG @@ -1436,8 +1423,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s incStat $ msgRecvNtf stats updatePeriodStats (activeQueuesNtf stats) entId - sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg) - sendMessage qr msgFlags msgBody + sendMessage :: MsgFlags -> MsgBody -> StoreQueue s -> QueueRec -> M (Transmission BrokerMsg) + sendMessage msgFlags msgBody q qr | B.length msgBody > maxMessageLength thVersion = do stats <- asks serverStats incStat $ msgSentLarge stats @@ -1452,14 +1439,12 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s case C.maxLenBS msgBody of Left _ -> pure $ err LARGE_MSG Right body -> do - AMS _ ms <- asks msgStore ServerConfig {messageExpiration, msgIdBytes} <- asks config msgId <- randomId' msgIdBytes msg_ <- liftIO $ time "SEND" $ runExceptT $ do - q <- getMsgQueue ms $ recipientId qr - expireMessages q messageExpiration stats + expireMessages messageExpiration stats msg <- liftIO $ mkMessage msgId body - writeMsg ms q True msg + writeMsg ms (recipientId qr) q True msg case msg_ of Left e -> pure $ err e Right Nothing -> do @@ -1482,9 +1467,9 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s msgTs <- getSystemTime pure $! Message msgId msgTs msgFlags body - expireMessages :: MsgStoreClass s => MsgQueue s -> Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO () - expireMessages q msgExp stats = do - deleted <- maybe (pure 0) (deleteExpiredMsgs q True <=< liftIO . expireBeforeEpoch) msgExp + expireMessages :: Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO () + expireMessages msgExp stats = do + deleted <- maybe (pure 0) (deleteExpiredMsgs (recipientId qr) q True <=< liftIO . expireBeforeEpoch) msgExp liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) -- The condition for delivery of the message is: @@ -1509,7 +1494,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s -- and delivery is cancelled - -- the new client will receive message in response to SUB. (TM.lookup rId subscribers >>= mapM readTVar) - $>>= \rc@Client {subscriptions = subs, sndQ = q} -> TM.lookup rId subs + $>>= \rc@(AClient _ Client {subscriptions = subs, sndQ = sndQ'}) -> TM.lookup rId subs $>>= \s@Sub {subThread, delivered} -> case subThread of ProhibitSub -> pure Nothing ServerSub st -> readTVar st >>= \case @@ -1518,15 +1503,15 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Just _ -> pure Nothing -- if a message was already delivered, should not deliver more Nothing -> ifM - (isFullTBQueue q) + (isFullTBQueue sndQ') (writeTVar st SubPending $> Just (rc, s, st)) - (deliver q s $> Nothing) + (deliver sndQ' s $> Nothing) _ -> pure Nothing - deliver q s = do + deliver sndQ' s = do let encMsg = encryptMsg qr msg - writeTBQueue q [(CorrId "", rId, MSG encMsg)] + writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)] void $ setDelivered s msg - forkDeliver (rc@Client {sndQ = q}, s@Sub {delivered}, st) = do + forkDeliver ((AClient _ rc@Client {sndQ = sndQ'}), s@Sub {delivered}, st) = do t <- mkWeakThreadId =<< forkIO deliverThread atomically $ modifyTVar' st $ \case -- this case is needed because deliverThread can exit before it @@ -1544,7 +1529,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do -- a separate thread is needed because it blocks when client sndQ is full. - deliver q s + deliver sndQ' s writeTVar st NoSub enqueueNotification :: NtfCreds -> Message -> M () @@ -1601,12 +1586,12 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s incStat $ pMsgFwdsRecv stats pure $ RRES r3 where - rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) + rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe (StoreQueue s, QueueRec), Transmission Cmd)) rejectOrVerify clntThAuth (tAuth, authorized, (corrId', entId', cmdOrError)) = case cmdOrError of Left e -> pure $ Left (corrId', entId', ERR e) Right cmd' - | allowed -> verified <$> verifyTransmission ((,C.cbNonce (bs corrId')) <$> clntThAuth) tAuth authorized entId' cmd' + | allowed -> verified <$> verifyTransmission ms ((,C.cbNonce (bs corrId')) <$> clntThAuth) tAuth authorized entId' cmd' | otherwise -> pure $ Left (corrId', entId', ERR $ CMD PROHIBITED) where allowed = case cmd' of @@ -1614,7 +1599,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Cmd SSender (SKEY _) -> True _ -> False verified = \case - VRVerified qr -> Right (qr, (corrId', entId', cmd')) + VRVerified q -> Right (q, (corrId', entId', cmd')) VRFailed -> Left (corrId', entId', ERR AUTH) deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg) deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $ @@ -1644,37 +1629,33 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s setDelivered :: Sub -> Message -> STM Bool setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg - delQueueAndMsgs :: QueueStore -> M (Transmission BrokerMsg) - delQueueAndMsgs st = do - withLog (`logDeleteQueue` entId) - AMS _ ms <- asks msgStore - liftIO (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case - Right q -> do + delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M (Transmission BrokerMsg) + delQueueAndMsgs (q, _) = do + liftIO (deleteQueue ms entId q) >>= \case + Right qr -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ do writeTQueue subscribedQ (entId, clientId, False) -- queue is usually deleted by the same client that is currently subscribed, -- we delete subscription here, so the client with no subscriptions can be disconnected. TM.delete entId subscriptions - forM_ (notifierId <$> notifier q) $ \nId -> do + forM_ (notifierId <$> notifier qr) $ \nId -> do -- queue is deleted by a different client from the one subscribed to notifications, -- so we don't need to remove subscription from the current client. stats <- asks serverStats deleted <- asks ntfStore >>= liftIO . (`deleteNtfs` nId) when (deleted > 0) $ liftIO $ atomicModifyIORef'_ (ntfCount stats) (subtract deleted) atomically $ writeTQueue ntfSubscribedQ (nId, clientId, False) - updateDeletedStats q + updateDeletedStats qr pure ok Left e -> pure $ err e - getQueueInfo :: QueueRec -> M BrokerMsg - getQueueInfo QueueRec {senderKey, notifier} = do - AMS _ ms <- asks msgStore + getQueueInfo :: StoreQueue s -> QueueRec -> M BrokerMsg + getQueueInfo q QueueRec {recipientId = rId, senderKey, notifier} = do fmap (either ERR id) $ liftIO $ runExceptT $ do - q <- getMsgQueue ms entId qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub - qiSize <- liftIO $ getQueueSize q - qiMsg <- toMsgInfo <$$> tryPeekMsg q + qiSize <- getQueueSize ms rId q + qiMsg <- toMsgInfo <$$> tryPeekMsg ms rId q let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} pure $ INFO info where @@ -1696,9 +1677,6 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s err :: ErrorType -> Transmission BrokerMsg err e = (corrId, entId, ERR e) - okResp :: Either ErrorType () -> Transmission BrokerMsg - okResp = either err $ const ok - updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do stats <- asks serverStats @@ -1711,11 +1689,6 @@ incStat :: MonadIO m => IORef Int -> m () incStat r = liftIO $ atomicModifyIORef'_ r (+ 1) {-# INLINE incStat #-} -withLog :: (StoreLog 'WriteMode -> IO a) -> M () -withLog action = do - env <- ask - liftIO . mapM_ action $ storeLog (env :: Env) - timed :: MonadIO m => T.Text -> RecipientId -> m a -> m a timed name (EntityId qId) a = do t <- liftIO getSystemTime @@ -1735,15 +1708,12 @@ randomId :: Int -> M EntityId randomId = fmap EntityId . randomId' {-# INLINE randomId #-} -saveServerMessages :: Bool -> M () -saveServerMessages drainMsgs = - asks msgStore >>= \case - AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of - Just f -> liftIO $ exportMessages False ms f drainMsgs - Nothing -> logInfo "undelivered messages are not saved" - AMS SMSJournal ms -> do - liftIO $ closeMsgStore ms - logInfo "closed journal message storage" +saveServerMessages :: Bool -> AMsgStore -> IO () +saveServerMessages drainMsgs = \case + AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of + Just f -> exportMessages False ms f drainMsgs + Nothing -> logInfo "undelivered messages are not saved" + AMS SMSJournal _ -> logInfo "closed journal message storage" exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO () exportMessages tty ms f drainMsgs = do @@ -1755,7 +1725,12 @@ exportMessages tty ms f drainMsgs = do logError $ "error exporting messages: " <> tshow e exitFailure where - saveQueueMsgs h rId q = getQueueMessages drainMsgs q >>= \msgs -> Sum (length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs) + saveQueueMsgs h rId q = + runExceptT (getQueueMessages drainMsgs ms rId q) >>= \case + Right msgs -> Sum (length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs) + Left e -> do + logError $ "STORE: saveQueueMsgs, error exporting messages from queue " <> decodeLatin1 (strEncode rId) <> ", " <> tshow e + exitFailure encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') processServerMessages :: M MessageStats @@ -1769,33 +1744,40 @@ processServerMessages = do AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of Just f -> ifM (doesFileExist f) (importMessages False ms f old_) (pure newMessageStats) Nothing -> pure newMessageStats - AMS SMSJournal ms | expire -> case old_ of - Just old -> do - logInfo "expiring journal store messages..." - withAllMsgQueues False ms $ \_ -> processExpireQueue old - Nothing -> do - logInfo "validating journal store messages..." - withAllMsgQueues False ms $ \_ -> processValidateQueue - AMS SMSJournal _ -> logWarn "skipping message expiration" $> newMessageStats - where - processExpireQueue old q = - runExceptT expireQueue >>= \case - Right (storedMsgsCount, expiredMsgsCount) -> - pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = 1} - Left e -> do - logError $ "failed expiring messages in queue " <> T.pack (queueDirectory $ queue q) <> ": " <> tshow e - exitFailure - where - expireQueue = do - expired'' <- deleteExpiredMsgs q False old - stored'' <- liftIO $ getQueueSize q - liftIO $ logQueueState q - liftIO $ closeMsgQueueHandles q - pure (stored'', expired'') - processValidateQueue q = - getQueueSize q >>= \storedMsgsCount -> pure mempty {storedMsgsCount, storedQueues = 1} + AMS SMSJournal ms + | expire -> case old_ of + Just old -> do + logInfo "expiring journal store messages..." + withAllMsgQueues False ms $ processExpireQueue old + Nothing -> do + logInfo "validating journal store messages..." + withAllMsgQueues False ms $ processValidateQueue + | otherwise -> logWarn "skipping message expiration" $> newMessageStats + where + processExpireQueue old rId q = + runExceptT expireQueue >>= \case + Right (storedMsgsCount, expiredMsgsCount) -> + pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = 1} + Left e -> do + logError $ "STORE: processExpireQueue, failed expiring messages in queue, " <> tshow e + exitFailure + where + expireQueue = do + expired'' <- deleteExpiredMsgs rId q False old + stored'' <- getQueueSize ms rId q + liftIO $ logQueueState q + liftIO $ closeMsgQueue q + pure (stored'', expired'') + processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats + processValidateQueue rId q = + runExceptT (getQueueSize ms rId q) >>= \case + Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1} + Left e -> do + logError $ "STORE: processValidateQueue, failed opening message queue, " <> tshow e + exitFailure -importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats +-- TODO this function should be called after importing queues from store log +importMessages :: forall s. STMQueueStore s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats importMessages tty ms f old_ = do logInfo $ "restoring messages from file " <> T.pack f LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case @@ -1812,7 +1794,7 @@ importMessages tty ms f old_ = do pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} where progress i = "Processed " <> show i <> " lines" - restoreMsg :: (Int, Maybe (RecipientId, MsgQueue s), (Int, Int, M.Map RecipientId (MsgQueue s))) -> LB.ByteString -> ExceptT String IO (Int, Maybe (RecipientId, MsgQueue s), (Int, Int, M.Map RecipientId (MsgQueue s))) + restoreMsg :: (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) -> LB.ByteString -> ExceptT String IO (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) restoreMsg (!i, q_, (!stored, !expired, !overQuota)) s' = do when (tty && i `mod` 1000 == 0) $ liftIO $ putStr (progress i <> "\r") >> hFlush stdout MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s @@ -1823,11 +1805,11 @@ importMessages tty ms f old_ = do q <- case q_ of -- to avoid lookup when restoring the next message to the same queue Just (rId', q') | rId' == rId -> pure q' - _ -> getMsgQueue ms rId + _ -> ExceptT $ getQueue ms SRecipient rId (i + 1,Just (rId, q),) <$> case msg of Message {msgTs} | maybe True (systemSeconds msgTs >=) old_ -> do - writeMsg ms q False msg >>= \case + writeMsg ms rId q False msg >>= \case Just _ -> pure (stored + 1, expired, overQuota) Nothing -> do logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) @@ -1836,12 +1818,12 @@ importMessages tty ms f old_ = do MessageQuota {} -> -- queue was over quota at some point, -- it will be set as over quota once fully imported - mergeQuotaMsgs >> writeMsg ms q False msg $> (stored, expired, M.insert rId q overQuota) + mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota) where -- if the first message in queue head is "quota", remove it. - mergeQuotaMsgs = isolateQueue q "mergeQuotaMsgs" $ - tryPeekMsg_ q >>= \case - Just MessageQuota {} -> tryDeleteMsg_ q False + mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ \mq -> + tryPeekMsg_ mq >>= \case + Just MessageQuota {} -> tryDeleteMsg_ mq False _ -> pure () msgErr :: Show e => String -> e -> String msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s) @@ -1916,7 +1898,8 @@ restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>= liftIO (strDecode <$> B.readFile f) >>= \case Right d@ServerStatsData {_qCount = statsQCount, _msgCount = statsMsgCount, _ntfCount = statsNtfCount} -> do s <- asks serverStats - _qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore + AMS _ st <- asks msgStore + _qCount <- M.size <$> readTVarIO (activeMsgQueues st) let _msgCount = storedMsgsCount msgStats _ntfCount = storedMsgsCount ntfStats liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgsCount msgStats, _msgNtfExpired = _msgNtfExpired d + expiredMsgsCount ntfStats} diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index c46519d90..5e60757c1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -43,7 +43,7 @@ import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore -import Simplex.Messaging.Server.QueueStore (QueueRec (..)) +import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog @@ -165,17 +165,15 @@ data Env = Env serverInfo :: ServerInformation, server :: Server, serverIdentity :: KeyHash, - queueStore :: QueueStore, msgStore :: AMsgStore, ntfStore :: NtfStore, random :: TVar ChaChaDRG, - storeLog :: Maybe (StoreLog 'WriteMode), tlsServerCreds :: T.Credential, httpServerCreds :: Maybe T.Credential, serverStats :: ServerStats, sockets :: TVar [(ServiceName, SocketState)], clientSeq :: TVar ClientId, - clients :: TVar (IntMap (Maybe Client)), + clients :: TVar (IntMap (Maybe AClient)), proxyAgent :: ProxyAgent -- senders served on this proxy } @@ -183,9 +181,9 @@ type family MsgStore s where MsgStore 'MSMemory = STMMsgStore MsgStore 'MSJournal = JournalMsgStore -data AMsgStore = forall s. MsgStoreClass (MsgStore s) => AMS (SMSType s) (MsgStore s) +data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s) -data AMsgQueue = forall s. MsgStoreClass (MsgStore s) => AMQ (SMSType s) (MsgQueue (MsgStore s)) +data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s)) data AMsgStoreCfg = forall s. MsgStoreClass (MsgStore s) => AMSC (SMSType s) (MsgStoreConfig (MsgStore s)) @@ -197,11 +195,11 @@ type Subscribed = Bool data Server = Server { subscribedQ :: TQueue (RecipientId, ClientId, Subscribed), - subscribers :: TMap RecipientId (TVar Client), + subscribers :: TMap RecipientId (TVar AClient), ntfSubscribedQ :: TQueue (NotifierId, ClientId, Subscribed), - notifiers :: TMap NotifierId (TVar Client), - subClients :: TVar (IntMap Client), -- clients with SMP subscriptions - ntfSubClients :: TVar (IntMap Client), -- clients with Ntf subscriptions + notifiers :: TMap NotifierId (TVar AClient), + subClients :: TVar (IntMap AClient), -- clients with SMP subscriptions + ntfSubClients :: TVar (IntMap AClient), -- clients with Ntf subscriptions pendingSubEvents :: TVar (IntMap (NonEmpty (RecipientId, Subscribed))), pendingNtfSubEvents :: TVar (IntMap (NonEmpty (NotifierId, Subscribed))), savingLock :: Lock @@ -213,11 +211,16 @@ newtype ProxyAgent = ProxyAgent type ClientId = Int -data Client = Client +data AClient = forall s. MsgStoreClass (MsgStore s) => AClient (SMSType s) (Client (MsgStore s)) + +clientId' :: AClient -> ClientId +clientId' (AClient _ Client {clientId}) = clientId + +data Client s = Client { clientId :: ClientId, subscriptions :: TMap RecipientId Sub, ntfSubscriptions :: TMap NotifierId (), - rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), + rcvQ :: TBQueue (NonEmpty (Maybe (StoreQueue s, QueueRec), Transmission Cmd)), sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), procThreads :: TVar Int, @@ -253,8 +256,8 @@ newServer = do savingLock <- createLockIO return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, subClients, ntfSubClients, pendingSubEvents, pendingNtfSubEvents, savingLock} -newClient :: ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO Client -newClient clientId qSize thVersion sessionId createdAt = do +newClient :: SMSType s -> ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> IO (Client (MsgStore s)) +newClient _msType clientId qSize thVersion sessionId createdAt = do subscriptions <- TM.emptyIO ntfSubscriptions <- TM.emptyIO rcvQ <- newTBQueueIO qSize @@ -283,8 +286,7 @@ newEnv :: ServerConfig -> IO Env newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do serverActive <- newTVarIO True server <- newServer - queueStore <- newQueueStore - msgStore <- case msgStoreType of + msgStore@(AMS _ store) <- case msgStoreType of AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota} AMSType SMSJournal -> case storeMsgsFile of Just storePath -> @@ -293,10 +295,10 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure ntfStore <- NtfStore <$> TM.emptyIO random <- C.newRandom - storeLog <- - forM storeLogFile $ \f -> do - logInfo $ "restoring queues from file " <> T.pack f - readWriteQueueStore f queueStore + forM_ storeLogFile $ \f -> do + logInfo $ "restoring queues from file " <> T.pack f + sl <- readWriteQueueStore f store + setStoreLog store sl tlsServerCreds <- getCredentials "SMP" smpCredentials httpServerCreds <- mapM (getCredentials "HTTPS") httpCredentials mapM_ checkHTTPSCredentials httpServerCreds @@ -307,7 +309,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt clientSeq <- newTVarIO 0 clients <- newTVarIO mempty proxyAgent <- newSMPProxyAgent smpAgentCfg random - pure Env {serverActive, config, serverInfo, server, serverIdentity, queueStore, msgStore, ntfStore, random, storeLog, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent} + pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent} where getCredentials protocol creds = do files <- missingCreds @@ -351,3 +353,6 @@ newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent newSMPProxyAgent smpAgentCfg random = do smpAgent <- newSMPClientAgent smpAgentCfg random pure ProxyAgent {smpAgent} + +readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode) +readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 0251b6705..79709d3eb 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -46,10 +46,11 @@ import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Information import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..)) import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore) +import Simplex.Messaging.Server.QueueStore.STM (readQueueStore) import Simplex.Messaging.Transport (batchCmdsSMPVersion, sendingProxySMPVersion, simplexMQVersion, supportedServerSMPRelayVRange) import Simplex.Messaging.Transport.Client (SocksProxy, TransportHost (..), defaultSocksProxy) import Simplex.Messaging.Transport.Server (ServerCredentials (..), TransportServerConfig (..), defaultTransportServerConfig) -import Simplex.Messaging.Util (eitherToMaybe, safeDecodeUtf8, tshow) +import Simplex.Messaging.Util (eitherToMaybe, ifM, safeDecodeUtf8, tshow) import Simplex.Messaging.Version (mkVersionRange) import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist) import System.Exit (exitFailure) @@ -85,6 +86,14 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = Journal cmd -> withIniFile $ \ini -> do msgsDirExists <- doesDirectoryExist storeMsgsJournalDir msgsFileExists <- doesFileExist storeMsgsFilePath + let enableStoreLog = settingIsOn "STORE_LOG" "enable" ini + storeLogFile <- case enableStoreLog $> storeLogFilePath of + Just storeLogFile -> do + ifM + (doesFileExist storeLogFile) + (pure storeLogFile) + (putStrLn ("Store log file " <> storeLogFile <> " not found") >> exitFailure) + Nothing -> putStrLn "Store log disabled, see `[STORE_LOG] enable`" >> exitFailure case cmd of JCImport | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage @@ -99,6 +108,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir) "Messages not imported" ms <- newJournalMsgStore + readQueueStore storeLogFile ms msgStats <- importMessages True ms storeMsgsFilePath Nothing -- no expiration putStrLn "Import completed" printMessageStats "Messages" msgStats @@ -116,6 +126,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath) "Journal not exported" ms <- newJournalMsgStore + readQueueStore storeLogFile ms exportMessages True ms storeMsgsFilePath False putStrLn "Export completed" putStrLn $ case readMsgStoreType ini of diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 44297bf37..0310a5991 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -11,11 +11,13 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeFamilies #-} module Simplex.Messaging.Server.MsgStore.Journal - ( JournalMsgStore (msgQueues, random), - JournalMsgQueue (queue), + ( JournalMsgStore (queues, senders, notifiers, random), + JournalQueue, + JournalMsgQueue (queue, state), JMQueue (queueDirectory, statePath), JournalStoreConfig (..), getQueueMessages, @@ -32,6 +34,7 @@ module Simplex.Messaging.Server.MsgStore.Journal newJournalId, appendState, queueLogFileName, + journalFilePath, logFileExt, ) where @@ -42,13 +45,11 @@ import Control.Logger.Simple import Control.Monad import Control.Monad.Trans.Except import qualified Data.Attoparsec.ByteString.Char8 as A -import Data.Bitraversable (bimapM) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate) -import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) @@ -57,11 +58,14 @@ import GHC.IO (catchAny) import Simplex.Messaging.Agent.Client (getMapLock, withLockMap) import Simplex.Messaging.Agent.Lock import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (ErrorType (..), Message (..), RecipientId) +import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types +import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (ifM, tshow, ($>>=)) +import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>)) import System.Directory import System.Exit import System.FilePath (()) @@ -73,7 +77,10 @@ data JournalMsgStore = JournalMsgStore { config :: JournalStoreConfig, random :: TVar StdGen, queueLocks :: TMap RecipientId Lock, - msgQueues :: TMap RecipientId JournalMsgQueue + queues :: TMap RecipientId JournalQueue, + senders :: TMap SenderId RecipientId, + notifiers :: TMap NotifierId RecipientId, + storeLog :: TVar (Maybe (StoreLog 'WriteMode)) } data JournalStoreConfig = JournalStoreConfig @@ -88,9 +95,16 @@ data JournalStoreConfig = JournalStoreConfig stateTailSize :: Int } +data JournalQueue = JournalQueue + { queueLock :: Lock, + -- To avoid race conditions and errors when restoring queues, + -- Nothing is written to TVar when queue is deleted. + queueRec :: TVar (Maybe QueueRec), + msgQueue_ :: TVar (Maybe JournalMsgQueue) + } + data JMQueue = JMQueue { queueDirectory :: FilePath, - queueLock :: Lock, statePath :: FilePath } @@ -198,8 +212,21 @@ logFileExt = ".log" newtype StoreIO a = StoreIO {unStoreIO :: IO a} deriving newtype (Functor, Applicative, Monad) +instance STMQueueStore JournalMsgStore where + queues' = queues + senders' = senders + notifiers' = notifiers + storeLog' = storeLog + mkQueue st qr = do + lock <- getMapLock (queueLocks st) $ recipientId qr + q <- newTVar $! Just qr + mq <- newTVar Nothing + pure $ JournalQueue lock q mq + msgQueue_' = msgQueue_ + instance MsgStoreClass JournalMsgStore where type StoreMonad JournalMsgStore = StoreIO + type StoreQueue JournalMsgStore = JournalQueue type MsgQueue JournalMsgStore = JournalMsgQueue type MsgStoreConfig JournalMsgStore = JournalStoreConfig @@ -207,39 +234,51 @@ instance MsgStoreClass JournalMsgStore where newMsgStore config = do random <- newTVarIO =<< newStdGen queueLocks <- TM.emptyIO - msgQueues <- TM.emptyIO - pure JournalMsgStore {config, random, queueLocks, msgQueues} + queues <- TM.emptyIO + senders <- TM.emptyIO + notifiers <- TM.emptyIO + storeLog <- newTVarIO Nothing + pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog} - closeMsgStore st = atomically (swapTVar (msgQueues st) M.empty) >>= mapM_ closeMsgQueueHandles + setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO () + setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) - activeMsgQueues = msgQueues + closeMsgStore st = do + readTVarIO (storeLog st) >>= mapM_ closeStoreLog + readTVarIO (queues st) >>= mapM_ closeMsgQueue + + activeMsgQueues = queues {-# INLINE activeMsgQueues #-} -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. -- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID. -- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name. - withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalMsgQueue -> IO a) -> IO a + withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalQueue -> IO a) -> IO a withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty) where processStore = do - closeMsgStore ms - lock <- createLockIO -- the same lock is used for all queues - (!count, !res) <- foldQueues 0 (processQueue lock) (0, mempty) ("", storePath) + (!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath) putStrLn $ progress count pure res JournalStoreConfig {storePath, pathParts} = config - processQueue :: Lock -> (Int, a) -> (String, FilePath) -> IO (Int, a) - processQueue queueLock (!i, !r) (queueId, dir) = do + processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a) + processQueue (!i, !r) (queueId, dir) = do when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout - let statePath = msgQueueStatePath dir queueId - q <- openMsgQueue ms JMQueue {queueDirectory = dir, queueLock, statePath} r' <- case strDecode $ B.pack queueId of - Right rId -> action rId q + Right rId -> + getQueue ms SRecipient rId >>= \case + Right q -> unStoreIO (getMsgQueue ms rId q) *> action rId q <* closeMsgQueue q + Left AUTH -> do + logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir + removeQueueDirectory_ dir + pure mempty + Left e -> do + logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e + exitFailure Left e -> do - putStrLn ("Error: message queue directory " <> dir <> " is invalid: " <> e) + logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e exitFailure - closeMsgQueueHandles q pure (i + 1, r <> r') progress i = "Processed: " <> show i <> " queues" foldQueues depth f acc (queueId, path) = do @@ -258,23 +297,26 @@ instance MsgStoreClass JournalMsgStore where logQueueStates :: JournalMsgStore -> IO () logQueueStates ms = withActiveMsgQueues ms $ \_ -> logQueueState - logQueueState :: JournalMsgQueue -> IO () - logQueueState q = - readTVarIO (handles q) - >>= maybe (pure ()) (\hs -> readTVarIO (state q) >>= appendState (stateHandle hs)) + logQueueState :: JournalQueue -> IO () + logQueueState q = + void $ + readTVarIO (msgQueue_ q) + $>>= \mq -> readTVarIO (handles mq) + $>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ()) - getMsgQueue :: JournalMsgStore -> RecipientId -> ExceptT ErrorType IO JournalMsgQueue - getMsgQueue ms@JournalMsgStore {queueLocks, msgQueues, random} rId = - tryStore "getMsgQueue" (B.unpack $ strEncode rId) $ withLockMap queueLocks rId "getMsgQueue" $ - TM.lookupIO rId msgQueues >>= maybe newQ pure + queueRec' = queueRec + {-# INLINE queueRec' #-} + + getMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO JournalMsgQueue + getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} = + StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure where newQ = do - queueLock <- atomically $ getMapLock queueLocks rId let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) - queue = JMQueue {queueDirectory = dir, queueLock, statePath} + queue = JMQueue {queueDirectory = dir, statePath} q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue) - atomically $ TM.insert rId q msgQueues + atomically $ writeTVar msgQueue_ $! Just q pure q where createQ :: JMQueue -> IO JournalMsgQueue @@ -282,23 +324,25 @@ instance MsgStoreClass JournalMsgStore where -- folder and files are not created here, -- to avoid file IO for queues without messages during subscription journalId <- newJournalId random - mkJournalQueue queue (newMsgQueueState journalId, Nothing) + mkJournalQueue queue (newMsgQueueState journalId) Nothing - delMsgQueue :: JournalMsgStore -> RecipientId -> IO () - delMsgQueue ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do - closeMsgQueue ms rId - removeQueueDirectory ms rId + openedMsgQueue :: JournalQueue -> StoreIO (Maybe JournalMsgQueue) + openedMsgQueue = StoreIO . readTVarIO . msgQueue_ + {-# INLINE openedMsgQueue #-} - delMsgQueueSize :: JournalMsgStore -> RecipientId -> IO Int - delMsgQueueSize ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do - st_ <- - atomically (TM.lookupDelete rId (msgQueues ms)) - >>= mapM (\q -> closeMsgQueueHandles q >> readTVarIO (state q)) - removeQueueDirectory ms rId - pure $ maybe (-1) size st_ + deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec) + deleteQueue ms rId q = + fst <$$> deleteQueue_ ms rId q - getQueueMessages :: Bool -> JournalMsgQueue -> IO [Message] - getQueueMessages drainMsgs q = run [] + deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms rId q = + deleteQueue_ ms rId q >>= mapM (traverse getSize) + -- traverse operates on the second tuple element + where + getSize = maybe (pure (-1)) (fmap size . readTVarIO . state) + + getQueueMessages_ :: Bool -> JournalMsgQueue -> StoreIO [Message] + getQueueMessages_ drainMsgs q = StoreIO (run []) where run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs) getMsg msgs hs = chooseReadJournal q drainMsgs hs >>= maybe (pure msgs) readMsg @@ -308,22 +352,23 @@ instance MsgStoreClass JournalMsgStore where updateReadPos q drainMsgs len hs (msg :) <$> run msgs - writeMsg :: JournalMsgStore -> JournalMsgQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) - writeMsg ms q@JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} logState msg = - isolateQueue q "writeMsg" $ StoreIO $ do + writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do + q <- getMsgQueue ms rId q' + StoreIO $ do st@MsgQueueState {canWrite, size} <- readTVarIO (state q) let empty = size == 0 if canWrite || empty then do let canWrt' = quota > size if canWrt' - then writeToJournal st canWrt' msg $> Just (msg, empty) - else writeToJournal st canWrt' msgQuota $> Nothing + then writeToJournal q st canWrt' msg $> Just (msg, empty) + else writeToJournal q st canWrt' msgQuota $> Nothing else pure Nothing where JournalStoreConfig {quota, maxMsgCount} = config ms - msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} - writeToJournal st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do + msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg} + writeToJournal q st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do let msgStr = strEncode msg' `B.snoc` '\n' msgLen = fromIntegral $ B.length msgStr hs <- maybe createQueueDir pure =<< readTVarIO handles @@ -339,6 +384,7 @@ instance MsgStoreClass JournalMsgStore where updateQueueState q logState hs st' $ when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen)) where + JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q createQueueDir = do createDirectoryIfMissing True queueDirectory sh <- openFile statePath AppendMode @@ -354,11 +400,13 @@ instance MsgStoreClass JournalMsgStore where pure (newJournalState journalId, wh) -- can ONLY be used while restoring messages, not while server running - setOverQuota_ :: JournalMsgQueue -> IO () - setOverQuota_ JournalMsgQueue {state} = atomically $ modifyTVar' state $ \st -> st {canWrite = False} + setOverQuota_ :: JournalQueue -> IO () + setOverQuota_ q = + readTVarIO (msgQueue_ q) + >>= mapM_ (\JournalMsgQueue {state} -> atomically $ modifyTVar' state $ \st -> st {canWrite = False}) - getQueueSize :: JournalMsgQueue -> IO Int - getQueueSize JournalMsgQueue {state} = size <$> readTVarIO state + getQueueSize_ :: JournalMsgQueue -> StoreIO Int + getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state tryPeekMsg_ :: JournalMsgQueue -> StoreIO (Maybe Message) tryPeekMsg_ q@JournalMsgQueue {tipMsg, handles} = @@ -379,27 +427,33 @@ instance MsgStoreClass JournalMsgStore where $>>= \len -> readTVarIO handles $>>= \hs -> updateReadPos q logState len hs $> Just () - isolateQueue :: JournalMsgQueue -> String -> StoreIO a -> ExceptT ErrorType IO a - isolateQueue JournalMsgQueue {queue = q} op = - tryStore op (queueDirectory q) . withLock' (queueLock q) op . unStoreIO + isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a + isolateQueue rId JournalQueue {queueLock} op = + tryStore' op rId . withLock' queueLock op . unStoreIO -tryStore :: String -> String -> IO a -> ExceptT ErrorType IO a -tryStore op qId a = ExceptT $ E.mask_ $ E.try a >>= bimapM storeErr pure +tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a +tryStore' op rId = tryStore op rId . fmap Right + +tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a +tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure where - storeErr :: E.SomeException -> IO ErrorType + storeErr :: E.SomeException -> IO (Either ErrorType a) storeErr e = - let e' = intercalate ", " [op, qId, show e] - in logError ("STORE: " <> T.pack e') $> STORE e' + let e' = intercalate ", " [op, B.unpack $ strEncode rId, show e] + in logError ("STORE: " <> T.pack e') $> Left (STORE e') + +isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a +isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do (st, sh) <- readWriteQueueState ms statePath - (st', rh, wh_) <- closeOnException sh $ openJournals dir st sh + (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} - mkJournalQueue q (st', Just hs) + mkJournalQueue q st' (Just hs) -mkJournalQueue :: JMQueue -> (MsgQueueState, Maybe MsgQueueHandles) -> IO JournalMsgQueue -mkJournalQueue queue (st, hs_) = do +mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue +mkJournalQueue queue st hs_ = do state <- newTVarIO st tipMsg <- newTVarIO Nothing handles <- newTVarIO hs_ @@ -464,17 +518,26 @@ createNewJournal dir journalId = do newJournalId :: TVar StdGen -> IO ByteString newJournalId g = strEncode <$> atomically (stateTVar g $ genByteString 12) -openJournals :: FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle) -openJournals dir st@MsgQueueState {readState = rs, writeState = ws} sh = do +openJournals :: JournalMsgStore -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle) +openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do let rjId = journalId rs wjId = journalId ws openJournal rs >>= \case - Left path -> do - logError $ "STORE: openJournals, no read file - creating new file, " <> T.pack path - rh <- createNewJournal dir rjId - let st' = newMsgQueueState rjId - closeOnException rh $ appendState sh st' - pure (st', rh, Nothing) + Left path + | rjId == wjId -> do + logError $ "STORE: openJournals, no read/write file - creating new file, " <> T.pack path + newReadJournal + | otherwise -> do + let rs' = (newJournalState wjId) {msgCount = msgCount ws, byteCount = byteCount ws} + st' = st {readState = rs', size = msgCount ws} + openJournal rs' >>= \case + Left path' -> do + logError $ "STORE: openJournals, no read and write files - creating new file, read: " <> T.pack path <> ", write: " <> T.pack path' + newReadJournal + Right rh -> do + logError $ "STORE: openJournals, no read file - switched to write file, " <> T.pack path + closeOnException rh $ fixFileSize rh $ bytePos ws + pure (st', rh, Nothing) Right rh | rjId == wjId -> do closeOnException rh $ fixFileSize rh $ bytePos ws @@ -483,16 +546,23 @@ openJournals dir st@MsgQueueState {readState = rs, writeState = ws} sh = do fixFileSize rh $ byteCount rs openJournal ws >>= \case Left path -> do - logError $ "STORE: openJournals, no write file - creating new file, " <> T.pack path - wh <- createNewJournal dir wjId - let size' = msgCount rs - msgPos rs - st' = st {writeState = newJournalState wjId, size = size'} -- we don't amend canWrite to trigger QCONT - closeOnException wh $ appendState sh st' - pure (st', rh, Just wh) + let msgs = msgCount rs + bytes = byteCount rs + size' = msgs - msgPos rs + ws' = (newJournalState rjId) {msgPos = msgs, msgCount = msgs, bytePos = bytes, byteCount = bytes} + st' = st {writeState = ws', size = size'} -- we don't amend canWrite to trigger QCONT + logError $ "STORE: openJournals, no write file, " <> T.pack path + pure (st', rh, Nothing) Right wh -> do closeOnException wh $ fixFileSize wh $ bytePos ws pure (st, rh, Just wh) where + newReadJournal = do + rjId' <- newJournalId $ random ms + rh <- createNewJournal dir rjId' + let st' = newMsgQueueState rjId' + closeOnException rh $ appendState sh st' + pure (st', rh, Nothing) openJournal :: JournalState t -> IO (Either FilePath Handle) openJournal JournalState {journalId} = let path = journalFilePath dir journalId @@ -599,10 +669,18 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws -closeMsgQueue :: JournalMsgStore -> RecipientId -> IO () -closeMsgQueue ms rId = - atomically (TM.lookupDelete rId (msgQueues ms)) - >>= mapM_ closeMsgQueueHandles +deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue)) +deleteQueue_ ms rId q = + runExceptT $ isolateQueueId "deleteQueue_" ms rId $ + deleteQueue' ms rId q >>= mapM remove + where + remove r@(_, mq_) = do + mapM_ closeMsgQueueHandles mq_ + removeQueueDirectory ms rId + pure r + +closeMsgQueue :: JournalQueue -> IO () +closeMsgQueue JournalQueue {msgQueue_} = atomically (swapTVar msgQueue_ Nothing) >>= mapM_ closeMsgQueueHandles closeMsgQueueHandles :: JournalMsgQueue -> IO () closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles @@ -613,9 +691,12 @@ closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles mapM_ hClose wh_ removeQueueDirectory :: JournalMsgStore -> RecipientId -> IO () -removeQueueDirectory st rId = - let dir = msgQueueDirectory st rId - in removePathForcibly dir `catchAny` (\e -> logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e) +removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st + +removeQueueDirectory_ :: FilePath -> IO () +removeQueueDirectory_ dir = + removePathForcibly dir `catchAny` \e -> + logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e hAppend :: Handle -> Int64 -> ByteString -> IO () hAppend h pos s = do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 1b27b6a10..b1412421c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -1,5 +1,6 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} @@ -7,11 +8,11 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeFamilies #-} module Simplex.Messaging.Server.MsgStore.STM ( STMMsgStore (..), - STMMsgQueue (msgQueue), STMStoreConfig (..), ) where @@ -20,21 +21,35 @@ import Control.Concurrent.STM import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Functor (($>)) -import Simplex.Messaging.Protocol (ErrorType, Message (..), RecipientId) +import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types +import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.QueueStore.STM +import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM - -data STMMsgQueue = STMMsgQueue - { msgQueue :: TQueue Message, - quota :: Int, - canWrite :: TVar Bool, - size :: TVar Int - } +import Simplex.Messaging.Util ((<$$>)) +import System.IO (IOMode (..)) data STMMsgStore = STMMsgStore { storeConfig :: STMStoreConfig, - msgQueues :: TMap RecipientId STMMsgQueue + queues :: TMap RecipientId STMQueue, + senders :: TMap SenderId RecipientId, + notifiers :: TMap NotifierId RecipientId, + storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + } + +data STMQueue = STMQueue + { -- To avoid race conditions and errors when restoring queues, + -- Nothing is written to TVar when queue is deleted. + queueRec :: TVar (Maybe QueueRec), + msgQueue_ :: TVar (Maybe STMMsgQueue) + } + +data STMMsgQueue = STMMsgQueue + { msgQueue :: TQueue Message, + canWrite :: TVar Bool, + size :: TVar Int } data STMStoreConfig = STMStoreConfig @@ -42,19 +57,34 @@ data STMStoreConfig = STMStoreConfig quota :: Int } +instance STMQueueStore STMMsgStore where + queues' = queues + senders' = senders + notifiers' = notifiers + storeLog' = storeLog + mkQueue _ qr = STMQueue <$> (newTVar $! Just qr) <*> newTVar Nothing + msgQueue_' = msgQueue_ + instance MsgStoreClass STMMsgStore where type StoreMonad STMMsgStore = STM + type StoreQueue STMMsgStore = STMQueue type MsgQueue STMMsgStore = STMMsgQueue type MsgStoreConfig STMMsgStore = STMStoreConfig newMsgStore :: STMStoreConfig -> IO STMMsgStore newMsgStore storeConfig = do - msgQueues <- TM.emptyIO - pure STMMsgStore {storeConfig, msgQueues} + queues <- TM.emptyIO + senders <- TM.emptyIO + notifiers <- TM.emptyIO + storeLog <- newTVarIO Nothing + pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog} - closeMsgStore _ = pure () + setStoreLog :: STMMsgStore -> StoreLog 'WriteMode -> IO () + setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) - activeMsgQueues = msgQueues + closeMsgStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog + + activeMsgQueues = queues {-# INLINE activeMsgQueues #-} withAllMsgQueues _ = withActiveMsgQueues @@ -64,39 +94,44 @@ instance MsgStoreClass STMMsgStore where logQueueState _ = pure () - -- The reason for double lookup is that majority of messaging queues exist, - -- because multiple messages are sent to the same queue, - -- so the first lookup without STM transaction will return the queue faster. - -- In case the queue does not exist, it needs to be looked-up again inside transaction. - getMsgQueue :: STMMsgStore -> RecipientId -> ExceptT ErrorType IO STMMsgQueue - getMsgQueue STMMsgStore {msgQueues = qs, storeConfig = STMStoreConfig {quota}} rId = - liftIO $ TM.lookupIO rId qs >>= maybe (atomically maybeNewQ) pure + queueRec' = queueRec + {-# INLINE queueRec' #-} + + getMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM STMMsgQueue + getMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure where - maybeNewQ = TM.lookup rId qs >>= maybe newQ pure newQ = do msgQueue <- newTQueue canWrite <- newTVar True size <- newTVar 0 - let q = STMMsgQueue {msgQueue, quota, canWrite, size} - TM.insert rId q qs + let q = STMMsgQueue {msgQueue, canWrite, size} + writeTVar msgQueue_ $! Just q pure q - delMsgQueue :: STMMsgStore -> RecipientId -> IO () - delMsgQueue st rId = atomically $ TM.delete rId $ msgQueues st + openedMsgQueue :: STMQueue -> STM (Maybe STMMsgQueue) + openedMsgQueue = readTVar . msgQueue_ + {-# INLINE openedMsgQueue #-} - delMsgQueueSize :: STMMsgStore -> RecipientId -> IO Int - delMsgQueueSize st rId = atomically (TM.lookupDelete rId $ msgQueues st) >>= maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) + deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec) + deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q - getQueueMessages :: Bool -> STMMsgQueue -> IO [Message] - getQueueMessages drainMsgs = atomically . (if drainMsgs then flushTQueue else snapshotTQueue) . msgQueue + deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize) + -- traverse operates on the second tuple element + where + getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) + + getQueueMessages_ :: Bool -> STMMsgQueue -> STM [Message] + getQueueMessages_ drainMsgs = (if drainMsgs then flushTQueue else snapshotTQueue) . msgQueue where snapshotTQueue q = do msgs <- flushTQueue q mapM_ (writeTQueue q) msgs pure msgs - writeMsg :: STMMsgStore -> STMMsgQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) - writeMsg _ STMMsgQueue {msgQueue = q, quota, canWrite, size} _logState msg = liftIO $ atomically $ do + writeMsg :: STMMsgStore -> RecipientId -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg ms rId q' _logState msg = liftIO $ atomically $ do + STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms rId q' canWrt <- readTVar canWrite empty <- isEmptyTQueue q if canWrt || empty @@ -109,13 +144,14 @@ instance MsgStoreClass STMMsgStore where else (writeTQueue q $! msgQuota) $> Nothing else pure Nothing where - msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} + STMMsgStore {storeConfig = STMStoreConfig {quota}} = ms + msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg} - setOverQuota_ :: STMMsgQueue -> IO () - setOverQuota_ q = atomically $ writeTVar (canWrite q) False + setOverQuota_ :: STMQueue -> IO () + setOverQuota_ q = readTVarIO (msgQueue_ q) >>= mapM_ (\mq -> atomically $ writeTVar (canWrite mq) False) - getQueueSize :: STMMsgQueue -> IO Int - getQueueSize STMMsgQueue {size} = readTVarIO size + getQueueSize_ :: STMMsgQueue -> STM Int + getQueueSize_ STMMsgQueue {size} = readTVar size tryPeekMsg_ :: STMMsgQueue -> STM (Maybe Message) tryPeekMsg_ = tryPeekTQueue . msgQueue @@ -127,5 +163,5 @@ instance MsgStoreClass STMMsgStore where Just _ -> modifyTVar' size (subtract 1) _ -> pure () - isolateQueue :: STMMsgQueue -> String -> STM a -> ExceptT ErrorType IO a - isolateQueue _ _ = liftIO . atomically + isolateQueue :: RecipientId -> STMQueue -> String -> STM a -> ExceptT ErrorType IO a + isolateQueue _ _ _ = liftIO . atomically diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 0486804f3..28f250b70 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -1,5 +1,6 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} @@ -16,29 +17,44 @@ import Data.Int (Int64) import Data.Kind import qualified Data.Map.Strict as M import Data.Time.Clock.System (SystemTime (systemSeconds)) -import Simplex.Messaging.Protocol (ErrorType, Message (..), MsgId, RecipientId) +import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.StoreLog.Types import Simplex.Messaging.TMap (TMap) +import System.IO (IOMode (..)) + +class MsgStoreClass s => STMQueueStore s where + queues' :: s -> TMap RecipientId (StoreQueue s) + senders' :: s -> TMap SenderId RecipientId + notifiers' :: s -> TMap NotifierId RecipientId + storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) + mkQueue :: s -> QueueRec -> STM (StoreQueue s) + msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s)) class Monad (StoreMonad s) => MsgStoreClass s where type StoreMonad s = (m :: Type -> Type) | m -> s type MsgStoreConfig s = c | c -> s + type StoreQueue s = q | q -> s type MsgQueue s = q | q -> s newMsgStore :: MsgStoreConfig s -> IO s + setStoreLog :: s -> StoreLog 'WriteMode -> IO () closeMsgStore :: s -> IO () - activeMsgQueues :: s -> TMap RecipientId (MsgQueue s) - withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> MsgQueue s -> IO a) -> IO a + activeMsgQueues :: s -> TMap RecipientId (StoreQueue s) + withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a logQueueStates :: s -> IO () - logQueueState :: MsgQueue s -> IO () - getMsgQueue :: s -> RecipientId -> ExceptT ErrorType IO (MsgQueue s) - delMsgQueue :: s -> RecipientId -> IO () - delMsgQueueSize :: s -> RecipientId -> IO Int - getQueueMessages :: Bool -> MsgQueue s -> IO [Message] - writeMsg :: s -> MsgQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) - setOverQuota_ :: MsgQueue s -> IO () -- can ONLY be used while restoring messages, not while server running - getQueueSize :: MsgQueue s -> IO Int + logQueueState :: StoreQueue s -> IO () + queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) + getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s) + openedMsgQueue :: StoreQueue s -> StoreMonad s (Maybe (MsgQueue s)) + deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec) + deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) + getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message] + writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running + getQueueSize_ :: MsgQueue s -> StoreMonad s Int tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message) tryDeleteMsg_ :: MsgQueue s -> Bool -> StoreMonad s () - isolateQueue :: MsgQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a + isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a data MSType = MSMemory | MSJournal @@ -48,42 +64,55 @@ data SMSType :: MSType -> Type where data AMSType = forall s. AMSType (SMSType s) -withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> MsgQueue s -> IO a) -> IO a +withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> StoreQueue s -> IO a) -> IO a withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty . M.assocs where run !acc (k, v) = do r <- f k v pure $! acc <> r -tryPeekMsg :: MsgStoreClass s => MsgQueue s -> ExceptT ErrorType IO (Maybe Message) -tryPeekMsg mq = isolateQueue mq "tryPeekMsg" $ tryPeekMsg_ mq +getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message] +getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueMessages_ drainMsgs +{-# INLINE getQueueMessages #-} + +getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int +getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueSize_ +{-# INLINE getQueueSize #-} + +tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) +tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ tryPeekMsg_ {-# INLINE tryPeekMsg #-} -tryDelMsg :: MsgStoreClass s => MsgQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) -tryDelMsg mq msgId' = - isolateQueue mq "tryDelMsg" $ +tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) +tryDelMsg st rId q msgId' = + withMsgQueue st rId q "tryDelMsg" $ \mq -> tryPeekMsg_ mq >>= \case msg_@(Just msg) - | msgId msg == msgId' -> + | messageId msg == msgId' -> tryDeleteMsg_ mq True >> pure msg_ _ -> pure Nothing -- atomic delete (== read) last and peek next message if available -tryDelPeekMsg :: MsgStoreClass s => MsgQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) -tryDelPeekMsg mq msgId' = - isolateQueue mq "tryDelPeekMsg" $ +tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) +tryDelPeekMsg st rId q msgId' = + withMsgQueue st rId q "tryDelPeekMsg" $ \mq -> tryPeekMsg_ mq >>= \case msg_@(Just msg) - | msgId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq) + | messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq) | otherwise -> pure (Nothing, msg_) _ -> pure (Nothing, Nothing) -deleteExpiredMsgs :: MsgStoreClass s => MsgQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int -deleteExpiredMsgs mq logState old = isolateQueue mq "deleteExpiredMsgs" $ loop 0 +withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (MsgQueue s -> StoreMonad s a) -> ExceptT ErrorType IO a +withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a +{-# INLINE withMsgQueue #-} + +deleteExpiredMsgs :: MsgStoreClass s => RecipientId -> StoreQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int +deleteExpiredMsgs rId q logState old = + isolateQueue rId q "deleteExpiredMsgs" $ openedMsgQueue q >>= maybe (pure 0) (loop 0) where - loop dc = + loop dc mq = tryPeekMsg_ mq >>= \case Just Message {msgTs} | systemSeconds msgTs < old -> - tryDeleteMsg_ mq logState >> loop (dc + 1) + tryDeleteMsg_ mq logState >> loop (dc + 1) mq _ -> pure dc diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 4cd499430..d83d26f08 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -1,120 +1,196 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE UndecidableInstances #-} module Simplex.Messaging.Server.QueueStore.STM - ( QueueStore (..), - newQueueStore, - addQueue, + ( addQueue, getQueue, + getQueueRec, secureQueue, addQueueNotifier, deleteQueueNotifier, suspendQueue, updateQueueTime, - deleteQueue, + deleteQueue', + readQueueStore, + withLog', ) where +import qualified Control.Exception as E +import Control.Logger.Simple import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Except +import Data.Bitraversable (bimapM) +import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) +import qualified Data.Text as T +import Data.Text.Encoding (decodeLatin1) +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore -import Simplex.Messaging.TMap (TMap) +import Simplex.Messaging.Server.StoreLog import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (ifM, ($>>=)) +import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$)) +import System.IO import UnliftIO.STM -data QueueStore = QueueStore - { queues :: TMap RecipientId (TVar QueueRec), - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId - } - -newQueueStore :: IO QueueStore -newQueueStore = do - queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO - pure QueueStore {queues, senders, notifiers} - -addQueue :: QueueStore -> QueueRec -> IO (Either ErrorType ()) -addQueue QueueStore {queues, senders, notifiers} q@QueueRec {recipientId = rId, senderId = sId, notifier} = atomically $ do - ifM hasId (pure $ Left DUPLICATE_) $ do - TM.insertM rId (newTVar q) queues - TM.insert sId rId senders - forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId notifiers - pure $ Right () +addQueue :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) +addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= + atomically add + $>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr) where - hasId = (||) <$> TM.member rId queues <*> TM.member sId senders + add = ifM hasId (pure $ Left DUPLICATE_) $ do + q <- mkQueue st qr -- STMQueue lock <$> (newTVar $! Just qr) <*> newTVar Nothing + TM.insert rId q $ queues' st + TM.insert sId rId $ senders' st + forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st + pure $ Right q + hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] + hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier -getQueue :: DirectParty p => QueueStore -> SParty p -> QueueId -> IO (Either ErrorType QueueRec) -getQueue QueueStore {queues, senders, notifiers} party qId = - toResult <$> (mapM readTVarIO =<< getVar) +getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) +getQueue st party qId = + maybe (Left AUTH) Right <$> case party of + SRecipient -> TM.lookupIO qId $ queues' st + SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st) + SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st) + +getQueueRec :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) +getQueueRec st party qId = + getQueue st party qId + $>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec' q)) + +secureQueue :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) +secureQueue st sq sKey = + atomically (readQueueRec qr $>>= secure) + $>>= \rId -> withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey where - getVar = case party of - SRecipient -> TM.lookupIO qId queues - SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues) - SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues) + qr = queueRec' sq + secure q@QueueRec {recipientId = rId} = case senderKey q of + Just k -> pure $ if sKey == k then Right rId else Left AUTH + Nothing -> do + writeTVar qr $! Just q {senderKey = Just sKey} + pure $ Right rId -secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> IO (Either ErrorType QueueRec) -secureQueue QueueStore {queues} rId sKey = toResult <$> do - TM.lookupIO rId queues $>>= \qVar -> atomically $ - readTVar qVar >>= \q -> case senderKey q of - Just k -> pure $ if sKey == k then Just q else Nothing - _ -> - let !q' = q {senderKey = Just sKey} - in writeTVar qVar q' $> Just q' - -addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) -addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do - TM.lookupIO rId queues >>= \case - Just qVar -> atomically $ ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ do - q <- readTVar qVar - nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers $> notifierId +addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) +addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = + atomically (readQueueRec qr $>>= add) + $>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) + where + qr = queueRec' sq + add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do + nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId let !q' = q {notifier = Just ntfCreds} - writeTVar qVar q' - TM.insert nId rId notifiers - pure $ Right nId_ - Nothing -> pure $ Left AUTH + writeTVar qr $! Just q' + TM.insert nId rId $ notifiers' st + pure $ Right (rId, nId_) -deleteQueueNotifier :: QueueStore -> RecipientId -> IO (Either ErrorType (Maybe NotifierId)) -deleteQueueNotifier QueueStore {queues, notifiers} rId = - withQueue rId queues $ \qVar -> do - q <- readTVar qVar - forM (notifier q) $ \NtfCreds {notifierId} -> do - TM.delete notifierId notifiers - writeTVar qVar $! q {notifier = Nothing} +deleteQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) +deleteQueueNotifier st sq = + atomically (readQueueRec qr >>= mapM delete) + $>>= \(rId, nId_) -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId) + where + qr = queueRec' sq + delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do + TM.delete notifierId $ notifiers' st + writeTVar qr $! Just q {notifier = Nothing} pure notifierId -suspendQueue :: QueueStore -> RecipientId -> IO (Either ErrorType ()) -suspendQueue QueueStore {queues} rId = - withQueue rId queues (`modifyTVar'` \q -> q {status = QueueOff}) +suspendQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) +suspendQueue st sq = + atomically (readQueueRec qr >>= mapM suspend) + $>>= \rId -> withLog "suspendQueue" st (`logSuspendQueue` rId) + where + qr = queueRec' sq + suspend q = do + writeTVar qr $! Just q {status = QueueOff} + pure $ recipientId q -updateQueueTime :: QueueStore -> RecipientId -> RoundedSystemTime -> IO () -updateQueueTime QueueStore {queues} rId t = - void $ withQueue rId queues (`modifyTVar'` \q -> q {updatedAt = Just t}) +updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) +updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' + where + qr = queueRec' sq + update q@QueueRec {updatedAt} + | updatedAt == Just t = pure (q, False) + | otherwise = + let !q' = q {updatedAt = Just t} + in (writeTVar qr $! Just q') $> (q', True) + log' (q, changed) + | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t) + | otherwise = pure $ Right q -deleteQueue :: QueueStore -> RecipientId -> IO (Either ErrorType QueueRec) -deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do - TM.lookupDelete rId queues >>= \case - Just qVar -> - readTVar qVar >>= \q -> do - TM.delete (senderId q) senders - forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers - pure $ Right q - _ -> pure $ Left AUTH +deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) +deleteQueue' st rId sq = + atomically (readQueueRec qr >>= mapM delete) + $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId) + >>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing)) + where + qr = queueRec' sq + delete q = do + writeTVar qr Nothing + TM.delete (senderId q) $ senders' st + forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st + pure q -toResult :: Maybe a -> Either ErrorType a -toResult = maybe (Left AUTH) Right +readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec) +readQueueRec qr = maybe (Left AUTH) Right <$> readTVar qr +{-# INLINE readQueueRec #-} -withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM a) -> IO (Either ErrorType a) -withQueue rId queues f = toResult <$> TM.lookupIO rId queues >>= atomically . mapM f +withLog' :: String -> TVar (Maybe (StoreLog 'WriteMode)) -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) +withLog' name sl action = + readTVarIO sl + >>= maybe (pure $ Right ()) (E.try . action >=> bimapM logErr pure) + where + logErr :: E.SomeException -> IO ErrorType + logErr e = logError ("STORE: " <> T.pack err) $> STORE err + where + err = name <> ", withLog, " <> show e + +withLog :: STMQueueStore s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) +withLog name = withLog' name . storeLog' + +readQueueStore :: forall s. STMQueueStore s => FilePath -> s -> IO () +readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines + where + processLine :: LB.ByteString -> IO () + processLine s' = either printError procLogRecord (strDecode s) + where + s = LB.toStrict s' + procLogRecord :: StoreLogRecord -> IO () + procLogRecord = \case + CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue" + SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey + AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds + SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st + DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId + DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st + UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t + printError :: String -> IO () + printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s + withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO () + withQueue qId op a = runExceptT go >>= qError qId op + where + go = do + q <- ExceptT $ getQueue st SRecipient qId + liftIO (readTVarIO $ queueRec' q) >>= \case + Nothing -> logWarn $ logPfx qId op <> "already deleted" + Just _ -> void $ ExceptT $ a q + qError qId op = \case + Left e -> logError $ logPfx qId op <> tshow e + Right _ -> pure () + logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", " diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 2e272260a..2da3398f2 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -6,6 +6,7 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Simplex.Messaging.Server.StoreLog @@ -23,36 +24,33 @@ module Simplex.Messaging.Server.StoreLog logDeleteQueue, logDeleteNotifier, logUpdateQueueTime, - readWriteQueueStore, readWriteStoreLog, + writeQueueStore, ) where import Control.Applicative (optional, (<|>)) import Control.Concurrent.STM +import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Lazy.Char8 as LB -import qualified Data.ByteString.Base64.URL as B64 +import qualified Data.Map.Strict as M import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) import Data.Time.Format.ISO8601 (iso8601Show) +import GHC.IO (catchAny) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore -import Simplex.Messaging.Server.QueueStore.STM -import Simplex.Messaging.Util (bshow, ifM, unlessM, whenM) +import Simplex.Messaging.Server.StoreLog.Types +import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Util (ifM, tshow, unlessM, whenM) import System.Directory (doesFileExist, renameFile) import System.IO --- | opaque container for file handle with a type-safe IOMode --- constructors are not exported, openWriteStoreLog and openReadStoreLog should be used instead -data StoreLog (a :: IOMode) where - ReadStoreLog :: FilePath -> Handle -> StoreLog 'ReadMode - WriteStoreLog :: FilePath -> Handle -> StoreLog 'WriteMode - data StoreLogRecord = CreateQueue QueueRec | SecureQueue QueueId SndPublicAuthKey @@ -159,11 +157,13 @@ storeLogFilePath = \case closeStoreLog :: StoreLog a -> IO () closeStoreLog = \case - WriteStoreLog _ h -> hClose h - ReadStoreLog _ h -> hClose h + WriteStoreLog _ h -> close_ h + ReadStoreLog _ h -> close_ h + where + close_ h = hClose h `catchAny` \e -> logError ("STORE: closeStoreLog, error closing, " <> tshow e) writeStoreLogRecord :: StrEncoding r => StoreLog 'WriteMode -> r -> IO () -writeStoreLogRecord (WriteStoreLog _ h) r = do +writeStoreLogRecord (WriteStoreLog _ h) r = E.uninterruptibleMask_ $ do B.hPut h $ strEncode r `B.snoc` '\n' -- hPutStrLn makes write non-atomic for length > 1024 hFlush h @@ -188,9 +188,6 @@ logDeleteNotifier s = writeStoreLogRecord s . DeleteNotifier logUpdateQueueTime :: StoreLog 'WriteMode -> QueueId -> RoundedSystemTime -> IO () logUpdateQueueTime s qId t = writeStoreLogRecord s $ UpdateTime qId t -readWriteQueueStore :: FilePath -> QueueStore -> IO (StoreLog 'WriteMode) -readWriteQueueStore = readWriteStoreLog readQueues writeQueues - readWriteStoreLog :: (FilePath -> s -> IO ()) -> (StoreLog 'WriteMode -> s -> IO ()) -> FilePath -> s -> IO (StoreLog 'WriteMode) readWriteStoreLog readStore writeStore f st = ifM @@ -226,31 +223,11 @@ readWriteStoreLog readStore writeStore f st = renameFile tempBackup timedBackup logInfo $ "original state preserved as " <> T.pack timedBackup -writeQueues :: StoreLog 'WriteMode -> QueueStore -> IO () -writeQueues s st = readTVarIO (queues st) >>= mapM_ writeQueue +writeQueueStore :: STMQueueStore s => StoreLog 'WriteMode -> s -> IO () +writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs where - writeQueue v = readTVarIO v >>= \q -> when (active q) $ logCreateQueue s q + writeQueue (rId, q) = + readTVarIO (queueRec' q) >>= \case + Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them + Nothing -> atomically $ TM.delete rId $ activeMsgQueues st active QueueRec {status} = status == QueueActive - -readQueues :: FilePath -> QueueStore -> IO () -readQueues f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines - where - processLine :: LB.ByteString -> IO () - processLine s' = either printError procLogRecord (strDecode s) - where - s = LB.toStrict s' - procLogRecord :: StoreLogRecord -> IO () - procLogRecord = \case - CreateQueue q -> addQueue st q >>= qError "create" (recipientId q) - SecureQueue qId sKey -> secureQueue st qId sKey >>= qError "secure" qId - AddNotifier qId ntfCreds -> addQueueNotifier st qId ntfCreds >>= qError "addNotifier" qId - SuspendQueue qId -> suspendQueue st qId >>= qError "suspend" qId - DeleteQueue qId -> deleteQueue st qId >>= qError "delete" qId - DeleteNotifier qId -> deleteQueueNotifier st qId >>= qError "deleteNotifier" qId - UpdateTime qId t -> updateQueueTime st qId t - printError :: String -> IO () - printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s - qError :: B.ByteString -> RecipientId -> Either ErrorType a -> IO () - qError op (EntityId qId) = \case - Left e -> B.putStrLn $ op <> " stored queue " <> B64.encode qId <> " error: " <> bshow e - Right _ -> pure () diff --git a/src/Simplex/Messaging/Server/StoreLog/Types.hs b/src/Simplex/Messaging/Server/StoreLog/Types.hs new file mode 100644 index 000000000..6b711f9d7 --- /dev/null +++ b/src/Simplex/Messaging/Server/StoreLog/Types.hs @@ -0,0 +1,13 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} + +module Simplex.Messaging.Server.StoreLog.Types where + +import System.IO (Handle, IOMode (..)) + +-- | opaque container for file handle with a type-safe IOMode +-- constructors are not exported, openWriteStoreLog and openReadStoreLog should be used instead +data StoreLog (a :: IOMode) where + ReadStoreLog :: FilePath -> Handle -> StoreLog 'ReadMode + WriteStoreLog :: FilePath -> Handle -> StoreLog 'WriteMode diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index ea0322b74..b467c5ea9 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -35,12 +35,16 @@ raceAny_ = r [] r as (m : ms) = withAsync m $ \a -> r (a : as) ms r as [] = void $ waitAnyCancel as -infixl 4 <$$>, <$?> +infixl 4 <$$>, <$$, <$?> (<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b) (<$$>) = fmap . fmap {-# INLINE (<$$>) #-} +(<$$) :: (Functor f, Functor g) => b -> f (g a) -> f (g b) +(<$$) = fmap . fmap . const +{-# INLINE (<$$) #-} + (<$?>) :: MonadFail m => (a -> Either String b) -> m a -> m b f <$?> m = either fail pure . f =<< m {-# INLINE (<$?>) #-} diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 16e3a12f0..4fc48d6c7 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} @@ -7,6 +8,7 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} {-# OPTIONS_GHC -Wno-orphans #-} @@ -17,19 +19,25 @@ import Control.Concurrent.STM import Control.Exception (bracket) import Control.Monad import Control.Monad.IO.Class +import Control.Monad.Trans.Except +import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Base64.URL as B64 +import Data.Maybe (fromJust) import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Protocol (EntityId (..), Message (..), noMsgFlags) +import Simplex.Messaging.Protocol (EntityId (..), Message (..), RecipientId, SParty (..), noMsgFlags) import Simplex.Messaging.Server (MessageStats (..), exportMessages, importMessages, printMessageStats) -import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth) +import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth, readWriteQueueStore) import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types -import SMPClient (testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) +import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.Server.QueueStore.STM +import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) +import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) import System.IO (IOMode (..), hClose, withFile) @@ -44,13 +52,18 @@ msgStoreTests = do describe "queue state" $ do it "should restore queue state from the last line" testQueueState it "should recover when message is written and state is not" testMessageState + describe "missing files" $ do + it "should create read file when missing" testReadFileMissing + it "should switch to write file when read file missing" testReadFileMissingSwitch + it "should create write file when missing" testWriteFileMissing + it "should create read file when read and write files are missing" testReadAndWriteFilesMissing where - someMsgStoreTests :: MsgStoreClass s => SpecWith s + someMsgStoreTests :: STMQueueStore s => SpecWith s someMsgStoreTests = do it "should get queue and store/read messages" testGetQueue it "should not fail on EOF when changing read journal" testChangeReadJournal -withMsgStore :: MsgStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO () +withMsgStore :: STMQueueStore s => MsgStoreConfig s -> (s -> IO ()) -> IO () withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore testSMTStoreConfig :: STMStoreConfig @@ -83,98 +96,121 @@ deriving instance Eq (JournalState t) deriving instance Eq (SJournalType t) -testGetQueue :: MsgStoreClass s => s -> IO () +testNewQueueRec :: TVar ChaChaDRG -> Bool -> IO (RecipientId, QueueRec) +testNewQueueRec g sndSecure = do + rId <- atomically $ EntityId <$> C.randomBytes 24 g + senderId <- atomically $ EntityId <$> C.randomBytes 24 g + (recipientKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g + (k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g + let qr = + QueueRec + { recipientId = rId, + recipientKey, + rcvDhSecret = C.dh' k pk, + senderId, + senderKey = Nothing, + sndSecure, + notifier = Nothing, + status = QueueActive, + updatedAt = Nothing + } + pure (rId, qr) + +testGetQueue :: STMQueueStore s => s -> IO () testGetQueue ms = do g <- C.newRandom - rId <- EntityId <$> atomically (C.randomBytes 24 g) + (rId, qr) <- testNewQueueRec g True runRight_ $ do - q <- getMsgQueue ms rId - let write s = writeMsg ms q True =<< mkMessage s + q <- ExceptT $ addQueue ms qr + let write s = writeMsg ms rId q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" Just (Message {msgId = mId3}, False) <- write "message 3" - Msg "message 1" <- tryPeekMsg q - Msg "message 1" <- tryPeekMsg q - Nothing <- tryDelMsg q mId2 - Msg "message 1" <- tryDelMsg q mId1 - Nothing <- tryDelMsg q mId1 - Msg "message 2" <- tryPeekMsg q - Nothing <- tryDelMsg q mId1 - (Nothing, Msg "message 2") <- tryDelPeekMsg q mId1 - (Msg "message 2", Msg "message 3") <- tryDelPeekMsg q mId2 - (Nothing, Msg "message 3") <- tryDelPeekMsg q mId2 - Msg "message 3" <- tryPeekMsg q - (Msg "message 3", Nothing) <- tryDelPeekMsg q mId3 - Nothing <- tryDelMsg q mId2 - Nothing <- tryDelMsg q mId3 - Nothing <- tryPeekMsg q + Msg "message 1" <- tryPeekMsg ms rId q + Msg "message 1" <- tryPeekMsg ms rId q + Nothing <- tryDelMsg ms rId q mId2 + Msg "message 1" <- tryDelMsg ms rId q mId1 + Nothing <- tryDelMsg ms rId q mId1 + Msg "message 2" <- tryPeekMsg ms rId q + Nothing <- tryDelMsg ms rId q mId1 + (Nothing, Msg "message 2") <- tryDelPeekMsg ms rId q mId1 + (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2 + (Nothing, Msg "message 3") <- tryDelPeekMsg ms rId q mId2 + Msg "message 3" <- tryPeekMsg ms rId q + (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 + Nothing <- tryDelMsg ms rId q mId2 + Nothing <- tryDelMsg ms rId q mId3 + Nothing <- tryPeekMsg ms rId q Just (Message {msgId = mId4}, True) <- write "message 4" - Msg "message 4" <- tryPeekMsg q + Msg "message 4" <- tryPeekMsg ms rId q Just (Message {msgId = mId5}, False) <- write "message 5" - (Nothing, Msg "message 4") <- tryDelPeekMsg q mId3 - (Msg "message 4", Msg "message 5") <- tryDelPeekMsg q mId4 + (Nothing, Msg "message 4") <- tryDelPeekMsg ms rId q mId3 + (Msg "message 4", Msg "message 5") <- tryDelPeekMsg ms rId q mId4 Just (Message {msgId = mId6}, False) <- write "message 6" Just (Message {msgId = mId7}, False) <- write "message 7" Nothing <- write "message 8" - Msg "message 5" <- tryPeekMsg q - (Nothing, Msg "message 5") <- tryDelPeekMsg q mId4 - (Msg "message 5", Msg "message 6") <- tryDelPeekMsg q mId5 - (Msg "message 6", Msg "message 7") <- tryDelPeekMsg q mId6 - (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg q mId7 - (Just MessageQuota {}, Nothing) <- tryDelPeekMsg q mId8 - (Nothing, Nothing) <- tryDelPeekMsg q mId8 - pure () - delMsgQueue ms rId + Msg "message 5" <- tryPeekMsg ms rId q + (Nothing, Msg "message 5") <- tryDelPeekMsg ms rId q mId4 + (Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms rId q mId5 + (Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms rId q mId6 + (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms rId q mId7 + (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms rId q mId8 + (Nothing, Nothing) <- tryDelPeekMsg ms rId q mId8 + void $ ExceptT $ deleteQueue ms rId q -testChangeReadJournal :: MsgStoreClass s => s -> IO () +testChangeReadJournal :: STMQueueStore s => s -> IO () testChangeReadJournal ms = do g <- C.newRandom - rId <- EntityId <$> atomically (C.randomBytes 24 g) + (rId, qr) <- testNewQueueRec g True runRight_ $ do - q <- getMsgQueue ms rId - let write s = writeMsg ms q True =<< mkMessage s + q <- ExceptT $ addQueue ms qr + let write s = writeMsg ms rId q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" - (Msg "message 1", Nothing) <- tryDelPeekMsg q mId1 + (Msg "message 1", Nothing) <- tryDelPeekMsg ms rId q mId1 Just (Message {msgId = mId2}, True) <- write "message 2" - (Msg "message 2", Nothing) <- tryDelPeekMsg q mId2 + (Msg "message 2", Nothing) <- tryDelPeekMsg ms rId q mId2 Just (Message {msgId = mId3}, True) <- write "message 3" - (Msg "message 3", Nothing) <- tryDelPeekMsg q mId3 + (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 Just (Message {msgId = mId4}, True) <- write "message 4" - (Msg "message 4", Nothing) <- tryDelPeekMsg q mId4 + (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q mId4 Just (Message {msgId = mId5}, True) <- write "message 5" - (Msg "message 5", Nothing) <- tryDelPeekMsg q mId5 - pure () - delMsgQueue ms rId + (Msg "message 5", Nothing) <- tryDelPeekMsg ms rId q mId5 + void $ ExceptT $ deleteQueue ms rId q testExportImportStore :: JournalMsgStore -> IO () testExportImportStore ms = do g <- C.newRandom - rId1 <- EntityId <$> atomically (C.randomBytes 24 g) - rId2 <- EntityId <$> atomically (C.randomBytes 24 g) + (rId1, qr1) <- testNewQueueRec g True + (rId2, qr2) <- testNewQueueRec g True + sl <- readWriteQueueStore testStoreLogFile ms runRight_ $ do - let write q s = writeMsg ms q True =<< mkMessage s - q1 <- getMsgQueue ms rId1 - Just (Message {}, True) <- write q1 "message 1" - Just (Message {}, False) <- write q1 "message 2" - q2 <- getMsgQueue ms rId2 - Just (Message {msgId = mId3}, True) <- write q2 "message 3" - Just (Message {msgId = mId4}, False) <- write q2 "message 4" - (Msg "message 3", Msg "message 4") <- tryDelPeekMsg q2 mId3 - (Msg "message 4", Nothing) <- tryDelPeekMsg q2 mId4 - Just (Message {}, True) <- write q2 "message 5" - Just (Message {}, False) <- write q2 "message 6" - Just (Message {}, False) <- write q2 "message 7" - Nothing <- write q2 "message 8" + let write rId q s = writeMsg ms rId q True =<< mkMessage s + q1 <- ExceptT $ addQueue ms qr1 + liftIO $ logCreateQueue sl qr1 + Just (Message {}, True) <- write rId1 q1 "message 1" + Just (Message {}, False) <- write rId1 q1 "message 2" + q2 <- ExceptT $ addQueue ms qr2 + liftIO $ logCreateQueue sl qr2 + Just (Message {msgId = mId3}, True) <- write rId2 q2 "message 3" + Just (Message {msgId = mId4}, False) <- write rId2 q2 "message 4" + (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId2 q2 mId3 + (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId2 q2 mId4 + Just (Message {}, True) <- write rId2 q2 "message 5" + Just (Message {}, False) <- write rId2 q2 "message 6" + Just (Message {}, False) <- write rId2 q2 "message 7" + Nothing <- write rId2 q2 "message 8" pure () length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 exportMessages False ms testStoreMsgsFile False renameFile testStoreMsgsFile (testStoreMsgsFile <> ".copy") closeMsgStore ms + closeStoreLog sl exportMessages False ms testStoreMsgsFile False (B.readFile testStoreMsgsFile `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".copy") let cfg = (testJournalStoreCfg :: JournalStoreConfig) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg + readWriteQueueStore testStoreLogFile ms' >>= closeStoreLog stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False ms' testStoreMsgsFile Nothing printMessageStats "Messages" stats @@ -183,6 +219,7 @@ testExportImportStore ms = do exportMessages False ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig + readWriteQueueStore testStoreLogFile stmStore >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False stmStore testStoreMsgsFile2 Nothing exportMessages False stmStore testStoreMsgsFile False @@ -256,24 +293,121 @@ testQueueState ms = do testMessageState :: JournalMsgStore -> IO () testMessageState ms = do g <- C.newRandom - rId <- EntityId <$> atomically (C.randomBytes 24 g) + (rId, qr) <- testNewQueueRec g True let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) - write q s = writeMsg ms q True =<< mkMessage s + write q s = writeMsg ms rId q True =<< mkMessage s mId1 <- runRight $ do - q <- getMsgQueue ms rId + q <- ExceptT $ addQueue ms qr Just (Message {msgId = mId1}, True) <- write q "message 1" Just (Message {}, False) <- write q "message 2" - liftIO $ closeMsgQueue ms rId + liftIO $ closeMsgQueue q pure mId1 ls <- B.lines <$> B.readFile statePath B.writeFile statePath $ B.unlines $ take (length ls - 1) ls runRight_ $ do - q <- getMsgQueue ms rId + q <- ExceptT $ getQueue ms SRecipient rId Just (Message {msgId = mId3}, False) <- write q "message 3" - (Msg "message 1", Msg "message 3") <- tryDelPeekMsg q mId1 - (Msg "message 3", Nothing) <- tryDelPeekMsg q mId3 - liftIO $ closeMsgQueueHandles q + (Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms rId q mId1 + (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 + liftIO $ closeMsgQueue q + +testReadFileMissing :: JournalMsgStore -> IO () +testReadFileMissing ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + let write q s = writeMsg ms rId q True =<< mkMessage s + q <- runRight $ do + q <- ExceptT $ addQueue ms qr + Just (Message {}, True) <- write q "message 1" + Msg "message 1" <- tryPeekMsg ms rId q + pure q + + mq <- fromJust <$> readTVarIO (msgQueue_' q) + MsgQueueState {readState = rs} <- readTVarIO $ state mq + closeMsgStore ms + let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs + removeFile path + + runRight_ $ do + q' <- ExceptT $ getQueue ms SRecipient rId + Nothing <- tryPeekMsg ms rId q' + Just (Message {}, True) <- write q' "message 2" + Msg "message 2" <- tryPeekMsg ms rId q' + pure () + +testReadFileMissingSwitch :: JournalMsgStore -> IO () +testReadFileMissingSwitch ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + q <- writeMessages ms rId qr + + mq <- fromJust <$> readTVarIO (msgQueue_' q) + MsgQueueState {readState = rs} <- readTVarIO $ state mq + closeMsgStore ms + let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs + removeFile path + + runRight_ $ do + q' <- ExceptT $ getQueue ms SRecipient rId + Just (Message {}, False) <- writeMsg ms rId q' True =<< mkMessage "message 6" + Msg "message 5" <- tryPeekMsg ms rId q' + pure () + +testWriteFileMissing :: JournalMsgStore -> IO () +testWriteFileMissing ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + q <- writeMessages ms rId qr + + mq <- fromJust <$> readTVarIO (msgQueue_' q) + MsgQueueState {writeState = ws} <- readTVarIO $ state mq + closeMsgStore ms + let path = journalFilePath (queueDirectory $ queue mq) $ journalId ws + print path + removeFile path + + runRight_ $ do + q' <- ExceptT $ getQueue ms SRecipient rId + Just Message {msgId = mId3} <- tryPeekMsg ms rId q' + (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId q' mId3 + Just Message {msgId = mId4} <- tryPeekMsg ms rId q' + (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q' mId4 + Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6" + Msg "message 6" <- tryPeekMsg ms rId q' + pure () + +testReadAndWriteFilesMissing :: JournalMsgStore -> IO () +testReadAndWriteFilesMissing ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + q <- writeMessages ms rId qr + + mq <- fromJust <$> readTVarIO (msgQueue_' q) + MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq + closeMsgStore ms + removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId rs + removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId ws + + runRight_ $ do + q' <- ExceptT $ getQueue ms SRecipient rId + Nothing <- tryPeekMsg ms rId q' + Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6" + Msg "message 6" <- tryPeekMsg ms rId q' + pure () + +writeMessages :: JournalMsgStore -> RecipientId -> QueueRec -> IO JournalQueue +writeMessages ms rId qr = runRight $ do + q <- ExceptT $ addQueue ms qr + let write s = writeMsg ms rId q True =<< mkMessage s + Just (Message {msgId = mId1}, True) <- write "message 1" + Just (Message {msgId = mId2}, False) <- write "message 2" + Just (Message {}, False) <- write "message 3" + (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms rId q mId1 + (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2 + Just (Message {}, False) <- write "message 4" + Just (Message {}, False) <- write "message 5" + pure q diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 7ee5767fd..e24f9f1ea 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -16,32 +16,17 @@ import Data.Either (partitionEithers) import qualified Data.Map.Strict as M import SMPClient import AgentTests.SQLiteTests +import CoreTests.MsgStoreTests import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol -import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.Server.Env.STM (readWriteQueueStore) +import Simplex.Messaging.Server.MsgStore.Journal +import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore -import Simplex.Messaging.Server.QueueStore.STM (QueueStore (..), newQueueStore) +import Simplex.Messaging.Server.StoreLog import Test.Hspec -testNewQueueRec :: TVar ChaChaDRG -> Bool -> IO QueueRec -testNewQueueRec g sndSecure = do - recipientId <- atomically $ EntityId <$> C.randomBytes 24 g - senderId <- atomically $ EntityId <$> C.randomBytes 24 g - (recipientKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g - (k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g - pure QueueRec - { recipientId, - recipientKey, - rcvDhSecret = C.dh' k pk, - senderId, - senderKey = Nothing, - sndSecure, - notifier = Nothing, - status = QueueActive, - updatedAt = Nothing - } - testNtfCreds :: TVar ChaChaDRG -> IO NtfCreds testNtfCreds g = do (notifierKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g @@ -66,7 +51,7 @@ deriving instance Eq NtfCreds storeLogTests :: Spec storeLogTests = forM_ [False, True] $ \sndSecure -> do - (qr, ntfCreds, date) <- runIO $ do + ((rId, qr), ntfCreds, date) <- runIO $ do g <- C.newRandom (,,) <$> testNewQueueRec g sndSecure <*> testNtfCreds g <*> getSystemDate testSMPStoreLog ("SMP server store log, sndSecure = " <> show sndSecure) @@ -74,37 +59,37 @@ storeLogTests = { name = "create new queue", saved = [CreateQueue qr], compacted = [CreateQueue qr], - state = M.fromList [(recipientId qr, qr)] + state = M.fromList [(rId, qr)] }, SLTC { name = "secure queue", - saved = [CreateQueue qr, SecureQueue (recipientId qr) testPublicAuthKey], + saved = [CreateQueue qr, SecureQueue rId testPublicAuthKey], compacted = [CreateQueue qr {senderKey = Just testPublicAuthKey}], - state = M.fromList [(recipientId qr, qr {senderKey = Just testPublicAuthKey})] + state = M.fromList [(rId, qr {senderKey = Just testPublicAuthKey})] }, SLTC { name = "create and delete queue", - saved = [CreateQueue qr, DeleteQueue $ recipientId qr], + saved = [CreateQueue qr, DeleteQueue rId], compacted = [], state = M.fromList [] }, SLTC { name = "create queue and add notifier", - saved = [CreateQueue qr, AddNotifier (recipientId qr) ntfCreds], + saved = [CreateQueue qr, AddNotifier rId ntfCreds], compacted = [CreateQueue $ qr {notifier = Just ntfCreds}], - state = M.fromList [(recipientId qr, qr {notifier = Just ntfCreds})] + state = M.fromList [(rId, qr {notifier = Just ntfCreds})] }, SLTC { name = "delete notifier", - saved = [CreateQueue qr, AddNotifier (recipientId qr) ntfCreds, DeleteNotifier (recipientId qr)], + saved = [CreateQueue qr, AddNotifier rId ntfCreds, DeleteNotifier rId], compacted = [CreateQueue qr], - state = M.fromList [(recipientId qr, qr)] + state = M.fromList [(rId, qr)] }, SLTC { name = "update time", - saved = [CreateQueue qr, UpdateTime (recipientId qr) date], + saved = [CreateQueue qr, UpdateTime rId date], compacted = [CreateQueue qr {updatedAt = Just date}], - state = M.fromList [(recipientId qr, qr {updatedAt = Just date})] + state = M.fromList [(rId, qr {updatedAt = Just date})] } ] @@ -117,11 +102,11 @@ testSMPStoreLog testSuite tests = replicateM_ 3 $ testReadWrite t where testReadWrite SLTC {compacted, state} = do - st <- newQueueStore + st <- newMsgStore testJournalStoreCfg l <- readWriteQueueStore testStoreLogFile st storeState st `shouldReturn` state closeStoreLog l ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile compacted' `shouldBe` compacted - storeState :: QueueStore -> IO (M.Map RecipientId QueueRec) - storeState st = readTVarIO (queues st) >>= mapM readTVarIO + storeState :: JournalMsgStore -> IO (M.Map RecipientId QueueRec) + storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (readTVarIO . queueRec')) diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index e80d39897..28073d5e4 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -37,11 +37,12 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol import Simplex.Messaging.Server (exportMessages) -import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) +import Simplex.Messaging.Server.Env.STM (ServerConfig (..), readWriteQueueStore) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..)) import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore) import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..)) +import Simplex.Messaging.Server.StoreLog (closeStoreLog) import Simplex.Messaging.Transport import Simplex.Messaging.Util (whenM) import Simplex.Messaging.Version (mkVersionRange) @@ -812,6 +813,7 @@ testRestoreExpireMessages = exportStoreMessages = \case AMSType SMSJournal -> do ms <- newMsgStore testJournalStoreCfg {quota = 4} + readWriteQueueStore testStoreLogFile ms >>= closeStoreLog removeFileIfExists testStoreMsgsFile exportMessages False ms testStoreMsgsFile False AMSType SMSMemory -> pure ()