mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 23:55:14 +00:00
smp server: add created/updated/used date to queues to manage expiration (#1306)
* smp server: add created/updated/used date to queues to manage expiration, all: make Map updates strict in value * remove strict * remove time precision * diff * style * only update when time changed
This commit is contained in:
@@ -986,8 +986,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
|
||||
Cmd SSender command -> Just <$> case command of
|
||||
SKEY sKey -> (corrId,entId,) <$> case qr_ of
|
||||
Just QueueRec {sndSecure, recipientId}
|
||||
| sndSecure -> secureQueue_ "SKEY" recipientId sKey
|
||||
Just qr@QueueRec {sndSecure}
|
||||
| sndSecure -> secureQueue_ "SKEY" qr sKey
|
||||
| otherwise -> pure $ ERR AUTH
|
||||
Nothing -> pure $ ERR INTERNAL
|
||||
SEND flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody
|
||||
@@ -1010,7 +1010,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
GET -> withQueue getMessage
|
||||
ACK msgId -> withQueue (`acknowledgeMsg` msgId)
|
||||
KEY sKey -> (corrId,entId,) <$> case qr_ of
|
||||
Just QueueRec {recipientId} -> secureQueue_ "KEY" recipientId sKey
|
||||
Just qr -> secureQueue_ "KEY" qr sKey
|
||||
Nothing -> pure $ ERR INTERNAL
|
||||
NKEY nKey dhKey -> addQueueNotifier_ st nKey dhKey
|
||||
NDEL -> deleteQueueNotifier_ st
|
||||
@@ -1021,6 +1021,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> SenderCanSecure -> M (Transmission BrokerMsg)
|
||||
createQueue st recipientKey dhKey subMode sndSecure = time "NEW" $ do
|
||||
(rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random
|
||||
updatedAt <- Just <$> liftIO getSystemDate
|
||||
let rcvDhSecret = C.dh' dhKey privDhKey
|
||||
qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure}
|
||||
qRec (recipientId, senderId) =
|
||||
@@ -1032,7 +1033,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
senderKey = Nothing,
|
||||
notifier = Nothing,
|
||||
status = QueueActive,
|
||||
sndSecure
|
||||
sndSecure,
|
||||
updatedAt
|
||||
}
|
||||
(corrId,entId,) <$> addQueueRetry 3 qik qRec
|
||||
where
|
||||
@@ -1061,9 +1063,10 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
n <- asks $ queueIdBytes . config
|
||||
liftM2 (,) (randomId n) (randomId n)
|
||||
|
||||
secureQueue_ :: T.Text -> RecipientId -> SndPublicAuthKey -> M BrokerMsg
|
||||
secureQueue_ name rId sKey = time name $ do
|
||||
secureQueue_ :: T.Text -> QueueRec -> SndPublicAuthKey -> M BrokerMsg
|
||||
secureQueue_ name qr@QueueRec {recipientId = rId} sKey = time name $ do
|
||||
withLog $ \s -> logSecureQueue s rId sKey
|
||||
updateQueueDate qr
|
||||
st <- asks queueStore
|
||||
stats <- asks serverStats
|
||||
incStat $ qSecured stats
|
||||
@@ -1172,7 +1175,17 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
pure r
|
||||
|
||||
withQueue :: (QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg)
|
||||
withQueue action = maybe (pure $ err AUTH) action qr_
|
||||
withQueue action = case qr_ of
|
||||
Just qr -> updateQueueDate qr >> action qr
|
||||
Nothing -> pure $ err INTERNAL
|
||||
|
||||
updateQueueDate :: QueueRec -> M ()
|
||||
updateQueueDate QueueRec {updatedAt, recipientId = rId} = do
|
||||
t <- liftIO getSystemDate
|
||||
when (Just t /= updatedAt) $ do
|
||||
withLog $ \s -> logUpdateQueueTime s rId t
|
||||
st <- asks queueStore
|
||||
liftIO $ updateQueueTime st rId t
|
||||
|
||||
subscribeNotifications :: M (Transmission BrokerMsg)
|
||||
subscribeNotifications = do
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
|
||||
module Simplex.Messaging.Server.QueueStore where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
|
||||
@@ -16,7 +19,8 @@ data QueueRec = QueueRec
|
||||
senderKey :: !(Maybe SndPublicAuthKey),
|
||||
sndSecure :: !SenderCanSecure,
|
||||
notifier :: !(Maybe NtfCreds),
|
||||
status :: !ServerQueueStatus
|
||||
status :: !ServerQueueStatus,
|
||||
updatedAt :: !(Maybe RoundedSystemTime)
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
@@ -34,3 +38,16 @@ instance StrEncoding NtfCreds where
|
||||
pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret}
|
||||
|
||||
data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show)
|
||||
|
||||
newtype RoundedSystemTime = RoundedSystemTime Int64
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
instance StrEncoding RoundedSystemTime where
|
||||
strEncode (RoundedSystemTime t) = strEncode t
|
||||
strP = RoundedSystemTime <$> strP
|
||||
|
||||
getRoundedSystemTime :: Int64 -> IO RoundedSystemTime
|
||||
getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` prec) * prec) <$> getSystemTime
|
||||
|
||||
getSystemDate :: IO RoundedSystemTime
|
||||
getSystemDate = getRoundedSystemTime 86400
|
||||
|
||||
@@ -19,6 +19,7 @@ module Simplex.Messaging.Server.QueueStore.STM
|
||||
addQueueNotifier,
|
||||
deleteQueueNotifier,
|
||||
suspendQueue,
|
||||
updateQueueTime,
|
||||
deleteQueue,
|
||||
)
|
||||
where
|
||||
@@ -65,8 +66,8 @@ getQueue QueueStore {queues, senders, notifiers} party qId =
|
||||
SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues)
|
||||
|
||||
secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> IO (Either ErrorType QueueRec)
|
||||
secureQueue QueueStore {queues} rId sKey =
|
||||
atomically $ withQueue rId queues $ \qVar ->
|
||||
secureQueue QueueStore {queues} rId sKey = toResult <$> do
|
||||
TM.lookupIO rId queues $>>= \qVar -> atomically $
|
||||
readTVar qVar >>= \q -> case senderKey q of
|
||||
Just k -> pure $ if sKey == k then Just q else Nothing
|
||||
_ ->
|
||||
@@ -74,26 +75,30 @@ secureQueue QueueStore {queues} rId sKey =
|
||||
in writeTVar qVar q' $> Just q'
|
||||
|
||||
addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> IO (Either ErrorType QueueRec)
|
||||
addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = atomically $ do
|
||||
ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $
|
||||
addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do
|
||||
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $
|
||||
withQueue rId queues $ \qVar -> do
|
||||
q <- readTVar qVar
|
||||
forM_ (notifier q) $ (`TM.delete` notifiers) . notifierId
|
||||
writeTVar qVar $! q {notifier = Just ntfCreds}
|
||||
let !q' = q {notifier = Just ntfCreds}
|
||||
writeTVar qVar q'
|
||||
TM.insert nId rId notifiers
|
||||
pure $ Just q
|
||||
pure q'
|
||||
|
||||
deleteQueueNotifier :: QueueStore -> RecipientId -> IO (Either ErrorType ())
|
||||
deleteQueueNotifier QueueStore {queues, notifiers} rId =
|
||||
atomically $ withQueue rId queues $ \qVar -> do
|
||||
withQueue rId queues $ \qVar -> do
|
||||
q <- readTVar qVar
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers
|
||||
writeTVar qVar $! q {notifier = Nothing}
|
||||
pure $ Just ()
|
||||
|
||||
suspendQueue :: QueueStore -> RecipientId -> IO (Either ErrorType ())
|
||||
suspendQueue QueueStore {queues} rId =
|
||||
atomically $ withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just ()
|
||||
withQueue rId queues (`modifyTVar'` \q -> q {status = QueueOff})
|
||||
|
||||
updateQueueTime :: QueueStore -> RecipientId -> RoundedSystemTime -> IO ()
|
||||
updateQueueTime QueueStore {queues} rId t =
|
||||
void $ withQueue rId queues (`modifyTVar'` \q -> q {updatedAt = Just t})
|
||||
|
||||
deleteQueue :: QueueStore -> RecipientId -> IO (Either ErrorType QueueRec)
|
||||
deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do
|
||||
@@ -108,5 +113,5 @@ deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do
|
||||
toResult :: Maybe a -> Either ErrorType a
|
||||
toResult = maybe (Left AUTH) Right
|
||||
|
||||
withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM (Maybe a)) -> STM (Either ErrorType a)
|
||||
withQueue rId queues f = toResult <$> TM.lookup rId queues $>>= f
|
||||
withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM a) -> IO (Either ErrorType a)
|
||||
withQueue rId queues f = toResult <$> TM.lookupIO rId queues >>= atomically . mapM f
|
||||
|
||||
@@ -20,12 +20,14 @@ module Simplex.Messaging.Server.StoreLog
|
||||
logSuspendQueue,
|
||||
logDeleteQueue,
|
||||
logDeleteNotifier,
|
||||
logUpdateQueueTime,
|
||||
readWriteStoreLog,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Monad (foldM, unless, when)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Functor (($>))
|
||||
@@ -33,7 +35,7 @@ import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..))
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
import Simplex.Messaging.Transport.Buffer (trimCR)
|
||||
import Simplex.Messaging.Util (ifM)
|
||||
import System.Directory (doesFileExist, renameFile)
|
||||
@@ -52,9 +54,19 @@ data StoreLogRecord
|
||||
| SuspendQueue QueueId
|
||||
| DeleteQueue QueueId
|
||||
| DeleteNotifier QueueId
|
||||
| UpdateTime QueueId RoundedSystemTime
|
||||
|
||||
data SLRTag
|
||||
= CreateQueue_
|
||||
| SecureQueue_
|
||||
| AddNotifier_
|
||||
| SuspendQueue_
|
||||
| DeleteQueue_
|
||||
| DeleteNotifier_
|
||||
| UpdateTime_
|
||||
|
||||
instance StrEncoding QueueRec where
|
||||
strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier} =
|
||||
strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} =
|
||||
B.unwords
|
||||
[ "rid=" <> strEncode recipientId,
|
||||
"rk=" <> strEncode recipientKey,
|
||||
@@ -64,8 +76,10 @@ instance StrEncoding QueueRec where
|
||||
]
|
||||
<> if sndSecure then " sndSecure=" <> strEncode sndSecure else ""
|
||||
<> maybe "" notifierStr notifier
|
||||
<> maybe "" updatedAtStr updatedAt
|
||||
where
|
||||
notifierStr ntfCreds = " notifier=" <> strEncode ntfCreds
|
||||
updatedAtStr t = " updated_at=" <> strEncode t
|
||||
|
||||
strP = do
|
||||
recipientId <- "rid=" *> strP_
|
||||
@@ -75,24 +89,49 @@ instance StrEncoding QueueRec where
|
||||
senderKey <- "sk=" *> strP
|
||||
sndSecure <- (" sndSecure=" *> strP) <|> pure False
|
||||
notifier <- optional $ " notifier=" *> strP
|
||||
pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive}
|
||||
updatedAt <- optional $ " updated_at=" *> strP
|
||||
pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt}
|
||||
|
||||
instance StrEncoding SLRTag where
|
||||
strEncode = \case
|
||||
CreateQueue_ -> "CREATE"
|
||||
SecureQueue_ -> "SECURE"
|
||||
AddNotifier_ -> "NOTIFIER"
|
||||
SuspendQueue_ -> "SUSPEND"
|
||||
DeleteQueue_ -> "DELETE"
|
||||
DeleteNotifier_ -> "NDELETE"
|
||||
UpdateTime_ -> "TIME"
|
||||
|
||||
strP =
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"CREATE" -> pure CreateQueue_
|
||||
"SECURE" -> pure SecureQueue_
|
||||
"NOTIFIER" -> pure AddNotifier_
|
||||
"SUSPEND" -> pure SuspendQueue_
|
||||
"DELETE" -> pure DeleteQueue_
|
||||
"NDELETE" -> pure DeleteNotifier_
|
||||
"TIME" -> pure UpdateTime_
|
||||
s -> fail $ "invalid log record tag: " <> B.unpack s
|
||||
|
||||
instance StrEncoding StoreLogRecord where
|
||||
strEncode = \case
|
||||
CreateQueue q -> strEncode (Str "CREATE", q)
|
||||
SecureQueue rId sKey -> strEncode (Str "SECURE", rId, sKey)
|
||||
AddNotifier rId ntfCreds -> strEncode (Str "NOTIFIER", rId, ntfCreds)
|
||||
SuspendQueue rId -> strEncode (Str "SUSPEND", rId)
|
||||
DeleteQueue rId -> strEncode (Str "DELETE", rId)
|
||||
DeleteNotifier rId -> strEncode (Str "NDELETE", rId)
|
||||
CreateQueue q -> strEncode (CreateQueue_, q)
|
||||
SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey)
|
||||
AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds)
|
||||
SuspendQueue rId -> strEncode (SuspendQueue_, rId)
|
||||
DeleteQueue rId -> strEncode (DeleteQueue_, rId)
|
||||
DeleteNotifier rId -> strEncode (DeleteNotifier_, rId)
|
||||
UpdateTime rId t -> strEncode (UpdateTime_, rId, t)
|
||||
|
||||
strP =
|
||||
"CREATE " *> (CreateQueue <$> strP)
|
||||
<|> "SECURE " *> (SecureQueue <$> strP_ <*> strP)
|
||||
<|> "NOTIFIER " *> (AddNotifier <$> strP_ <*> strP)
|
||||
<|> "SUSPEND " *> (SuspendQueue <$> strP)
|
||||
<|> "DELETE " *> (DeleteQueue <$> strP)
|
||||
<|> "NDELETE " *> (DeleteNotifier <$> strP)
|
||||
strP_ >>= \case
|
||||
CreateQueue_ -> CreateQueue <$> strP
|
||||
SecureQueue_ -> SecureQueue <$> strP_ <*> strP
|
||||
AddNotifier_ -> AddNotifier <$> strP_ <*> strP
|
||||
SuspendQueue_ -> SuspendQueue <$> strP
|
||||
DeleteQueue_ -> DeleteQueue <$> strP
|
||||
DeleteNotifier_ -> DeleteNotifier <$> strP
|
||||
UpdateTime_ -> UpdateTime <$> strP_ <*> strP
|
||||
|
||||
openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode)
|
||||
openWriteStoreLog f = do
|
||||
@@ -138,6 +177,9 @@ logDeleteQueue s = writeStoreLogRecord s . DeleteQueue
|
||||
logDeleteNotifier :: StoreLog 'WriteMode -> QueueId -> IO ()
|
||||
logDeleteNotifier s = writeStoreLogRecord s . DeleteNotifier
|
||||
|
||||
logUpdateQueueTime :: StoreLog 'WriteMode -> QueueId -> RoundedSystemTime -> IO ()
|
||||
logUpdateQueueTime s qId t = writeStoreLogRecord s $ UpdateTime qId t
|
||||
|
||||
readWriteStoreLog :: FilePath -> IO (Map RecipientId QueueRec, StoreLog 'WriteMode)
|
||||
readWriteStoreLog f = do
|
||||
qs <- ifM (doesFileExist f) readQS (pure M.empty)
|
||||
@@ -169,5 +211,6 @@ readQueues f = foldM processLine M.empty . LB.lines =<< LB.readFile f
|
||||
SuspendQueue qId -> M.adjust (\q -> q {status = QueueOff}) qId m
|
||||
DeleteQueue qId -> M.delete qId m
|
||||
DeleteNotifier qId -> M.adjust (\q -> q {notifier = Nothing}) qId m
|
||||
UpdateTime qId t -> M.adjust (\q -> q {updatedAt = Just t}) qId m
|
||||
printError :: String -> IO ()
|
||||
printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
|
||||
|
||||
Reference in New Issue
Block a user