smp server: expire messages in idle message queues (including not opened) (#1403)

* smp server: expire messages in idle message queues (including not opened)

* use message expiration interval

* simpler

* version

* remove version
This commit is contained in:
Evgeny
2024-11-15 10:26:24 +00:00
committed by GitHub
parent 3017d14392
commit 17a0be10fa
9 changed files with 84 additions and 47 deletions
+9 -12
View File
@@ -296,7 +296,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
deliverNtfs ns stats (AClient _ Client {clientId, ntfSubscriptions, sndQ, connected}) =
whenM (currentClient readTVarIO) $ do
subs <- readTVarIO ntfSubscriptions
logDebug $ "NOTIFICATIONS: client #" <> tshow clientId <> " is current with " <> tshow (M.size subs) <> " subs"
ntfQs <- M.assocs . M.filterWithKey (\nId _ -> M.member nId subs) <$> readTVarIO ns
tryAny (atomically $ flushSubscribedNtfs ntfQs) >>= \case
Right len -> updateNtfStats len
@@ -321,12 +320,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
writeTVar v []
pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta)
updateNtfStats 0 = logDebug $ "NOTIFICATIONS: no ntfs for client #" <> tshow clientId
updateNtfStats 0 = pure ()
updateNtfStats len = liftIO $ do
atomicModifyIORef'_ (ntfCount stats) (subtract len)
atomicModifyIORef'_ (msgNtfs stats) (+ len)
atomicModifyIORef'_ (msgNtfsB stats) (+ (len `div` 80 + 1)) -- up to 80 NMSG in the batch
logDebug $ "NOTIFICATIONS: delivered to client #" <> tshow clientId <> " " <> tshow len <> " ntfs"
sendPendingEvtsThread :: Server -> M ()
sendPendingEvtsThread s = do
@@ -386,13 +384,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
liftIO $ forever $ do
threadDelay' interval
old <- expireBeforeEpoch expCfg
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs stats old
now <- systemSeconds <$> getSystemTime
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old
atomicModifyIORef'_ (msgExpired stats) (+ deleted)
logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages"
where
expireQueueMsgs stats old rId q =
runExceptT (deleteExpiredMsgs rId q True old) >>= \case
Right deleted -> Sum deleted <$ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
Left _ -> pure 0
expireQueueMsgs now ms old rId q =
either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old)
expireNtfsThread :: ServerConfig -> M ()
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
@@ -1469,7 +1467,7 @@ client
expireMessages :: Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO ()
expireMessages msgExp stats = do
deleted <- maybe (pure 0) (deleteExpiredMsgs (recipientId qr) q True <=< liftIO . expireBeforeEpoch) msgExp
deleted <- maybe (pure 0) (deleteExpiredMsgs ms (recipientId qr) q <=< liftIO . expireBeforeEpoch) msgExp
liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
-- The condition for delivery of the message is:
@@ -1763,9 +1761,8 @@ processServerMessages = do
exitFailure
where
expireQueue = do
expired'' <- deleteExpiredMsgs rId q False old
expired'' <- deleteExpiredMsgs ms rId q old
stored'' <- getQueueSize ms rId q
liftIO $ logQueueState q
liftIO $ closeMsgQueue q
pure (stored'', expired'')
processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats
@@ -1823,7 +1820,7 @@ importMessages tty ms f old_ = do
-- if the first message in queue head is "quota", remove it.
mergeQuotaMsgs = withMsgQueue ms rId q "mergeQuotaMsgs" $ \mq ->
tryPeekMsg_ mq >>= \case
Just MessageQuota {} -> tryDeleteMsg_ mq False
Just MessageQuota {} -> tryDeleteMsg_ q mq False
_ -> pure ()
msgErr :: Show e => String -> e -> String
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)
+3 -2
View File
@@ -121,7 +121,7 @@ defaultMessageExpiration :: ExpirationConfig
defaultMessageExpiration =
ExpirationConfig
{ ttl = defMsgExpirationDays * 86400, -- seconds
checkInterval = 43200 -- seconds, 12 hours
checkInterval = 21600 -- seconds, 6 hours
}
defNtfExpirationHours :: Int64
@@ -290,7 +290,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize}
let idleInterval = maybe maxBound checkInterval messageExpiration
cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval}
in AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
ntfStore <- NtfStore <$> TM.emptyIO
+1 -1
View File
@@ -148,7 +148,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
doesFileExist iniFile >>= \case
True -> readIniFile iniFile >>= either exitError a
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize}
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
iniFile = combine cfgPath "smp-server.ini"
serverVersion = "SMP server v" <> simplexMQVersion
defaultServerPorts = "5223,443"
@@ -53,6 +53,7 @@ import Data.List (intercalate)
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (getMapLock, withLockMap)
@@ -92,7 +93,9 @@ data JournalStoreConfig = JournalStoreConfig
-- This number should be set bigger than queue quota.
maxMsgCount :: Int,
maxStateLines :: Int,
stateTailSize :: Int
stateTailSize :: Int,
-- time in seconds after which the queue will be closed after message expiration
idleInterval :: Int64
}
data JournalQueue = JournalQueue
@@ -100,7 +103,9 @@ data JournalQueue = JournalQueue
-- To avoid race conditions and errors when restoring queues,
-- Nothing is written to TVar when queue is deleted.
queueRec :: TVar (Maybe QueueRec),
msgQueue_ :: TVar (Maybe JournalMsgQueue)
msgQueue_ :: TVar (Maybe JournalMsgQueue),
-- system time in seconds since epoch
activeAt :: TVar Int64
}
data JMQueue = JMQueue
@@ -221,7 +226,8 @@ instance STMQueueStore JournalMsgStore where
lock <- getMapLock (queueLocks st) $ recipientId qr
q <- newTVar $! Just qr
mq <- newTVar Nothing
pure $ JournalQueue lock q mq
activeAt <- newTVar 0
pure $ JournalQueue lock q mq activeAt
msgQueue_' = msgQueue_
instance MsgStoreClass JournalMsgStore where
@@ -295,11 +301,11 @@ instance MsgStoreClass JournalMsgStore where
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
logQueueStates :: JournalMsgStore -> IO ()
logQueueStates ms = withActiveMsgQueues ms $ \_ -> logQueueState
logQueueStates ms = withActiveMsgQueues ms $ \_ -> unStoreIO . logQueueState
logQueueState :: JournalQueue -> IO ()
logQueueState :: JournalQueue -> StoreIO ()
logQueueState q =
void $
StoreIO . void $
readTVarIO (msgQueue_ q)
$>>= \mq -> readTVarIO (handles mq)
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
@@ -326,9 +332,20 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing
openedMsgQueue :: JournalQueue -> StoreIO (Maybe JournalMsgQueue)
openedMsgQueue = StoreIO . readTVarIO . msgQueue_
{-# INLINE openedMsgQueue #-}
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
Just <$>
E.bracket
(unStoreIO $ getMsgQueue ms rId q)
(\_ -> closeMsgQueue q)
(unStoreIO . action)
Just mq -> do
ts <- readTVarIO $ activeAt q
if now - ts >= idleInterval config
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
else pure Nothing
deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
@@ -355,7 +372,7 @@ instance MsgStoreClass JournalMsgStore where
writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do
q <- getMsgQueue ms rId q'
StoreIO $ do
StoreIO $ (`E.finally` updateActiveAt q') $ do
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
let empty = size == 0
if canWrite || empty
@@ -419,18 +436,21 @@ instance MsgStoreClass JournalMsgStore where
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
tryDeleteMsg_ :: JournalMsgQueue -> Bool -> StoreIO ()
tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} logState = StoreIO $
tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
void $
readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
$>>= (pure . fmap snd)
$>>= \len -> readTVarIO handles
$>>= \hs -> updateReadPos q logState len hs $> Just ()
$>>= \hs -> updateReadPos mq logState len hs $> Just ()
isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
isolateQueue rId JournalQueue {queueLock} op =
tryStore' op rId . withLock' queueLock op . unStoreIO
updateActiveAt :: JournalQueue -> IO ()
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a
tryStore' op rId = tryStore op rId . fmap Right
+7 -5
View File
@@ -21,6 +21,7 @@ import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
@@ -108,9 +109,10 @@ instance MsgStoreClass STMMsgStore where
writeTVar msgQueue_ $! Just q
pure q
openedMsgQueue :: STMQueue -> STM (Maybe STMMsgQueue)
openedMsgQueue = readTVar . msgQueue_
{-# INLINE openedMsgQueue #-}
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action
{-# INLINE withIdleMsgQueue #-}
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
@@ -157,8 +159,8 @@ instance MsgStoreClass STMMsgStore where
tryPeekMsg_ = tryPeekTQueue . msgQueue
{-# INLINE tryPeekMsg_ #-}
tryDeleteMsg_ :: STMMsgQueue -> Bool -> STM ()
tryDeleteMsg_ STMMsgQueue {msgQueue = q, size} _logState =
tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
tryDeleteMsg_ _ STMMsgQueue {msgQueue = q, size} _logState =
tryReadTQueue q >>= \case
Just _ -> modifyTVar' size (subtract 1)
_ -> pure ()
+25 -10
View File
@@ -15,6 +15,7 @@ import Control.Monad (foldM)
import Control.Monad.Trans.Except
import Data.Int (Int64)
import Data.Kind
import Data.Maybe (fromMaybe)
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
@@ -42,10 +43,11 @@ class Monad (StoreMonad s) => MsgStoreClass s where
activeMsgQueues :: s -> TMap RecipientId (StoreQueue s)
withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
openedMsgQueue :: StoreQueue s -> StoreMonad s (Maybe (MsgQueue s))
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
@@ -53,7 +55,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: MsgQueue s -> Bool -> StoreMonad s ()
tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
data MSType = MSMemory | MSJournal
@@ -89,7 +91,7 @@ tryDelMsg st rId q msgId' =
tryPeekMsg_ mq >>= \case
msg_@(Just msg)
| messageId msg == msgId' ->
tryDeleteMsg_ mq True >> pure msg_
tryDeleteMsg_ q mq True >> pure msg_
_ -> pure Nothing
-- atomic delete (== read) last and peek next message if available
@@ -98,7 +100,7 @@ tryDelPeekMsg st rId q msgId' =
withMsgQueue st rId q "tryDelPeekMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
msg_@(Just msg)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ mq)
| otherwise -> pure (Nothing, msg_)
_ -> pure (Nothing, Nothing)
@@ -106,13 +108,26 @@ withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String ->
withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a
{-# INLINE withMsgQueue #-}
deleteExpiredMsgs :: MsgStoreClass s => RecipientId -> StoreQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs rId q logState old =
isolateQueue rId q "deleteExpiredMsgs" $ openedMsgQueue q >>= maybe (pure 0) (loop 0)
deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st rId q old =
isolateQueue rId q "deleteExpiredMsgs" $
getMsgQueue st rId q >>= deleteExpireMsgs_ old q
-- closed and idle queues will be closed after expiration
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
idleDeleteExpiredMsgs now st rId q old =
isolateQueue rId q "idleDeleteExpiredMsgs" $
fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
n <- loop 0
logQueueState q
pure n
where
loop dc mq =
loop dc =
tryPeekMsg_ mq >>= \case
Just Message {msgTs}
| systemSeconds msgTs < old ->
tryDeleteMsg_ mq logState >> loop (dc + 1) mq
tryDeleteMsg_ q mq False >> loop (dc + 1)
_ -> pure dc
+3 -2
View File
@@ -3076,8 +3076,9 @@ testTwoUsers = withAgentClients2 $ \a b -> do
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
-- to avoice race condition
nGet a =##> \case ("", "", DOWN _ _) -> True; ("", "", UP _ _) -> True; _ -> False
nGet a =##> \case ("", "", UP _ _) -> True; ("", "", DOWN _ _) -> True; _ -> False
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
+2 -1
View File
@@ -77,7 +77,8 @@ testJournalStoreCfg =
quota = 3,
maxMsgCount = 4,
maxStateLines = 2,
stateTailSize = 256
stateTailSize = 256,
idleInterval = 21600
}
mkMessage :: MonadIO m => ByteString -> m Message
+1 -1
View File
@@ -66,7 +66,7 @@ testRetryIntervalSameMode =
testRetryIntervalSwitchMode :: Spec
testRetryIntervalSwitchMode =
it "should increase elapased time and interval when the mode stays the same" $ do
it "should increase elapased time and interval when the mode switches" $ do
lock <- newEmptyTMVarIO
intervals <- newTVarIO []
reportedIntervals <- newTVarIO []