From 03eca19d384d381bfc8b86fa10e623025c50f2b6 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 27 May 2023 16:52:49 +0200 Subject: [PATCH] server: expire messages when restoring them (#758) * server: expire messages when restoring them * add test * specify constructor Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * improve test --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- src/Simplex/Messaging/Server.hs | 18 +++++----- tests/ServerTests.hs | 60 ++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 9 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 0854506b6..c02d6927d 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -741,7 +741,8 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages st <- asks queueStore ms <- asks msgStore quota <- asks $ msgQueueQuota . config - runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota) . B.lines) >>= \case + old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch) + runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota old_) . B.lines) >>= \case Left e -> do logError . T.pack $ "error restoring messages: " <> e liftIO exitFailure @@ -749,7 +750,7 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages renameFile f $ f <> ".bak" logInfo "messages restored" where - restoreMsg st ms quota s = do + restoreMsg st ms quota old_ s = do r <- liftEither . first (msgErr "parsing") $ strDecode s case r of MLRv3 rId msg -> addToMsgQueue rId msg @@ -759,13 +760,14 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages addToMsgQueue rId msg' where addToMsgQueue rId msg = do - full <- atomically $ do + logFull <- atomically $ do q <- getMsgQueue ms rId quota - isNothing <$> writeMsg q msg - case msg of - Message {} -> - when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message)) - MessageQuota {} -> pure () + case msg of + Message {msgTs} + | maybe True (systemSeconds msgTs >=) old_ -> isNothing <$> writeMsg q msg + | otherwise -> pure False + MessageQuota {} -> writeMsg q msg $> False + when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message)) updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do let nonce = C.cbNonce msgId msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 934832202..581e18166 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -55,7 +55,9 @@ serverTests t@(ATransport t') = do describe "Exceeding queue quota" $ testExceedQueueQuota t' describe "Store log" $ testWithStoreLog t describe "Restore messages" $ testRestoreMessages t - describe "Restore messages (old / v2)" $ testRestoreMessagesV2 t + describe "Restore messages (old / v2)" $ do + testRestoreMessagesV2 t + testRestoreExpireMessages t describe "Timing of AUTH error" $ testTiming t describe "Message notifications" $ testMessageNotifications t describe "Message expiration" $ do @@ -779,6 +781,62 @@ testRestoreMessagesV2 at@(ATransport t) = runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation runClient _ test' = testSMPClient test' `shouldReturn` () +testRestoreExpireMessages :: ATransport -> Spec +testRestoreExpireMessages at@(ATransport t) = + it "should store messages on exit and restore on start" $ do + (sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519 + recipientId <- newTVarIO "" + recipientKey <- newTVarIO Nothing + dhShared <- newTVarIO Nothing + senderId <- newTVarIO "" + + withSmpServerStoreMsgLogOnV2 at testPort . runTest t $ \h -> do + runClient t $ \h1 -> do + (sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub + atomically $ do + writeTVar recipientId rId + writeTVar recipientKey $ Just rKey + writeTVar dhShared $ Just dh + writeTVar senderId sId + sId <- readTVarIO senderId + Resp "1" _ OK <- signSendRecv h sKey ("1", sId, _SEND "hello 1") + Resp "2" _ OK <- signSendRecv h sKey ("2", sId, _SEND "hello 2") + threadDelay 3000000 + Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3") + Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4") + pure () + + logSize testStoreLogFile `shouldReturn` 2 + msgs <- B.readFile testStoreMsgsFile + length (B.lines msgs) `shouldBe` 4 + + let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200} + cfg1 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg1} + withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure () + + logSize testStoreLogFile `shouldReturn` 1 + msgs' <- B.readFile testStoreMsgsFile + msgs' `shouldBe` msgs + + let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200} + cfg2 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg2} + withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure () + + logSize testStoreLogFile `shouldReturn` 1 + -- two messages expired + msgs'' <- B.readFile testStoreMsgsFile + length (B.lines msgs'') `shouldBe` 2 + B.lines msgs'' `shouldBe` drop 2 (B.lines msgs) + + where + runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation + runTest _ test' server = do + testSMPClient test' `shouldReturn` () + killThread server + + runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation + runClient _ test' = testSMPClient test' `shouldReturn` () + createAndSecureQueue :: Transport c => THandle c -> SndPublicVerifyKey -> IO (SenderId, RecipientId, RcvPrivateSignKey, RcvDhSecret) createAndSecureQueue h sPub = do (rPub, rKey) <- C.generateSignatureKeyPair C.SEd448