mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-25 20:42:15 +00:00
* smp server: always allow services without option * smp server: maintain IDs hash in session subscription states * smp server: service message delivery error handling * ntf server: log subscription count and hash differences * smp server: remove delivery threads when service subscription ended/client disconnected
1084 lines
44 KiB
Haskell
1084 lines
44 KiB
Haskell
{-# LANGUAGE BangPatterns #-}
|
|
{-# LANGUAGE CPP #-}
|
|
{-# LANGUAGE DataKinds #-}
|
|
{-# LANGUAGE DerivingStrategies #-}
|
|
{-# LANGUAGE DuplicateRecordFields #-}
|
|
{-# LANGUAGE FlexibleContexts #-}
|
|
{-# LANGUAGE FlexibleInstances #-}
|
|
{-# LANGUAGE GADTs #-}
|
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
|
{-# LANGUAGE InstanceSigs #-}
|
|
{-# LANGUAGE LambdaCase #-}
|
|
{-# LANGUAGE MultiWayIf #-}
|
|
{-# LANGUAGE MultiParamTypeClasses #-}
|
|
{-# LANGUAGE NamedFieldPuns #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE RankNTypes #-}
|
|
{-# LANGUAGE ScopedTypeVariables #-}
|
|
{-# LANGUAGE StandaloneDeriving #-}
|
|
{-# LANGUAGE TypeApplications #-}
|
|
{-# LANGUAGE TypeFamilies #-}
|
|
{-# LANGUAGE TupleSections #-}
|
|
|
|
module Simplex.Messaging.Server.MsgStore.Journal
|
|
( JournalMsgStore (random, expireBackupsBefore),
|
|
QStore (..),
|
|
QStoreCfg (..),
|
|
JournalQueue (msgQueue'), -- msgQueue' is used in tests
|
|
JournalMsgQueue (queue, state),
|
|
JMQueue (queueDirectory, statePath),
|
|
JournalStoreConfig (..),
|
|
closeMsgQueue,
|
|
closeMsgQueueHandles,
|
|
-- below are exported for tests
|
|
MsgQueueState (..),
|
|
JournalState (..),
|
|
SJournalType (..),
|
|
msgQueueDirectory,
|
|
msgQueueStatePath,
|
|
readQueueState,
|
|
newMsgQueueState,
|
|
getJournalQueueMessages,
|
|
newJournalId,
|
|
appendState,
|
|
queueLogFileName,
|
|
journalFilePath,
|
|
logFileExt,
|
|
stmQueueStore,
|
|
#if defined(dbServerPostgres)
|
|
postgresQueueStore,
|
|
#endif
|
|
)
|
|
where
|
|
|
|
import Control.Concurrent.STM
|
|
import qualified Control.Exception as E
|
|
import Control.Logger.Simple
|
|
import Control.Monad
|
|
import Control.Monad.Trans.Except
|
|
import qualified Data.Attoparsec.ByteString.Char8 as A
|
|
import Data.ByteString.Char8 (ByteString)
|
|
import qualified Data.ByteString.Char8 as B
|
|
import Data.Either (fromRight, partitionEithers)
|
|
import Data.Functor (($>))
|
|
import Data.Int (Int64)
|
|
import Data.List (sort)
|
|
import qualified Data.Map.Strict as M
|
|
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
|
|
import Data.Text (Text)
|
|
import qualified Data.Text as T
|
|
import Data.Text.Encoding (decodeLatin1)
|
|
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
|
|
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
|
import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
|
|
import GHC.IO (catchAny)
|
|
import Simplex.Messaging.Agent.Client (getMapLock)
|
|
import Simplex.Messaging.Agent.Lock
|
|
import Simplex.Messaging.Encoding.String
|
|
import Simplex.Messaging.Protocol
|
|
import Simplex.Messaging.Server.MsgStore.Journal.SharedLock
|
|
import Simplex.Messaging.Server.MsgStore.Types
|
|
import Simplex.Messaging.Server.QueueStore
|
|
#if defined(dbServerPostgres)
|
|
import Simplex.Messaging.Server.QueueStore.Postgres
|
|
#endif
|
|
import Simplex.Messaging.Server.QueueStore.STM
|
|
import Simplex.Messaging.Server.QueueStore.Types
|
|
import Simplex.Messaging.SystemTime
|
|
import Simplex.Messaging.TMap (TMap)
|
|
import qualified Simplex.Messaging.TMap as TM
|
|
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
|
|
import System.Directory
|
|
import System.FilePath (takeFileName, (</>))
|
|
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..))
|
|
import qualified System.IO as IO
|
|
import System.Random (StdGen, genByteString, newStdGen)
|
|
|
|
data JournalMsgStore s = JournalMsgStore
|
|
{ config :: JournalStoreConfig s,
|
|
random :: TVar StdGen,
|
|
queueLocks :: TMap RecipientId Lock,
|
|
sharedLock :: TMVar RecipientId,
|
|
queueStore_ :: QStore s,
|
|
openedQueueCount :: TVar Int,
|
|
expireBackupsBefore :: UTCTime
|
|
}
|
|
|
|
data QStore (s :: QSType) where
|
|
MQStore :: QStoreType 'QSMemory -> QStore 'QSMemory
|
|
#if defined(dbServerPostgres)
|
|
PQStore :: QStoreType 'QSPostgres -> QStore 'QSPostgres
|
|
#endif
|
|
|
|
type family QStoreType s where
|
|
QStoreType 'QSMemory = STMQueueStore (JournalQueue 'QSMemory)
|
|
#if defined(dbServerPostgres)
|
|
QStoreType 'QSPostgres = PostgresQueueStore (JournalQueue 'QSPostgres)
|
|
#endif
|
|
|
|
withQS :: (QueueStoreClass (JournalQueue s) (QStoreType s) => QStoreType s -> r) -> QStore s -> r
|
|
withQS f = \case
|
|
MQStore st -> f st
|
|
#if defined(dbServerPostgres)
|
|
PQStore st -> f st
|
|
#endif
|
|
{-# INLINE withQS #-}
|
|
|
|
stmQueueStore :: JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory)
|
|
stmQueueStore st = case queueStore_ st of
|
|
MQStore st' -> st'
|
|
|
|
#if defined(dbServerPostgres)
|
|
postgresQueueStore :: JournalMsgStore 'QSPostgres -> PostgresQueueStore (JournalQueue 'QSPostgres)
|
|
postgresQueueStore st = case queueStore_ st of
|
|
PQStore st' -> st'
|
|
#endif
|
|
|
|
data JournalStoreConfig s = JournalStoreConfig
|
|
{ storePath :: FilePath,
|
|
pathParts :: Int,
|
|
queueStoreCfg :: QStoreCfg s,
|
|
quota :: Int,
|
|
-- Max number of messages per journal file - ignored in STM store.
|
|
-- When this limit is reached, the file will be changed.
|
|
-- This number should be set bigger than queue quota.
|
|
maxMsgCount :: Int,
|
|
maxStateLines :: Int,
|
|
stateTailSize :: Int,
|
|
-- time in seconds after which the queue will be closed after message expiration
|
|
idleInterval :: Int64,
|
|
-- expire state backup files
|
|
expireBackupsAfter :: NominalDiffTime,
|
|
keepMinBackups :: Int
|
|
}
|
|
|
|
data QStoreCfg s where
|
|
MQStoreCfg :: QStoreCfg 'QSMemory
|
|
#if defined(dbServerPostgres)
|
|
PQStoreCfg :: PostgresStoreCfg -> QStoreCfg 'QSPostgres
|
|
#endif
|
|
|
|
data JournalQueue (s :: QSType) = JournalQueue
|
|
{ recipientId' :: RecipientId,
|
|
queueLock :: Lock,
|
|
sharedLock :: TMVar RecipientId,
|
|
-- To avoid race conditions and errors when restoring queues,
|
|
-- Nothing is written to TVar when queue is deleted.
|
|
queueRec' :: TVar (Maybe QueueRec),
|
|
msgQueue' :: TVar (Maybe (JournalMsgQueue s)),
|
|
-- system time in seconds since epoch
|
|
activeAt :: TVar Int64,
|
|
queueState :: TVar (Maybe QState) -- Nothing - unknown
|
|
}
|
|
|
|
data QState = QState
|
|
{ hasPending :: Bool,
|
|
hasStored :: Bool
|
|
}
|
|
|
|
data JMQueue = JMQueue
|
|
{ queueDirectory :: FilePath,
|
|
statePath :: FilePath
|
|
}
|
|
|
|
data JournalMsgQueue (s :: QSType) = JournalMsgQueue
|
|
{ queue :: JMQueue,
|
|
state :: TVar MsgQueueState,
|
|
-- tipMsg contains last message and length incl. newline
|
|
-- Nothing - unknown, Just Nothing - empty queue.
|
|
-- It prevents reading each message twice,
|
|
-- and reading it after it was just written.
|
|
tipMsg :: TVar (Maybe (Maybe (Message, Int64))),
|
|
handles :: TVar (Maybe MsgQueueHandles)
|
|
}
|
|
|
|
data MsgQueueState = MsgQueueState
|
|
{ readState :: JournalState 'JTRead,
|
|
writeState :: JournalState 'JTWrite,
|
|
canWrite :: Bool,
|
|
size :: Int
|
|
}
|
|
deriving (Show)
|
|
|
|
data MsgQueueHandles = MsgQueueHandles
|
|
{ stateHandle :: Handle, -- handle to queue state log file, rotates and removes old backups when server is restarted
|
|
readHandle :: Handle,
|
|
writeHandle :: Maybe Handle -- optional, used when write file is different from read file
|
|
}
|
|
|
|
data JournalState t = JournalState
|
|
{ journalType :: SJournalType t,
|
|
journalId :: ByteString,
|
|
msgPos :: Int,
|
|
msgCount :: Int,
|
|
bytePos :: Int64,
|
|
byteCount :: Int64
|
|
}
|
|
deriving (Show)
|
|
|
|
qState :: MsgQueueState -> QState
|
|
qState MsgQueueState {size, readState = rs, writeState = ws} =
|
|
let hasPending = size > 0
|
|
in QState {hasPending, hasStored = hasPending || msgCount rs > 0 || msgCount ws > 0}
|
|
{-# INLINE qState #-}
|
|
|
|
data JournalType = JTRead | JTWrite
|
|
|
|
data SJournalType (t :: JournalType) where
|
|
SJTRead :: SJournalType 'JTRead
|
|
SJTWrite :: SJournalType 'JTWrite
|
|
|
|
class JournalTypeI t where sJournalType :: SJournalType t
|
|
|
|
instance JournalTypeI 'JTRead where sJournalType = SJTRead
|
|
|
|
instance JournalTypeI 'JTWrite where sJournalType = SJTWrite
|
|
|
|
deriving instance Show (SJournalType t)
|
|
|
|
newMsgQueueState :: ByteString -> MsgQueueState
|
|
newMsgQueueState journalId =
|
|
MsgQueueState
|
|
{ writeState = newJournalState journalId,
|
|
readState = newJournalState journalId,
|
|
canWrite = True,
|
|
size = 0
|
|
}
|
|
|
|
newJournalState :: JournalTypeI t => ByteString -> JournalState t
|
|
newJournalState journalId = JournalState sJournalType journalId 0 0 0 0
|
|
|
|
journalFilePath :: FilePath -> ByteString -> FilePath
|
|
journalFilePath dir journalId = dir </> (msgLogFileName <> "." <> B.unpack journalId <> logFileExt)
|
|
|
|
instance StrEncoding MsgQueueState where
|
|
strEncode MsgQueueState {writeState, readState, canWrite, size} =
|
|
B.unwords
|
|
[ "write=" <> strEncode writeState,
|
|
"read=" <> strEncode readState,
|
|
"canWrite=" <> strEncode canWrite,
|
|
"size=" <> strEncode size
|
|
]
|
|
strP = do
|
|
writeState <- "write=" *> strP
|
|
readState <- " read=" *> strP
|
|
canWrite <- " canWrite=" *> strP
|
|
size <- " size=" *> strP
|
|
pure MsgQueueState {writeState, readState, canWrite, size}
|
|
|
|
instance JournalTypeI t => StrEncoding (JournalState t) where
|
|
strEncode JournalState {journalId, msgPos, msgCount, bytePos, byteCount} =
|
|
B.intercalate "," [journalId, e msgPos, e msgCount, e bytePos, e byteCount]
|
|
where
|
|
e :: StrEncoding a => a -> ByteString
|
|
e = strEncode
|
|
strP = do
|
|
journalId <- A.takeTill (== ',')
|
|
JournalState sJournalType journalId <$> i <*> i <*> i <*> i
|
|
where
|
|
i :: Integral a => A.Parser a
|
|
i = A.char ',' *> A.decimal
|
|
|
|
queueLogFileName :: String
|
|
queueLogFileName = "queue_state"
|
|
|
|
msgLogFileName :: String
|
|
msgLogFileName = "messages"
|
|
|
|
logFileExt :: String
|
|
logFileExt = ".log"
|
|
|
|
newtype StoreIO (s :: QSType) a = StoreIO {unStoreIO :: IO a}
|
|
deriving newtype (Functor, Applicative, Monad)
|
|
|
|
instance StoreQueueClass (JournalQueue s) where
|
|
recipientId = recipientId'
|
|
{-# INLINE recipientId #-}
|
|
queueRec = queueRec'
|
|
{-# INLINE queueRec #-}
|
|
withQueueLock :: JournalQueue s -> Text -> IO a -> IO a
|
|
withQueueLock JournalQueue {recipientId', queueLock, sharedLock} =
|
|
withLockWaitShared recipientId' queueLock sharedLock
|
|
{-# INLINE withQueueLock #-}
|
|
|
|
instance QueueStoreClass (JournalQueue s) (QStore s) where
|
|
type QueueStoreCfg (QStore s) = QStoreCfg s
|
|
|
|
newQueueStore :: QStoreCfg s -> IO (QStore s)
|
|
newQueueStore = \case
|
|
MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
|
|
#if defined(dbServerPostgres)
|
|
PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) (cfg, True)
|
|
#endif
|
|
|
|
closeQueueStore = withQS (closeQueueStore @(JournalQueue s))
|
|
{-# INLINE closeQueueStore #-}
|
|
loadedQueues = withQS loadedQueues
|
|
{-# INLINE loadedQueues #-}
|
|
compactQueues = withQS (compactQueues @(JournalQueue s))
|
|
{-# INLINE compactQueues #-}
|
|
getEntityCounts = withQS (getEntityCounts @(JournalQueue s))
|
|
{-# INLINE getEntityCounts #-}
|
|
addQueue_ = withQS addQueue_
|
|
{-# INLINE addQueue_ #-}
|
|
getQueue_ = withQS getQueue_
|
|
{-# INLINE getQueue_ #-}
|
|
getQueues_ = withQS getQueues_
|
|
{-# INLINE getQueues_ #-}
|
|
addQueueLinkData = withQS addQueueLinkData
|
|
{-# INLINE addQueueLinkData #-}
|
|
getQueueLinkData = withQS getQueueLinkData
|
|
{-# INLINE getQueueLinkData #-}
|
|
deleteQueueLinkData = withQS deleteQueueLinkData
|
|
{-# INLINE deleteQueueLinkData #-}
|
|
secureQueue = withQS secureQueue
|
|
{-# INLINE secureQueue #-}
|
|
updateKeys = withQS updateKeys
|
|
{-# INLINE updateKeys #-}
|
|
addQueueNotifier = withQS addQueueNotifier
|
|
{-# INLINE addQueueNotifier #-}
|
|
deleteQueueNotifier = withQS deleteQueueNotifier
|
|
{-# INLINE deleteQueueNotifier #-}
|
|
suspendQueue = withQS suspendQueue
|
|
{-# INLINE suspendQueue #-}
|
|
blockQueue = withQS blockQueue
|
|
{-# INLINE blockQueue #-}
|
|
unblockQueue = withQS unblockQueue
|
|
{-# INLINE unblockQueue #-}
|
|
updateQueueTime = withQS updateQueueTime
|
|
{-# INLINE updateQueueTime #-}
|
|
deleteStoreQueue = withQS deleteStoreQueue
|
|
{-# INLINE deleteStoreQueue #-}
|
|
getCreateService = withQS (getCreateService @(JournalQueue s))
|
|
{-# INLINE getCreateService #-}
|
|
setQueueService = withQS setQueueService
|
|
{-# INLINE setQueueService #-}
|
|
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
|
|
{-# INLINE getQueueNtfServices #-}
|
|
getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s))
|
|
{-# INLINE getServiceQueueCountHash #-}
|
|
|
|
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
|
|
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
|
|
queueRec' <- newTVarIO $ Just qr
|
|
msgQueue' <- newTVarIO Nothing
|
|
activeAt <- newTVarIO 0
|
|
queueState <- newTVarIO Nothing
|
|
pure $
|
|
JournalQueue
|
|
{ recipientId' = rId,
|
|
queueLock,
|
|
sharedLock,
|
|
queueRec',
|
|
msgQueue',
|
|
activeAt,
|
|
queueState
|
|
}
|
|
|
|
instance MsgStoreClass (JournalMsgStore s) where
|
|
type StoreMonad (JournalMsgStore s) = StoreIO s
|
|
type MsgQueue (JournalMsgStore s) = JournalMsgQueue s
|
|
type QueueStore (JournalMsgStore s) = QStore s
|
|
type StoreQueue (JournalMsgStore s) = JournalQueue s
|
|
type MsgStoreConfig (JournalMsgStore s) = JournalStoreConfig s
|
|
|
|
newMsgStore :: JournalStoreConfig s -> IO (JournalMsgStore s)
|
|
newMsgStore config@JournalStoreConfig {queueStoreCfg} = do
|
|
random <- newTVarIO =<< newStdGen
|
|
queueLocks <- TM.emptyIO
|
|
sharedLock <- newEmptyTMVarIO
|
|
queueStore_ <- newQueueStore @(JournalQueue s) queueStoreCfg
|
|
openedQueueCount <- newTVarIO 0
|
|
expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime
|
|
pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, openedQueueCount, expireBackupsBefore}
|
|
|
|
closeMsgStore :: JournalMsgStore s -> IO ()
|
|
closeMsgStore ms = do
|
|
let st = queueStore_ ms
|
|
closeQueues $ loadedQueues @(JournalQueue s) st
|
|
closeQueueStore @(JournalQueue s) st
|
|
where
|
|
closeQueues qs = readTVarIO qs >>= mapM_ (closeMsgQueue ms)
|
|
|
|
withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
|
|
withActiveMsgQueues = withQS withLoadedQueues . queueStore_
|
|
|
|
-- This function can only be used in server CLI commands or before server is started.
|
|
-- It does not cache queues and is NOT concurrency safe.
|
|
unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
|
|
unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
|
|
MQStore st -> withLoadedQueues st run
|
|
#if defined(dbServerPostgres)
|
|
PQStore st -> foldQueueRecs False tty st $ uncurry (mkQueue ms False) >=> run
|
|
#endif
|
|
where
|
|
run q = do
|
|
r <- action q
|
|
closeMsgQueue ms q
|
|
pure r
|
|
|
|
-- This function is concurrency safe
|
|
expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
|
|
expireOldMessages tty ms now ttl = case queueStore_ ms of
|
|
MQStore st ->
|
|
withLoadedQueues st $ \q -> run $ isolateQueue ms q "deleteExpiredMsgs" $ do
|
|
StoreIO (readTVarIO $ queueRec q) >>= \case
|
|
Just QueueRec {updatedAt = Just (RoundedSystemTime t)} | t > veryOld ->
|
|
expireQueueMsgs ms now old q
|
|
_ -> pure newMessageStats
|
|
#if defined(dbServerPostgres)
|
|
PQStore st -> do
|
|
let JournalMsgStore {queueLocks, sharedLock} = ms
|
|
foldRecentQueueRecs veryOld tty st $ \(rId, qr) -> do
|
|
q <- mkQueue ms False rId qr
|
|
withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $
|
|
getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
|
|
#endif
|
|
where
|
|
old = now - ttl
|
|
veryOld = now - 2 * ttl - 86400
|
|
run :: ExceptT ErrorType IO MessageStats -> IO MessageStats
|
|
run = fmap (fromRight newMessageStats) . runExceptT
|
|
-- Use cached queue if available.
|
|
-- Also see the comment in loadQueue in PostgresQueueStore
|
|
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
|
|
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
|
|
|
|
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
|
|
foldRcvServiceMessages ms serviceId f acc = case queueStore_ ms of
|
|
MQStore st -> fmap Right $ foldRcvServiceQueues st serviceId f' acc
|
|
where
|
|
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
|
|
#if defined(dbServerPostgres)
|
|
PQStore st -> foldRcvServiceQueueRecs st serviceId f' acc
|
|
where
|
|
JournalMsgStore {queueLocks, sharedLock} = ms
|
|
f' a (rId, qr) = do
|
|
q <- mkQueue ms False rId qr
|
|
qMsg_ <-
|
|
withSharedWaitLock rId queueLocks sharedLock $ runExceptT $ tryStore' "foldRcvServiceMessages" rId $
|
|
(qr,) . snd <$$> (getLoadedQueue q >>= unStoreIO . getPeekMsgQueue ms)
|
|
f a rId qMsg_
|
|
-- Use cached queue if available.
|
|
-- Also see the comment in loadQueue in PostgresQueueStore
|
|
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
|
|
#endif
|
|
|
|
logQueueStates :: JournalMsgStore s -> IO ()
|
|
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
|
|
|
|
logQueueState :: JournalQueue s -> StoreIO s ()
|
|
logQueueState q =
|
|
StoreIO . void $
|
|
readTVarIO (msgQueue' q)
|
|
$>>= \mq -> readTVarIO (handles mq)
|
|
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
|
|
|
|
queueStore = queueStore_
|
|
{-# INLINE queueStore #-}
|
|
|
|
loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts
|
|
loadedQueueCounts ms = do
|
|
let (qs, ns, nLocks_) = loaded
|
|
loadedQueueCount <- M.size <$> readTVarIO qs
|
|
loadedNotifierCount <- M.size <$> readTVarIO ns
|
|
openJournalCount <- readTVarIO (openedQueueCount ms)
|
|
queueLockCount <- M.size <$> readTVarIO (queueLocks ms)
|
|
notifierLockCount <- maybe (pure 0) (fmap M.size . readTVarIO) nLocks_
|
|
pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount, queueLockCount, notifierLockCount}
|
|
where
|
|
loaded :: (TMap RecipientId (JournalQueue s), TMap NotifierId RecipientId, Maybe (TMap NotifierId Lock))
|
|
loaded = case queueStore_ ms of
|
|
MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing)
|
|
#if defined(dbServerPostgres)
|
|
PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks)
|
|
#endif
|
|
|
|
mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s)
|
|
mkQueue ms keepLock rId qr = do
|
|
lock <- if keepLock then atomically $ getMapLock (queueLocks ms) rId else createLockIO
|
|
makeQueue_ ms rId qr lock
|
|
|
|
getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s)
|
|
getMsgQueue ms@JournalMsgStore {random} q'@JournalQueue {recipientId' = rId, msgQueue'} forWrite =
|
|
StoreIO $ readTVarIO msgQueue' >>= maybe newQ pure
|
|
where
|
|
newQ = do
|
|
let dir = msgQueueDirectory ms rId
|
|
statePath = msgQueueStatePath dir rId
|
|
queue = JMQueue {queueDirectory = dir, statePath}
|
|
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue)
|
|
atomically $ writeTVar msgQueue' $ Just q
|
|
st <- readTVarIO $ state q
|
|
atomically $ writeTVar (queueState q') $ Just $! qState st
|
|
pure q
|
|
where
|
|
createQ :: JMQueue -> IO (JournalMsgQueue s)
|
|
createQ queue = do
|
|
-- folder and files are not created here,
|
|
-- to avoid file IO for queues without messages during subscription
|
|
journalId <- newJournalId random
|
|
mkJournalQueue queue (newMsgQueueState journalId) Nothing
|
|
|
|
getPeekMsgQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (Maybe (JournalMsgQueue s, Message))
|
|
getPeekMsgQueue ms q@JournalQueue {queueState} =
|
|
StoreIO (readTVarIO queueState) >>= \case
|
|
Just QState {hasPending} -> if hasPending then peek else pure Nothing
|
|
Nothing -> do
|
|
-- We only close the queue if we just learnt it's empty.
|
|
-- This is needed to reduce file descriptors and memory usage
|
|
-- after the server just started and many clients subscribe.
|
|
-- In case the queue became non-empty on write and then again empty on read
|
|
-- we won't be closing it, to avoid frequent open/close on active queues.
|
|
r <- peek
|
|
when (isNothing r) $ StoreIO $ closeMsgQueue ms q
|
|
pure r
|
|
where
|
|
peek = do
|
|
mq <- getMsgQueue ms q False
|
|
(mq,) <$$> tryPeekMsg_ q mq
|
|
|
|
-- only runs action if queue is not empty
|
|
withIdleMsgQueue :: Int64 -> JournalMsgStore s -> JournalQueue s -> (JournalMsgQueue s -> StoreIO s a) -> StoreIO s (Maybe a, Int)
|
|
withIdleMsgQueue now ms@JournalMsgStore {config} q@JournalQueue {queueState} action =
|
|
StoreIO $ readTVarIO (msgQueue' q) >>= \case
|
|
Nothing ->
|
|
E.bracket
|
|
getNonEmptyMsgQueue
|
|
(mapM_ $ \_ -> closeMsgQueue ms q)
|
|
(maybe (pure (Nothing, 0)) (unStoreIO . run))
|
|
where
|
|
run mq = do
|
|
r <- action mq
|
|
sz <- getQueueSize_ mq
|
|
pure (Just r, sz)
|
|
Just mq -> do
|
|
ts <- readTVarIO $ activeAt q
|
|
r <- if now - ts >= idleInterval config
|
|
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue ms q
|
|
else pure Nothing
|
|
sz <- unStoreIO $ getQueueSize_ mq
|
|
pure (r, sz)
|
|
where
|
|
getNonEmptyMsgQueue :: IO (Maybe (JournalMsgQueue s))
|
|
getNonEmptyMsgQueue =
|
|
readTVarIO queueState >>= \case
|
|
Just QState {hasStored}
|
|
| hasStored -> Just <$> unStoreIO (getMsgQueue ms q False)
|
|
| otherwise -> pure Nothing
|
|
Nothing -> do
|
|
mq <- unStoreIO $ getMsgQueue ms q False
|
|
-- queueState was updated in getMsgQueue
|
|
readTVarIO queueState >>= \case
|
|
Just QState {hasStored} | not hasStored -> closeMsgQueue ms q $> Nothing
|
|
_ -> pure $ Just mq
|
|
|
|
deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec)
|
|
deleteQueue ms q = fst <$$> deleteQueue_ ms q
|
|
|
|
deleteQueueSize :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int))
|
|
deleteQueueSize ms q =
|
|
deleteQueue_ ms q >>= mapM (traverse getSize)
|
|
-- traverse operates on the second tuple element
|
|
where
|
|
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
|
|
|
|
-- drainMsgs is never True with Journal storage
|
|
getQueueMessages_ :: Bool -> JournalQueue s -> JournalMsgQueue s -> StoreIO s [Message]
|
|
getQueueMessages_ drainMsgs q' q = StoreIO $ if drainMsgs then run [] else readTVarIO (state q) >>= runFast
|
|
where
|
|
run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs)
|
|
getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg
|
|
where
|
|
readMsg (rs, h) = do
|
|
(msg, len) <- hGetMsgAt h $ bytePos rs
|
|
updateReadPos q' q drainMsgs len hs
|
|
(msg :) <$> run msgs
|
|
runFast MsgQueueState {writeState = ws, readState = rs, size}
|
|
| size > 0 =
|
|
readTVarIO (handles q) >>= \case
|
|
Just (MsgQueueHandles _ rh wh_) -> do
|
|
msgs <- getJournalRange rh (bytePos rs) (byteCount rs)
|
|
case wh_ of
|
|
Just wh -> (msgs ++) <$> getJournalRange wh 0 (bytePos ws)
|
|
Nothing -> pure msgs
|
|
Nothing -> pure []
|
|
| otherwise = pure []
|
|
|
|
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
|
|
writeMsg ms q' logState msg = isolateQueue ms q' "writeMsg" $ do
|
|
q <- getMsgQueue ms q' True
|
|
StoreIO $ (`E.finally` updateActiveAt q') $ do
|
|
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
|
|
let empty = size == 0
|
|
if canWrite || empty
|
|
then do
|
|
let canWrt' = quota > size
|
|
if canWrt'
|
|
then writeToJournal q st canWrt' msg $> Just (msg, empty)
|
|
else writeToJournal q st canWrt' msgQuota $> Nothing
|
|
else pure Nothing
|
|
where
|
|
JournalStoreConfig {quota, maxMsgCount} = config ms
|
|
msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg}
|
|
writeToJournal q st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do
|
|
let msgStr = strEncode msg' `B.snoc` '\n'
|
|
msgLen = fromIntegral $ B.length msgStr
|
|
hs <- maybe createQueueDir pure =<< readTVarIO handles
|
|
(ws, wh) <- case writeHandle hs of
|
|
Nothing | msgCount writeState >= maxMsgCount -> switchWriteJournal hs
|
|
wh_ -> pure (writeState, fromMaybe (readHandle hs) wh_)
|
|
let msgPos' = msgPos ws + 1
|
|
bytePos' = bytePos ws + msgLen
|
|
ws' = ws {msgPos = msgPos', msgCount = msgPos', bytePos = bytePos', byteCount = bytePos'}
|
|
rs' = if journalId ws == journalId rs then rs {msgCount = msgPos', byteCount = bytePos'} else rs
|
|
!st' = st {writeState = ws', readState = rs', canWrite = canWrt', size = size + 1}
|
|
hAppend wh (bytePos ws) msgStr
|
|
updateQueueState q' q logState hs st' $
|
|
when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen))
|
|
where
|
|
JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q
|
|
createQueueDir = do
|
|
createDirectoryIfMissing True queueDirectory
|
|
sh <- openFile statePath AppendMode
|
|
rh <- createNewJournal queueDirectory $ journalId rs
|
|
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
|
|
atomically $ writeTVar handles $ Just hs
|
|
atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
|
|
pure hs
|
|
switchWriteJournal hs = do
|
|
journalId <- newJournalId $ random ms
|
|
wh <- createNewJournal queueDirectory journalId
|
|
atomically $ writeTVar handles $ Just $ hs {writeHandle = Just wh}
|
|
pure (newJournalState journalId, wh)
|
|
|
|
-- can ONLY be used while restoring messages, not while server running
|
|
setOverQuota_ :: JournalQueue s -> IO ()
|
|
setOverQuota_ q =
|
|
readTVarIO (msgQueue' q)
|
|
>>= mapM_ (\JournalMsgQueue {state} -> atomically $ modifyTVar' state $ \st -> st {canWrite = False})
|
|
|
|
getQueueSize_ :: JournalMsgQueue s -> StoreIO s Int
|
|
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state
|
|
|
|
tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message)
|
|
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
|
|
StoreIO $ (readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg)
|
|
where
|
|
peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
|
|
where
|
|
readMsg = do
|
|
ml@(msg, _) <- hGetMsgAt h $ bytePos rs
|
|
atomically $ writeTVar tipMsg $ Just (Just ml)
|
|
pure $ Just msg
|
|
|
|
tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
|
|
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
|
|
void $
|
|
readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
|
|
$>>= (pure . fmap snd)
|
|
$>>= \len -> readTVarIO handles
|
|
$>>= \hs -> updateReadPos q mq logState len hs $> Just ()
|
|
|
|
isolateQueue :: JournalMsgStore s -> JournalQueue s -> Text -> StoreIO s a -> ExceptT ErrorType IO a
|
|
isolateQueue _ sq op = tryStore' op (recipientId' sq) . withQueueLock sq op . unStoreIO
|
|
|
|
unsafeRunStore :: JournalQueue s -> Text -> StoreIO s a -> IO a
|
|
unsafeRunStore sq op a =
|
|
unStoreIO a `E.catch` \e -> storeError op (recipientId' sq) e >> E.throwIO e
|
|
|
|
updateActiveAt :: JournalQueue s -> IO ()
|
|
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
|
|
|
|
tryStore' :: Text -> RecipientId -> IO a -> ExceptT ErrorType IO a
|
|
tryStore' op rId = tryStore op rId . fmap Right
|
|
|
|
tryStore :: forall a. Text -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
|
|
tryStore op rId a = ExceptT $ E.mask_ $ a `E.catch` storeError op rId
|
|
|
|
storeError :: Text -> RecipientId -> E.SomeException -> IO (Either ErrorType a)
|
|
storeError op rId e =
|
|
let e' = T.intercalate ", " [op, decodeLatin1 $ strEncode rId, tshow e]
|
|
in logError ("STORE: " <> e') $> Left (STORE e')
|
|
|
|
isolateQueueId :: Text -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
|
|
isolateQueueId op JournalMsgStore {queueLocks, sharedLock} rId =
|
|
tryStore op rId . withLockMapWaitShared rId queueLocks sharedLock op
|
|
|
|
openMsgQueue :: JournalMsgStore s -> JMQueue -> Bool -> IO (JournalMsgQueue s)
|
|
openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, statePath} forWrite = do
|
|
(st_, shouldBackup) <- readQueueState ms statePath
|
|
case st_ of
|
|
Nothing -> do
|
|
st <- newMsgQueueState <$> newJournalId (random ms)
|
|
when shouldBackup $ backupQueueState statePath -- rename invalid state file
|
|
mkJournalQueue q st Nothing
|
|
Just st
|
|
| size st == 0 -> do
|
|
(st', hs_) <- removeJournals st shouldBackup
|
|
when (isJust hs_) incOpenedCount
|
|
mkJournalQueue q st' hs_
|
|
| otherwise -> do
|
|
sh <- openBackupQueueState st shouldBackup
|
|
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
|
|
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
|
|
incOpenedCount
|
|
mkJournalQueue q st' (Just hs)
|
|
where
|
|
incOpenedCount = atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
|
|
-- If the queue is empty, journals are deleted.
|
|
-- New journal is created if queue is written to.
|
|
-- canWrite is set to True.
|
|
removeJournals MsgQueueState {readState = rs, writeState = ws} shouldBackup = E.uninterruptibleMask_ $ do
|
|
rjId <- newJournalId $ random ms
|
|
let st = newMsgQueueState rjId
|
|
hs_ <-
|
|
if forWrite
|
|
then Just <$> newJournalHandles st rjId
|
|
else Nothing <$ backupQueueState statePath
|
|
removeJournalIfExists dir rs
|
|
unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws
|
|
pure (st, hs_)
|
|
where
|
|
newJournalHandles st rjId = do
|
|
sh <- openBackupQueueState st shouldBackup
|
|
appendState_ sh st
|
|
rh <- closeOnException sh $ createNewJournal dir rjId
|
|
pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
|
|
openBackupQueueState st shouldBackup
|
|
| shouldBackup = do
|
|
-- State backup is made in two steps to mitigate the crash during the backup.
|
|
-- Temporary backup file will be used when it is present.
|
|
let tempBackup = statePath <> ".bak"
|
|
renameFile statePath tempBackup -- 1) temp backup
|
|
sh <- openFile statePath AppendMode
|
|
closeOnException sh $ appendState sh st -- 2) save state to new file
|
|
backupQueueState tempBackup -- 3) timed backup
|
|
pure sh
|
|
| otherwise = openFile statePath AppendMode
|
|
backupQueueState path = do
|
|
ts <- getCurrentTime
|
|
renameFile path $ stateBackupPath statePath ts
|
|
-- remove old backups
|
|
times <- sort . mapMaybe backupPathTime <$> listDirectory dir
|
|
let toDelete = filter (< expireBackupsBefore ms) $ take (length times - keepMinBackups config) times
|
|
mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete
|
|
where
|
|
backupPathTime :: FilePath -> Maybe UTCTime
|
|
backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack
|
|
statePathPfx = T.pack $ takeFileName statePath <> "."
|
|
|
|
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s)
|
|
mkJournalQueue queue st hs_ = do
|
|
state <- newTVarIO st
|
|
tipMsg <- newTVarIO Nothing
|
|
handles <- newTVarIO hs_
|
|
-- using the same queue lock which is currently locked,
|
|
-- to avoid map lookup on queue operations
|
|
pure JournalMsgQueue {queue, state, tipMsg, handles}
|
|
|
|
chooseReadJournal :: JournalQueue s -> JournalMsgQueue s -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle))
|
|
chooseReadJournal q' q log' hs = do
|
|
st@MsgQueueState {writeState = ws, readState = rs} <- readTVarIO (state q)
|
|
case writeHandle hs of
|
|
Just wh | msgPos rs >= msgCount rs && journalId rs /= journalId ws -> do
|
|
-- switching to write journal
|
|
atomically $ writeTVar (handles q) $ Just hs {readHandle = wh, writeHandle = Nothing}
|
|
hClose $ readHandle hs
|
|
when log' $ removeJournal (queueDirectory $ queue q) rs
|
|
let !rs' = (newJournalState $ journalId ws) {msgCount = msgCount ws, byteCount = byteCount ws}
|
|
!st' = st {readState = rs'}
|
|
updateQueueState q' q log' hs st' $ pure ()
|
|
pure $ Just (rs', wh)
|
|
_ | msgPos rs >= msgCount rs && journalId rs == journalId ws -> pure Nothing
|
|
_ -> pure $ Just (rs, readHandle hs)
|
|
|
|
updateQueueState :: JournalQueue s -> JournalMsgQueue s -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO ()
|
|
updateQueueState q' q log' hs st a = do
|
|
unless (validQueueState st) $ E.throwIO $ userError $ "updateQueueState invalid state: " <> show st
|
|
when log' $ appendState (stateHandle hs) st
|
|
atomically $ writeTVar (queueState q') $ Just $! qState st
|
|
atomically $ writeTVar (state q) st >> a
|
|
|
|
appendState :: Handle -> MsgQueueState -> IO ()
|
|
appendState h = E.uninterruptibleMask_ . appendState_ h
|
|
{-# INLINE appendState #-}
|
|
|
|
appendState_ :: Handle -> MsgQueueState -> IO ()
|
|
appendState_ h st = B.hPutStr h $ strEncode st `B.snoc` '\n'
|
|
|
|
updateReadPos :: JournalQueue s -> JournalMsgQueue s -> Bool -> Int64 -> MsgQueueHandles -> IO ()
|
|
updateReadPos q' q log' len hs = do
|
|
st@MsgQueueState {readState = rs, size} <- readTVarIO (state q)
|
|
let JournalState {msgPos, bytePos} = rs
|
|
let msgPos' = msgPos + 1
|
|
rs' = rs {msgPos = msgPos', bytePos = bytePos + len}
|
|
st' = st {readState = rs', size = size - 1}
|
|
updateQueueState q' q log' hs st' $ writeTVar (tipMsg q) Nothing
|
|
|
|
msgQueueDirectory :: JournalMsgStore s -> RecipientId -> FilePath
|
|
msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathParts}} rId =
|
|
storePath </> B.unpack (B.intercalate "/" $ splitSegments pathParts $ strEncode rId)
|
|
where
|
|
splitSegments _ "" = []
|
|
splitSegments 1 s = [s]
|
|
splitSegments n s =
|
|
let (seg, s') = B.splitAt 2 s
|
|
in seg : splitSegments (n - 1) s'
|
|
|
|
msgQueueStatePath :: FilePath -> RecipientId -> FilePath
|
|
msgQueueStatePath dir rId = dir </> (queueLogFileName <> "." <> B.unpack (strEncode rId) <> logFileExt)
|
|
|
|
createNewJournal :: FilePath -> ByteString -> IO Handle
|
|
createNewJournal dir journalId = do
|
|
let path = journalFilePath dir journalId -- TODO retry if file exists
|
|
h <- openFile path ReadWriteMode
|
|
B.hPutStr h ""
|
|
pure h
|
|
|
|
newJournalId :: TVar StdGen -> IO ByteString
|
|
newJournalId g = strEncode <$> atomically (stateTVar g $ genByteString 12)
|
|
|
|
openJournals :: JournalMsgStore s -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle)
|
|
openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do
|
|
let rjId = journalId rs
|
|
wjId = journalId ws
|
|
openJournal rs >>= \case
|
|
Left path
|
|
| rjId == wjId -> do
|
|
logError $ "STORE: openJournals, no read/write file - creating new file, " <> T.pack path
|
|
newReadJournal
|
|
| otherwise -> do
|
|
let rs' = (newJournalState wjId) {msgCount = msgCount ws, byteCount = byteCount ws}
|
|
st' = st {readState = rs', size = msgCount ws}
|
|
openJournal rs' >>= \case
|
|
Left path' -> do
|
|
logError $ "STORE: openJournals, no read and write files - creating new file, read: " <> T.pack path <> ", write: " <> T.pack path'
|
|
newReadJournal
|
|
Right rh -> do
|
|
logError $ "STORE: openJournals, no read file - switched to write file, " <> T.pack path
|
|
closeOnException rh $ fixFileSize rh $ bytePos ws
|
|
pure (st', rh, Nothing)
|
|
Right rh
|
|
| rjId == wjId -> do
|
|
closeOnException rh $ fixFileSize rh $ bytePos ws
|
|
pure (st, rh, Nothing)
|
|
| otherwise -> closeOnException rh $ do
|
|
fixFileSize rh $ byteCount rs
|
|
openJournal ws >>= \case
|
|
Left path -> do
|
|
let msgs = msgCount rs
|
|
bytes = byteCount rs
|
|
size' = msgs - msgPos rs
|
|
ws' = (newJournalState rjId) {msgPos = msgs, msgCount = msgs, bytePos = bytes, byteCount = bytes}
|
|
st' = st {writeState = ws', size = size'} -- we don't amend canWrite to trigger QCONT
|
|
logError $ "STORE: openJournals, no write file, " <> T.pack path
|
|
pure (st', rh, Nothing)
|
|
Right wh -> do
|
|
closeOnException wh $ fixFileSize wh $ bytePos ws
|
|
pure (st, rh, Just wh)
|
|
where
|
|
newReadJournal = do
|
|
rjId' <- newJournalId $ random ms
|
|
rh <- createNewJournal dir rjId'
|
|
let st' = newMsgQueueState rjId'
|
|
closeOnException rh $ appendState sh st'
|
|
pure (st', rh, Nothing)
|
|
openJournal :: JournalState t -> IO (Either FilePath Handle)
|
|
openJournal JournalState {journalId} =
|
|
let path = journalFilePath dir journalId
|
|
in ifM (doesFileExist path) (Right <$> openFile path ReadWriteMode) (pure $ Left path)
|
|
-- do that for all append operations
|
|
|
|
fixFileSize :: Handle -> Int64 -> IO ()
|
|
fixFileSize h pos = do
|
|
let pos' = fromIntegral pos
|
|
size <- IO.hFileSize h
|
|
if
|
|
| size > pos' -> do
|
|
name <- IO.hShow h
|
|
logWarn $ "STORE: fixFileSize, size " <> tshow size <> " > pos " <> tshow pos <> " - truncating, " <> T.pack name
|
|
IO.hSetFileSize h pos'
|
|
| size < pos' -> do
|
|
-- From code logic this can't happen.
|
|
name <- IO.hShow h
|
|
E.throwIO $ userError $ "fixFileSize size " <> show size <> " < pos " <> show pos <> " - aborting: " <> name
|
|
| otherwise -> pure ()
|
|
|
|
removeJournal :: FilePath -> JournalState t -> IO ()
|
|
removeJournal dir JournalState {journalId} =
|
|
safeRemoveFile "removeJournal" $ journalFilePath dir journalId
|
|
|
|
removeJournalIfExists :: FilePath -> JournalState t -> IO ()
|
|
removeJournalIfExists dir JournalState {journalId} = do
|
|
let path = journalFilePath dir journalId
|
|
handleError "removeJournalIfExists" path $
|
|
whenM (doesFileExist path) $ removeFile path
|
|
|
|
safeRemoveFile :: Text -> FilePath -> IO ()
|
|
safeRemoveFile cxt path = handleError cxt path $ removeFile path
|
|
|
|
handleError :: Text -> FilePath -> IO () -> IO ()
|
|
handleError cxt path a =
|
|
a `catchAny` \e -> logError $ "STORE: " <> cxt <> ", " <> T.pack path <> ", " <> tshow e
|
|
|
|
-- This function is supposed to be resilient to crashes while updating state files,
|
|
-- and also resilient to crashes during its execution.
|
|
readQueueState :: JournalMsgStore s -> FilePath -> IO (Maybe MsgQueueState, Bool)
|
|
readQueueState JournalMsgStore {config} statePath =
|
|
ifM
|
|
(doesFileExist tempBackup)
|
|
(renameFile tempBackup statePath >> readState)
|
|
(ifM (doesFileExist statePath) readState $ pure (Nothing, False))
|
|
where
|
|
tempBackup = statePath <> ".bak"
|
|
readState = do
|
|
ls <- B.lines <$> readFileTail
|
|
case ls of
|
|
[] -> do
|
|
logWarn $ "STORE: readWriteQueueState, empty queue state, " <> T.pack statePath
|
|
pure (Nothing, False)
|
|
_ -> do
|
|
r <- useLastLine (length ls) True ls
|
|
forM_ (fst r) $ \st ->
|
|
unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st
|
|
pure r
|
|
useLastLine len isLastLine ls = case strDecode $ last ls of
|
|
Right st ->
|
|
-- when state file has fewer than maxStateLines, we don't compact it
|
|
let shouldBackup = len > maxStateLines config || not isLastLine
|
|
in pure (Just st, shouldBackup)
|
|
Left e -- if the last line failed to parse
|
|
| isLastLine -> case init ls of -- or use the previous line
|
|
[] -> do
|
|
logWarn $ "STORE: readWriteQueueState, invalid 1-line queue state - initialized, " <> T.pack statePath
|
|
pure (Nothing, True) -- backup state file, because last line was invalid
|
|
ls' -> do
|
|
logWarn $ "STORE: readWriteQueueState, invalid last line in queue state - using the previous line, " <> T.pack statePath
|
|
useLastLine len False ls'
|
|
| otherwise -> E.throwIO $ userError $ "readWriteQueueState invalid state " <> statePath <> ": " <> show e
|
|
readFileTail =
|
|
IO.withFile statePath ReadMode $ \h -> do
|
|
size <- IO.hFileSize h
|
|
let sz = stateTailSize config
|
|
sz' = fromIntegral sz
|
|
if size > sz'
|
|
then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz
|
|
else B.hGet h (fromIntegral size)
|
|
|
|
stateBackupPath :: FilePath -> UTCTime -> FilePath
|
|
stateBackupPath statePath ts = statePath <> "." <> iso8601Show ts <> ".bak"
|
|
|
|
validQueueState :: MsgQueueState -> Bool
|
|
validQueueState MsgQueueState {readState = rs, writeState = ws, size}
|
|
| journalId rs == journalId ws =
|
|
alwaysValid
|
|
&& msgPos rs <= msgPos ws
|
|
&& msgCount rs == msgCount ws
|
|
&& bytePos rs <= bytePos ws
|
|
&& byteCount rs == byteCount ws
|
|
&& size == msgCount rs - msgPos rs
|
|
| otherwise =
|
|
alwaysValid
|
|
&& size == msgCount ws + msgCount rs - msgPos rs
|
|
where
|
|
alwaysValid =
|
|
msgPos rs <= msgCount rs
|
|
&& bytePos rs <= byteCount rs
|
|
&& msgPos ws == msgCount ws
|
|
&& bytePos ws == byteCount ws
|
|
|
|
deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
|
|
deleteQueue_ ms q =
|
|
runExceptT $ isolateQueueId "deleteQueue_" ms rId $ do
|
|
r <- deleteStoreQueue (queueStore_ ms) q >>= mapM remove
|
|
atomically $ TM.delete rId (queueLocks ms)
|
|
pure r
|
|
where
|
|
rId = recipientId q
|
|
remove qr = do
|
|
mq_ <- atomically $ swapTVar (msgQueue' q) Nothing
|
|
mapM_ (closeMsgQueueHandles ms) mq_
|
|
removeQueueDirectory ms rId
|
|
pure (qr, mq_)
|
|
|
|
closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO ()
|
|
closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ (closeMsgQueueHandles ms)
|
|
|
|
closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO ()
|
|
closeMsgQueueHandles ms q = readTVarIO (handles q) >>= mapM_ closeHandles
|
|
where
|
|
closeHandles (MsgQueueHandles sh rh wh_) = do
|
|
hClose sh
|
|
hClose rh
|
|
mapM_ hClose wh_
|
|
atomically $ modifyTVar' (openedQueueCount ms) (subtract 1)
|
|
|
|
removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO ()
|
|
removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st
|
|
|
|
removeQueueDirectory_ :: FilePath -> IO ()
|
|
removeQueueDirectory_ dir =
|
|
handleError "removeQueueDirectory" dir $ removePathForcibly dir
|
|
|
|
hAppend :: Handle -> Int64 -> ByteString -> IO ()
|
|
hAppend h pos s = do
|
|
fixFileSize h pos
|
|
IO.hSeek h SeekFromEnd 0
|
|
B.hPutStr h s
|
|
|
|
hGetMsgAt :: Handle -> Int64 -> IO (Message, Int64)
|
|
hGetMsgAt h pos = do
|
|
IO.hSeek h AbsoluteSeek $ fromIntegral pos
|
|
s <- B.hGetLine h
|
|
case strDecode s of
|
|
Right !msg ->
|
|
let !len = fromIntegral (B.length s) + 1
|
|
in pure (msg, len)
|
|
Left e -> E.throwIO $ userError $ "hGetMsgAt invalid message: " <> e
|
|
|
|
openFile :: FilePath -> IOMode -> IO Handle
|
|
openFile f mode = do
|
|
h <- IO.openFile f mode
|
|
IO.hSetBuffering h LineBuffering
|
|
pure h
|
|
|
|
hClose :: Handle -> IO ()
|
|
hClose h =
|
|
IO.hClose h `catchAny` \e -> do
|
|
name <- IO.hShow h
|
|
logError $ "STORE: hClose, " <> T.pack name <> ", " <> tshow e
|
|
|
|
closeOnException :: Handle -> IO a -> IO a
|
|
closeOnException h a = a `E.onException` hClose h
|
|
|
|
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
|
|
getJournalQueueMessages ms q =
|
|
readQueueState ms (msgQueueStatePath dir rId) >>= \case
|
|
(Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size > 0 -> do
|
|
msgs <- getMsgs (journalId rs) (bytePos rs) (byteCount rs)
|
|
if journalId rs == journalId ws
|
|
then pure msgs
|
|
else (msgs ++) <$> getMsgs (journalId ws) 0 (bytePos ws)
|
|
_ -> pure []
|
|
where
|
|
rId = recipientId' q
|
|
dir = msgQueueDirectory ms rId
|
|
getMsgs jId from to =
|
|
IO.withFile (journalFilePath dir jId) ReadWriteMode $ \h' ->
|
|
getJournalRange h' from to
|
|
|
|
getJournalRange :: Handle -> Int64 -> Int64 -> IO [Message]
|
|
getJournalRange h from to
|
|
| to > from = do
|
|
IO.hSeek h AbsoluteSeek $ fromIntegral from
|
|
parseMsgs =<< B.hGet h (fromIntegral $ to - from)
|
|
| otherwise = pure []
|
|
where
|
|
parseMsgs s = do
|
|
let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s
|
|
unless (null errs) $ do
|
|
f <- IO.hShow h
|
|
putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
|
|
pure msgs
|