From 990dcec3481035f49658331c021a510bba2237c5 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 9 Sep 2024 14:53:11 +0100 Subject: [PATCH] smp server: add created/updated/used date to queues to manage expiration (#1306) * smp server: add created/updated/used date to queues to manage expiration, all: make Map updates strict in value * remove strict * remove time precision * diff * style * only update when time changed --- src/Simplex/Messaging/Server.hs | 27 +++++-- src/Simplex/Messaging/Server/QueueStore.hs | 19 ++++- .../Messaging/Server/QueueStore/STM.hs | 27 ++++--- src/Simplex/Messaging/Server/StoreLog.hs | 73 +++++++++++++++---- 4 files changed, 112 insertions(+), 34 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index cffd1d6df..e2d5edc4e 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -986,8 +986,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> Just <$> case command of SKEY sKey -> (corrId,entId,) <$> case qr_ of - Just QueueRec {sndSecure, recipientId} - | sndSecure -> secureQueue_ "SKEY" recipientId sKey + Just qr@QueueRec {sndSecure} + | sndSecure -> secureQueue_ "SKEY" qr sKey | otherwise -> pure $ ERR AUTH Nothing -> pure $ ERR INTERNAL SEND flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody @@ -1010,7 +1010,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s GET -> withQueue getMessage ACK msgId -> withQueue (`acknowledgeMsg` msgId) KEY sKey -> (corrId,entId,) <$> case qr_ of - Just QueueRec {recipientId} -> secureQueue_ "KEY" recipientId sKey + Just qr -> secureQueue_ "KEY" qr sKey Nothing -> pure $ ERR INTERNAL NKEY nKey dhKey -> addQueueNotifier_ st nKey dhKey NDEL -> deleteQueueNotifier_ st @@ -1021,6 +1021,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> SenderCanSecure -> M (Transmission BrokerMsg) createQueue st recipientKey dhKey subMode sndSecure = time "NEW" $ do (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random + updatedAt <- Just <$> liftIO getSystemDate let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure} qRec (recipientId, senderId) = @@ -1032,7 +1033,8 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s senderKey = Nothing, notifier = Nothing, status = QueueActive, - sndSecure + sndSecure, + updatedAt } (corrId,entId,) <$> addQueueRetry 3 qik qRec where @@ -1061,9 +1063,10 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s n <- asks $ queueIdBytes . config liftM2 (,) (randomId n) (randomId n) - secureQueue_ :: T.Text -> RecipientId -> SndPublicAuthKey -> M BrokerMsg - secureQueue_ name rId sKey = time name $ do + secureQueue_ :: T.Text -> QueueRec -> SndPublicAuthKey -> M BrokerMsg + secureQueue_ name qr@QueueRec {recipientId = rId} sKey = time name $ do withLog $ \s -> logSecureQueue s rId sKey + updateQueueDate qr st <- asks queueStore stats <- asks serverStats incStat $ qSecured stats @@ -1172,7 +1175,17 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s pure r withQueue :: (QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) - withQueue action = maybe (pure $ err AUTH) action qr_ + withQueue action = case qr_ of + Just qr -> updateQueueDate qr >> action qr + Nothing -> pure $ err INTERNAL + + updateQueueDate :: QueueRec -> M () + updateQueueDate QueueRec {updatedAt, recipientId = rId} = do + t <- liftIO getSystemDate + when (Just t /= updatedAt) $ do + withLog $ \s -> logUpdateQueueTime s rId t + st <- asks queueStore + liftIO $ updateQueueTime st rId t subscribeNotifications :: M (Transmission BrokerMsg) subscribeNotifications = do diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 8d5bd8fff..3f7da8d29 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -1,10 +1,13 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} module Simplex.Messaging.Server.QueueStore where +import Data.Int (Int64) +import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol @@ -16,7 +19,8 @@ data QueueRec = QueueRec senderKey :: !(Maybe SndPublicAuthKey), sndSecure :: !SenderCanSecure, notifier :: !(Maybe NtfCreds), - status :: !ServerQueueStatus + status :: !ServerQueueStatus, + updatedAt :: !(Maybe RoundedSystemTime) } deriving (Show) @@ -34,3 +38,16 @@ instance StrEncoding NtfCreds where pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show) + +newtype RoundedSystemTime = RoundedSystemTime Int64 + deriving (Eq, Ord, Show) + +instance StrEncoding RoundedSystemTime where + strEncode (RoundedSystemTime t) = strEncode t + strP = RoundedSystemTime <$> strP + +getRoundedSystemTime :: Int64 -> IO RoundedSystemTime +getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` prec) * prec) <$> getSystemTime + +getSystemDate :: IO RoundedSystemTime +getSystemDate = getRoundedSystemTime 86400 diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 3a1385269..da9dc4bb3 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -19,6 +19,7 @@ module Simplex.Messaging.Server.QueueStore.STM addQueueNotifier, deleteQueueNotifier, suspendQueue, + updateQueueTime, deleteQueue, ) where @@ -65,8 +66,8 @@ getQueue QueueStore {queues, senders, notifiers} party qId = SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues) secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> IO (Either ErrorType QueueRec) -secureQueue QueueStore {queues} rId sKey = - atomically $ withQueue rId queues $ \qVar -> +secureQueue QueueStore {queues} rId sKey = toResult <$> do + TM.lookupIO rId queues $>>= \qVar -> atomically $ readTVar qVar >>= \q -> case senderKey q of Just k -> pure $ if sKey == k then Just q else Nothing _ -> @@ -74,26 +75,30 @@ secureQueue QueueStore {queues} rId sKey = in writeTVar qVar q' $> Just q' addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> IO (Either ErrorType QueueRec) -addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = atomically $ do - ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ +addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do + ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ withQueue rId queues $ \qVar -> do q <- readTVar qVar forM_ (notifier q) $ (`TM.delete` notifiers) . notifierId - writeTVar qVar $! q {notifier = Just ntfCreds} + let !q' = q {notifier = Just ntfCreds} + writeTVar qVar q' TM.insert nId rId notifiers - pure $ Just q + pure q' deleteQueueNotifier :: QueueStore -> RecipientId -> IO (Either ErrorType ()) deleteQueueNotifier QueueStore {queues, notifiers} rId = - atomically $ withQueue rId queues $ \qVar -> do + withQueue rId queues $ \qVar -> do q <- readTVar qVar forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers writeTVar qVar $! q {notifier = Nothing} - pure $ Just () suspendQueue :: QueueStore -> RecipientId -> IO (Either ErrorType ()) suspendQueue QueueStore {queues} rId = - atomically $ withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just () + withQueue rId queues (`modifyTVar'` \q -> q {status = QueueOff}) + +updateQueueTime :: QueueStore -> RecipientId -> RoundedSystemTime -> IO () +updateQueueTime QueueStore {queues} rId t = + void $ withQueue rId queues (`modifyTVar'` \q -> q {updatedAt = Just t}) deleteQueue :: QueueStore -> RecipientId -> IO (Either ErrorType QueueRec) deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do @@ -108,5 +113,5 @@ deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do toResult :: Maybe a -> Either ErrorType a toResult = maybe (Left AUTH) Right -withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM (Maybe a)) -> STM (Either ErrorType a) -withQueue rId queues f = toResult <$> TM.lookup rId queues $>>= f +withQueue :: RecipientId -> TMap RecipientId (TVar QueueRec) -> (TVar QueueRec -> STM a) -> IO (Either ErrorType a) +withQueue rId queues f = toResult <$> TM.lookupIO rId queues >>= atomically . mapM f diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 94a340d94..c47b06eb5 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -20,12 +20,14 @@ module Simplex.Messaging.Server.StoreLog logSuspendQueue, logDeleteQueue, logDeleteNotifier, + logUpdateQueueTime, readWriteStoreLog, ) where import Control.Applicative (optional, (<|>)) import Control.Monad (foldM, unless, when) +import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) @@ -33,7 +35,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol -import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..), ServerQueueStatus (..)) +import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Util (ifM) import System.Directory (doesFileExist, renameFile) @@ -52,9 +54,19 @@ data StoreLogRecord | SuspendQueue QueueId | DeleteQueue QueueId | DeleteNotifier QueueId + | UpdateTime QueueId RoundedSystemTime + +data SLRTag + = CreateQueue_ + | SecureQueue_ + | AddNotifier_ + | SuspendQueue_ + | DeleteQueue_ + | DeleteNotifier_ + | UpdateTime_ instance StrEncoding QueueRec where - strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier} = + strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} = B.unwords [ "rid=" <> strEncode recipientId, "rk=" <> strEncode recipientKey, @@ -64,8 +76,10 @@ instance StrEncoding QueueRec where ] <> if sndSecure then " sndSecure=" <> strEncode sndSecure else "" <> maybe "" notifierStr notifier + <> maybe "" updatedAtStr updatedAt where notifierStr ntfCreds = " notifier=" <> strEncode ntfCreds + updatedAtStr t = " updated_at=" <> strEncode t strP = do recipientId <- "rid=" *> strP_ @@ -75,24 +89,49 @@ instance StrEncoding QueueRec where senderKey <- "sk=" *> strP sndSecure <- (" sndSecure=" *> strP) <|> pure False notifier <- optional $ " notifier=" *> strP - pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive} + updatedAt <- optional $ " updated_at=" *> strP + pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt} + +instance StrEncoding SLRTag where + strEncode = \case + CreateQueue_ -> "CREATE" + SecureQueue_ -> "SECURE" + AddNotifier_ -> "NOTIFIER" + SuspendQueue_ -> "SUSPEND" + DeleteQueue_ -> "DELETE" + DeleteNotifier_ -> "NDELETE" + UpdateTime_ -> "TIME" + + strP = + A.takeTill (== ' ') >>= \case + "CREATE" -> pure CreateQueue_ + "SECURE" -> pure SecureQueue_ + "NOTIFIER" -> pure AddNotifier_ + "SUSPEND" -> pure SuspendQueue_ + "DELETE" -> pure DeleteQueue_ + "NDELETE" -> pure DeleteNotifier_ + "TIME" -> pure UpdateTime_ + s -> fail $ "invalid log record tag: " <> B.unpack s instance StrEncoding StoreLogRecord where strEncode = \case - CreateQueue q -> strEncode (Str "CREATE", q) - SecureQueue rId sKey -> strEncode (Str "SECURE", rId, sKey) - AddNotifier rId ntfCreds -> strEncode (Str "NOTIFIER", rId, ntfCreds) - SuspendQueue rId -> strEncode (Str "SUSPEND", rId) - DeleteQueue rId -> strEncode (Str "DELETE", rId) - DeleteNotifier rId -> strEncode (Str "NDELETE", rId) + CreateQueue q -> strEncode (CreateQueue_, q) + SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey) + AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds) + SuspendQueue rId -> strEncode (SuspendQueue_, rId) + DeleteQueue rId -> strEncode (DeleteQueue_, rId) + DeleteNotifier rId -> strEncode (DeleteNotifier_, rId) + UpdateTime rId t -> strEncode (UpdateTime_, rId, t) strP = - "CREATE " *> (CreateQueue <$> strP) - <|> "SECURE " *> (SecureQueue <$> strP_ <*> strP) - <|> "NOTIFIER " *> (AddNotifier <$> strP_ <*> strP) - <|> "SUSPEND " *> (SuspendQueue <$> strP) - <|> "DELETE " *> (DeleteQueue <$> strP) - <|> "NDELETE " *> (DeleteNotifier <$> strP) + strP_ >>= \case + CreateQueue_ -> CreateQueue <$> strP + SecureQueue_ -> SecureQueue <$> strP_ <*> strP + AddNotifier_ -> AddNotifier <$> strP_ <*> strP + SuspendQueue_ -> SuspendQueue <$> strP + DeleteQueue_ -> DeleteQueue <$> strP + DeleteNotifier_ -> DeleteNotifier <$> strP + UpdateTime_ -> UpdateTime <$> strP_ <*> strP openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode) openWriteStoreLog f = do @@ -138,6 +177,9 @@ logDeleteQueue s = writeStoreLogRecord s . DeleteQueue logDeleteNotifier :: StoreLog 'WriteMode -> QueueId -> IO () logDeleteNotifier s = writeStoreLogRecord s . DeleteNotifier +logUpdateQueueTime :: StoreLog 'WriteMode -> QueueId -> RoundedSystemTime -> IO () +logUpdateQueueTime s qId t = writeStoreLogRecord s $ UpdateTime qId t + readWriteStoreLog :: FilePath -> IO (Map RecipientId QueueRec, StoreLog 'WriteMode) readWriteStoreLog f = do qs <- ifM (doesFileExist f) readQS (pure M.empty) @@ -169,5 +211,6 @@ readQueues f = foldM processLine M.empty . LB.lines =<< LB.readFile f SuspendQueue qId -> M.adjust (\q -> q {status = QueueOff}) qId m DeleteQueue qId -> M.delete qId m DeleteNotifier qId -> M.adjust (\q -> q {notifier = Nothing}) qId m + UpdateTime qId t -> M.adjust (\q -> q {updatedAt = Just t}) qId m printError :: String -> IO () printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s