mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 22:45:06 +00:00
server: expire messages when restoring them (#758)
* server: expire messages when restoring them * add test * specify constructor Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * improve test --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
a5a3a2cbad
commit
03eca19d38
@@ -741,7 +741,8 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
|
||||
st <- asks queueStore
|
||||
ms <- asks msgStore
|
||||
quota <- asks $ msgQueueQuota . config
|
||||
runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota) . B.lines) >>= \case
|
||||
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
|
||||
runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota old_) . B.lines) >>= \case
|
||||
Left e -> do
|
||||
logError . T.pack $ "error restoring messages: " <> e
|
||||
liftIO exitFailure
|
||||
@@ -749,7 +750,7 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "messages restored"
|
||||
where
|
||||
restoreMsg st ms quota s = do
|
||||
restoreMsg st ms quota old_ s = do
|
||||
r <- liftEither . first (msgErr "parsing") $ strDecode s
|
||||
case r of
|
||||
MLRv3 rId msg -> addToMsgQueue rId msg
|
||||
@@ -759,13 +760,14 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
|
||||
addToMsgQueue rId msg'
|
||||
where
|
||||
addToMsgQueue rId msg = do
|
||||
full <- atomically $ do
|
||||
logFull <- atomically $ do
|
||||
q <- getMsgQueue ms rId quota
|
||||
isNothing <$> writeMsg q msg
|
||||
case msg of
|
||||
Message {} ->
|
||||
when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
|
||||
MessageQuota {} -> pure ()
|
||||
case msg of
|
||||
Message {msgTs}
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> isNothing <$> writeMsg q msg
|
||||
| otherwise -> pure False
|
||||
MessageQuota {} -> writeMsg q msg $> False
|
||||
when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
|
||||
updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do
|
||||
let nonce = C.cbNonce msgId
|
||||
msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body
|
||||
|
||||
+59
-1
@@ -55,7 +55,9 @@ serverTests t@(ATransport t') = do
|
||||
describe "Exceeding queue quota" $ testExceedQueueQuota t'
|
||||
describe "Store log" $ testWithStoreLog t
|
||||
describe "Restore messages" $ testRestoreMessages t
|
||||
describe "Restore messages (old / v2)" $ testRestoreMessagesV2 t
|
||||
describe "Restore messages (old / v2)" $ do
|
||||
testRestoreMessagesV2 t
|
||||
testRestoreExpireMessages t
|
||||
describe "Timing of AUTH error" $ testTiming t
|
||||
describe "Message notifications" $ testMessageNotifications t
|
||||
describe "Message expiration" $ do
|
||||
@@ -779,6 +781,62 @@ testRestoreMessagesV2 at@(ATransport t) =
|
||||
runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation
|
||||
runClient _ test' = testSMPClient test' `shouldReturn` ()
|
||||
|
||||
testRestoreExpireMessages :: ATransport -> Spec
|
||||
testRestoreExpireMessages at@(ATransport t) =
|
||||
it "should store messages on exit and restore on start" $ do
|
||||
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519
|
||||
recipientId <- newTVarIO ""
|
||||
recipientKey <- newTVarIO Nothing
|
||||
dhShared <- newTVarIO Nothing
|
||||
senderId <- newTVarIO ""
|
||||
|
||||
withSmpServerStoreMsgLogOnV2 at testPort . runTest t $ \h -> do
|
||||
runClient t $ \h1 -> do
|
||||
(sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub
|
||||
atomically $ do
|
||||
writeTVar recipientId rId
|
||||
writeTVar recipientKey $ Just rKey
|
||||
writeTVar dhShared $ Just dh
|
||||
writeTVar senderId sId
|
||||
sId <- readTVarIO senderId
|
||||
Resp "1" _ OK <- signSendRecv h sKey ("1", sId, _SEND "hello 1")
|
||||
Resp "2" _ OK <- signSendRecv h sKey ("2", sId, _SEND "hello 2")
|
||||
threadDelay 3000000
|
||||
Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3")
|
||||
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4")
|
||||
pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
msgs <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs) `shouldBe` 4
|
||||
|
||||
let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200}
|
||||
cfg1 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg1}
|
||||
withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
msgs' <- B.readFile testStoreMsgsFile
|
||||
msgs' `shouldBe` msgs
|
||||
|
||||
let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200}
|
||||
cfg2 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg2}
|
||||
withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- two messages expired
|
||||
msgs'' <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs'') `shouldBe` 2
|
||||
B.lines msgs'' `shouldBe` drop 2 (B.lines msgs)
|
||||
|
||||
where
|
||||
runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation
|
||||
runTest _ test' server = do
|
||||
testSMPClient test' `shouldReturn` ()
|
||||
killThread server
|
||||
|
||||
runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation
|
||||
runClient _ test' = testSMPClient test' `shouldReturn` ()
|
||||
|
||||
createAndSecureQueue :: Transport c => THandle c -> SndPublicVerifyKey -> IO (SenderId, RecipientId, RcvPrivateSignKey, RcvDhSecret)
|
||||
createAndSecureQueue h sPub = do
|
||||
(rPub, rKey) <- C.generateSignatureKeyPair C.SEd448
|
||||
|
||||
Reference in New Issue
Block a user