From 5940514f40f1e25ede535f4bb6502a5bc6e5321e Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sat, 26 Oct 2024 16:14:55 +0100 Subject: [PATCH] smp server: remove queue from map when closing, test (#1392) * smp server: remove queue from map when closing, test * remove print * refactor --- cabal.project | 8 +++++ package.yaml | 1 + src/Simplex/Messaging/Server.hs | 4 +-- .../Messaging/Server/MsgStore/Journal.hs | 28 +++++++++------- tests/CoreTests/MsgStoreTests.hs | 32 +++++++++++++++++-- 5 files changed, 57 insertions(+), 16 deletions(-) diff --git a/cabal.project b/cabal.project index 1a4745ffc..b26c04055 100644 --- a/cabal.project +++ b/cabal.project @@ -4,6 +4,14 @@ packages: . -- packages: . ../http2 -- packages: . ../network-transport +-- package * + -- coverage: True + -- library-coverage: True + +-- package attoparsec + -- coverage: False + -- library-coverage: False + index-state: 2023-12-12T00:00:00Z package cryptostore diff --git a/package.yaml b/package.yaml index f4efaf5bc..f3c1e71c5 100644 --- a/package.yaml +++ b/package.yaml @@ -188,6 +188,7 @@ tests: - -rtsopts - -with-rtsopts=-A64M - -with-rtsopts=-N1 + # - -fhpc ghc-options: # - -haddock diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 38d718fd2..ea00c9591 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -94,7 +94,7 @@ import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM as Env import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueue) +import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgQueue (..), JMQueue (..), closeMsgQueueHandles) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore @@ -1788,7 +1788,7 @@ processServerMessages = do expired'' <- deleteExpiredMsgs q False old stored'' <- liftIO $ getQueueSize q liftIO $ logQueueState q - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueueHandles q pure (stored'', expired'') processValidateQueue q = getQueueSize q >>= \storedMsgsCount -> pure mempty {storedMsgsCount, storedQueues = 1} diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 5324c9997..d40abc67f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -20,11 +20,13 @@ module Simplex.Messaging.Server.MsgStore.Journal JournalStoreConfig (..), getQueueMessages, closeMsgQueue, + closeMsgQueueHandles, -- below are exported for tests MsgQueueState (..), JournalState (..), SJournalType (..), msgQueueDirectory, + msgQueueStatePath, readWriteQueueState, newMsgQueueState, newJournalId, @@ -47,6 +49,7 @@ import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate) +import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) @@ -207,7 +210,7 @@ instance MsgStoreClass JournalMsgStore where msgQueues <- TM.emptyIO pure JournalMsgStore {config, random, queueLocks, msgQueues} - closeMsgStore st = readTVarIO (msgQueues st) >>= mapM_ closeMsgQueue + closeMsgStore st = atomically (swapTVar (msgQueues st) M.empty) >>= mapM_ closeMsgQueueHandles activeMsgQueues = msgQueues {-# INLINE activeMsgQueues #-} @@ -236,7 +239,7 @@ instance MsgStoreClass JournalMsgStore where Left e -> do putStrLn ("Error: message queue directory " <> dir <> " is invalid: " <> e) exitFailure - closeMsgQueue q + closeMsgQueueHandles q pure (i + 1, r <> r') progress i = "Processed: " <> show i <> " queues" foldQueues depth f acc (queueId, path) = do @@ -283,15 +286,16 @@ instance MsgStoreClass JournalMsgStore where delMsgQueue :: JournalMsgStore -> RecipientId -> IO () delMsgQueue ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do - void $ deleteMsgQueue_ ms rId + closeMsgQueue ms rId removeQueueDirectory ms rId delMsgQueueSize :: JournalMsgStore -> RecipientId -> IO Int delMsgQueueSize ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do - state_ <- deleteMsgQueue_ ms rId - sz <- maybe (pure $ -1) (fmap size . readTVarIO) state_ + st_ <- + atomically (TM.lookupDelete rId (msgQueues ms)) + >>= mapM (\q -> closeMsgQueueHandles q >> readTVarIO (state q)) removeQueueDirectory ms rId - pure sz + pure $ maybe (-1) size st_ getQueueMessages :: Bool -> JournalMsgQueue -> IO [Message] getQueueMessages drainMsgs q = run [] @@ -587,13 +591,13 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws -deleteMsgQueue_ :: JournalMsgStore -> RecipientId -> IO (Maybe (TVar MsgQueueState)) -deleteMsgQueue_ st rId = - atomically (TM.lookupDelete rId (msgQueues st)) - >>= mapM (\q -> closeMsgQueue q $> state q) +closeMsgQueue :: JournalMsgStore -> RecipientId -> IO () +closeMsgQueue ms rId = + atomically (TM.lookupDelete rId (msgQueues ms)) + >>= mapM_ closeMsgQueueHandles -closeMsgQueue :: JournalMsgQueue -> IO () -closeMsgQueue q = readTVarIO (handles q) >>= mapM_ closeHandles +closeMsgQueueHandles :: JournalMsgQueue -> IO () +closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles where closeHandles (MsgQueueHandles sh rh wh_) = do hClose sh diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 4a5d95eb2..5166ce7ef 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -5,19 +5,21 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} {-# OPTIONS_GHC -Wno-orphans #-} module CoreTests.MsgStoreTests where -import AgentTests.FunctionalAPITests (runRight_) +import AgentTests.FunctionalAPITests (runRight, runRight_) import Control.Concurrent.STM import Control.Exception (bracket) import Control.Monad import Control.Monad.IO.Class import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Base64.URL as B64 import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C @@ -41,6 +43,7 @@ msgStoreTests = do it "should export and import journal store" testExportImportStore describe "queue state" $ do it "should restore queue state from the last line" testQueueState + it "should recover when message is written and state is not" testMessageState where someMsgStoreTests :: MsgStoreClass s => SpecWith s someMsgStoreTests = do @@ -189,7 +192,7 @@ testQueueState ms = do g <- C.newRandom rId <- EntityId <$> atomically (C.randomBytes 24 g) let dir = msgQueueDirectory ms rId - statePath = dir (queueLogFileName <> logFileExt) + statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) createDirectoryIfMissing True dir state <- newMsgQueueState <$> newJournalId (random ms) withFile statePath WriteMode (`appendState` state) @@ -248,3 +251,28 @@ testQueueState ms = do forM_ names $ \name -> let f = dir name in unless (f == keep) $ removeFile f + +testMessageState :: JournalMsgStore -> IO () +testMessageState ms = do + g <- C.newRandom + rId <- EntityId <$> atomically (C.randomBytes 24 g) + let dir = msgQueueDirectory ms rId + statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + write q s = writeMsg ms q True =<< mkMessage s + + mId1 <- runRight $ do + q <- getMsgQueue ms rId + Just (Message {msgId = mId1}, True) <- write q "message 1" + Just (Message {}, False) <- write q "message 2" + liftIO $ closeMsgQueue ms rId + pure mId1 + + ls <- B.lines <$> B.readFile statePath + B.writeFile statePath $ B.unlines $ take (length ls - 1) ls + + runRight_ $ do + q <- getMsgQueue ms rId + Just (Message {msgId = mId3}, False) <- write q "message 3" + (Msg "message 1", Msg "message 3") <- tryDelPeekMsg q mId1 + (Msg "message 3", Nothing) <- tryDelPeekMsg q mId3 + liftIO $ closeMsgQueueHandles q