From 60294521f4e7a8faa576872eba140de1a3ffd21c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 7 Jun 2022 10:18:40 +0100 Subject: [PATCH] add msgId to ACK to avoid the risks of losing messages with concurrent delivery (in app/NSE) (#387) * add msgId to ACK to avoid the risks of losing messages with concurrent delivery (in app/NSE) * update ACK to only remove message and update stats if msgId matches * add tests, fix * rename sameMsgId/msgDeleted --- src/Simplex/Messaging/Agent.hs | 15 +- src/Simplex/Messaging/Agent/Client.hs | 8 +- src/Simplex/Messaging/Agent/Store.hs | 2 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 11 +- src/Simplex/Messaging/Client.hs | 6 +- src/Simplex/Messaging/Protocol.hs | 12 +- src/Simplex/Messaging/Server.hs | 170 +++++++++++-------- src/Simplex/Messaging/Server/Env/STM.hs | 14 +- src/Simplex/Messaging/Server/MsgStore.hs | 4 +- src/Simplex/Messaging/Server/MsgStore/STM.hs | 25 ++- tests/ServerTests.hs | 102 +++++++++-- 11 files changed, 238 insertions(+), 131 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index a19b1c677..a0239b3e7 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -508,8 +508,8 @@ ackMessage' c connId msgId = do ack :: RcvQueue -> m () ack rq = do let mId = InternalId msgId - withStore $ \st -> checkRcvMsg st connId mId - sendAck c rq + srvMsgId <- withStore $ \st -> checkRcvMsg st connId mId + sendAck c rq srvMsgId withStore $ \st -> deleteMsg st connId mId -- | Suspend SMP agent connection (OFF command) in Reader monad @@ -711,6 +711,11 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) _ -> prohibited >> ack _ -> prohibited >> ack _ -> prohibited >> ack + where + ack :: m () + ack = sendAck c rq srvMsgId + handleNotifyAck :: m () -> m () + handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack SMP.END -> atomically (TM.lookup srv smpClients $>>= tryReadTMVar >>= processEND) >>= logServer "<--" c srv rId @@ -731,15 +736,9 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) notify :: ACommand 'Agent -> m () notify msg = atomically $ writeTBQueue subQ ("", connId, msg) - handleNotifyAck :: m () -> m () - handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack - prohibited :: m () prohibited = notify . ERR $ AGENT A_PROHIBITED - ack :: m () - ack = sendAck c rq - decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> m (SMP.PrivHeader, AgentMsgEnvelope) decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 6f186ece2..999f0d786 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -71,7 +71,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol -import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), ProtocolServer (..), QueueId, QueueIdsKeys (..), SndPublicVerifyKey) +import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, ProtocolServer (..), QueueId, QueueIdsKeys (..), SndPublicVerifyKey) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -486,10 +486,10 @@ secureQueue c RcvQueue {server, rcvId, rcvPrivateKey} senderKey = withLogClient c server rcvId "KEY " $ \smp -> secureSMPQueue smp rcvPrivateKey rcvId senderKey -sendAck :: AgentMonad m => AgentClient -> RcvQueue -> m () -sendAck c RcvQueue {server, rcvId, rcvPrivateKey} = +sendAck :: AgentMonad m => AgentClient -> RcvQueue -> MsgId -> m () +sendAck c RcvQueue {server, rcvId, rcvPrivateKey} msgId = withLogClient c server rcvId "ACK" $ \smp -> - ackSMPMessage smp rcvPrivateKey rcvId + ackSMPMessage smp rcvPrivateKey rcvId msgId suspendQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () suspendQueue c RcvQueue {server, rcvId, rcvPrivateKey} = diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 34c1579ae..d2629333e 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -67,7 +67,7 @@ class Monad m => MonadAgentStore s m where createSndMsg :: s -> ConnId -> SndMsgData -> m () getPendingMsgData :: s -> ConnId -> InternalId -> m (Maybe RcvQueue, PendingMsgData) getPendingMsgs :: s -> ConnId -> m [InternalId] - checkRcvMsg :: s -> ConnId -> InternalId -> m () + checkRcvMsg :: s -> ConnId -> InternalId -> m MsgId deleteMsg :: s -> ConnId -> InternalId -> m () -- Double ratchet persistence diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index ec34ab9a3..0eea2bc11 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -485,21 +485,18 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto map fromOnly <$> DB.query db "SELECT internal_id FROM snd_messages WHERE conn_id = ?" (Only connId) - checkRcvMsg :: SQLiteStore -> ConnId -> InternalId -> m () + checkRcvMsg :: SQLiteStore -> ConnId -> InternalId -> m SMP.MsgId checkRcvMsg st connId msgId = liftIOEither . withTransaction st $ \db -> - hasMsg - <$> DB.query + firstRow fromOnly SEMsgNotFound $ + DB.query db [sql| - SELECT conn_id, internal_id + SELECT broker_id FROM rcv_messages WHERE conn_id = ? AND internal_id = ? |] (connId, msgId) - where - hasMsg :: [(ConnId, InternalId)] -> Either StoreError () - hasMsg r = if null r then Left SEMsgNotFound else Right () deleteMsg :: SQLiteStore -> ConnId -> InternalId -> m () deleteMsg st connId msgId = diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index e87a56e8c..10fef28de 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -317,9 +317,9 @@ sendSMPMessage c spKey sId flags msg = -- | Acknowledge message delivery (server deletes the message). -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery -ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> ExceptT ProtocolClientError IO () -ackSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId = - sendSMPCommand c (Just rpKey) rId ACK >>= \case +ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> MsgId -> ExceptT ProtocolClientError IO () +ackSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId msgId = + sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case OK -> return () cmd@MSG {} -> lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 2503bb9f2..bb4d9593a 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -217,7 +217,9 @@ data Command (p :: Party) where KEY :: SndPublicVerifyKey -> Command Recipient NKEY :: NtfPublicVerifyKey -> Command Recipient GET :: Command Recipient - ACK :: Command Recipient + -- ACK v1 has to be supported for encoding/decoding + -- ACK :: Command Recipient + ACK :: MsgId -> Command Recipient OFF :: Command Recipient DEL :: Command Recipient -- SMP sender commands @@ -608,7 +610,9 @@ instance PartyI p => ProtocolEncoding (Command p) where KEY k -> e (KEY_, ' ', k) NKEY k -> e (NKEY_, ' ', k) GET -> e GET_ - ACK -> e ACK_ + ACK msgId + | v == 1 -> e ACK_ + | otherwise -> e (ACK_, ' ', msgId) OFF -> e OFF_ DEL -> e DEL_ SEND flags msg @@ -653,7 +657,9 @@ instance ProtocolEncoding Cmd where KEY_ -> KEY <$> _smpP NKEY_ -> NKEY <$> _smpP GET_ -> pure GET - ACK_ -> pure ACK + ACK_ + | v == 1 -> pure $ ACK "" + | otherwise -> ACK <$> _smpP OFF_ -> pure OFF DEL_ -> pure DEL CT SSender tag -> diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index bda7261ad..5b0ee4f5a 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -207,10 +207,11 @@ clientDisconnected c@Client {subscriptions, connected} = do sameClientSession :: Client -> Client -> Bool sameClientSession Client {sessionId} Client {sessionId = s'} = sessionId == s' -cancelSub :: MonadUnliftIO m => Sub -> m () -cancelSub = \case - Sub {subThread = SubThread t} -> killThread t - _ -> return () +cancelSub :: MonadUnliftIO m => TVar Sub -> m () +cancelSub sub = + readTVarIO sub >>= \case + Sub {subThread = SubThread t} -> killThread t + _ -> return () receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m () receive th Client {rcvQ, sndQ, activeAt} = forever $ do @@ -310,7 +311,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri (pure (corrId, queueId, ERR AUTH)) SUB -> subscribeQueue queueId GET -> getMessage - ACK -> acknowledgeMsg + ACK msgId -> acknowledgeMsg msgId KEY sKey -> secureQueue_ st sKey NKEY nKey -> addQueueNotifier_ st nKey OFF -> suspendQueue_ st @@ -387,42 +388,55 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri subscribeQueue :: RecipientId -> m (Transmission BrokerMsg) subscribeQueue rId = - atomically (getSubscription rId) >>= \case - Just s -> deliverMessage tryPeekMsg rId s - -- cannot use SUB in the same connection where GET was used - _ -> pure (corrId, rId, ERR $ CMD PROHIBITED) - - getSubscription :: RecipientId -> STM (Maybe Sub) - getSubscription rId = do - TM.lookup rId subscriptions >>= \case - Just Sub {subThread = ProhibitSub} -> pure Nothing - Just s -> tryTakeTMVar (delivered s) $> Just s - Nothing -> do + atomically (TM.lookup rId subscriptions) >>= \case + Nothing -> + atomically newSub >>= deliver + Just sub -> + readTVarIO sub >>= \case + Sub {subThread = ProhibitSub} -> + -- cannot use SUB in the same connection where GET was used + pure (corrId, rId, ERR $ CMD PROHIBITED) + s -> + atomically (tryTakeTMVar $ delivered s) >> deliver sub + where + newSub :: STM (TVar Sub) + newSub = do writeTBQueue subscribedQ (rId, clnt) - s <- newSubscription - TM.insert rId s subscriptions - pure $ Just s + sub <- newTVar =<< newSubscription NoSub + TM.insert rId sub subscriptions + pure sub + deliver :: TVar Sub -> m (Transmission BrokerMsg) + deliver sub = do + q <- getStoreMsgQueue rId + msg_ <- atomically $ tryPeekMsg q + deliverMessage rId sub q msg_ getMessage :: m (Transmission BrokerMsg) getMessage = - atomically getProhibitedSub >>= \case - Just s -> do + atomically (TM.lookup queueId subscriptions) >>= \case + Nothing -> + atomically newSub >>= getMessage_ + Just sub -> + readTVarIO sub >>= \case + s@Sub {subThread = ProhibitSub} -> + atomically (tryTakeTMVar $ delivered s) + >> getMessage_ s + -- cannot use GET in the same connection where there is an active subscription + _ -> pure (corrId, queueId, ERR $ CMD PROHIBITED) + where + newSub :: STM Sub + newSub = do + s <- newSubscription ProhibitSub + sub <- newTVar s + TM.insert queueId sub subscriptions + pure s + getMessage_ :: Sub -> m (Transmission BrokerMsg) + getMessage_ s = do q <- getStoreMsgQueue queueId atomically $ tryPeekMsg q >>= \case - Just msg -> tryPutTMVar (delivered s) () $> (corrId, queueId, msgCmd msg) + Just msg -> setDelivered s msg $> (corrId, queueId, msgCmd msg) _ -> pure (corrId, queueId, ERR NO_MSG) - _ -> pure (corrId, queueId, ERR $ CMD PROHIBITED) -- cannot use GET in the same connection where there is an active subscription - where - getProhibitedSub :: STM (Maybe Sub) - getProhibitedSub = - TM.lookup queueId subscriptions >>= \case - Just s@Sub {subThread = ProhibitSub} -> tryTakeTMVar (delivered s) $> Just s - Just _ -> pure Nothing - Nothing -> do - s <- prohibitedSubscription - TM.insert queueId s subscriptions - pure $ Just s subscribeNotifications :: m (Transmission BrokerMsg) subscribeNotifications = atomically $ do @@ -431,23 +445,37 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri TM.insert queueId () ntfSubscriptions pure ok - acknowledgeMsg :: m (Transmission BrokerMsg) - acknowledgeMsg = - atomically (withSub queueId $ \s -> const s <$$> tryTakeTMVar (delivered s)) - >>= \case - Just (Just s) -> do - stats <- asks serverStats - atomically $ modifyTVar (msgRecv stats) (+ 1) - atomically $ modifyTVar (msgQueues stats) (S.insert queueId) - case s of - Sub {subThread = ProhibitSub} -> - (getStoreMsgQueue queueId >>= atomically . tryDelMsg) $> ok - _ -> - deliverMessage tryDelPeekMsg queueId s - _ -> return $ err NO_MSG - - withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a) - withSub rId f = mapM f =<< TM.lookup rId subscriptions + acknowledgeMsg :: MsgId -> m (Transmission BrokerMsg) + acknowledgeMsg msgId = do + atomically (TM.lookup queueId subscriptions) >>= \case + Nothing -> pure $ err NO_MSG + Just sub -> + atomically (getDelivered sub) >>= \case + Just s -> do + q <- getStoreMsgQueue queueId + case s of + Sub {subThread = ProhibitSub} -> do + msgDeleted <- atomically $ tryDelMsg q msgId + when msgDeleted updateStats + pure ok + _ -> do + (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId + when msgDeleted updateStats + deliverMessage queueId sub q msg_ + _ -> pure $ err NO_MSG + where + getDelivered :: TVar Sub -> STM (Maybe Sub) + getDelivered sub = do + s@Sub {delivered} <- readTVar sub + tryTakeTMVar delivered $>>= \msgId' -> + if B.null msgId || msgId == msgId' + then pure $ Just s + else putTMVar delivered msgId' $> Nothing + updateStats :: m () + updateStats = do + stats <- asks serverStats + atomically $ modifyTVar (msgRecv stats) (+ 1) + atomically $ modifyTVar (msgQueues stats) (S.insert queueId) sendMessage :: QueueStore -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg) sendMessage st flags msgBody @@ -496,35 +524,33 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri unlessM (isFullTBQueue sndQ) $ writeTBQueue q (CorrId "", nId, NMSG) - deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m (Transmission BrokerMsg) - deliverMessage tryPeek rId = \case - Sub {subThread = NoSub} -> do - q <- getStoreMsgQueue rId - atomically (tryPeek q) >>= \case - Nothing -> forkSub q $> ok - Just msg -> atomically setDelivered $> (corrId, rId, msgCmd msg) - _ -> pure ok + deliverMessage :: RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) + deliverMessage rId sub q msg_ = + readTVarIO sub >>= \case + s@Sub {subThread = NoSub} -> + case msg_ of + Just msg -> atomically (setDelivered s msg) $> (corrId, rId, msgCmd msg) + _ -> forkSub $> ok + _ -> pure ok where - forkSub :: MsgQueue -> m () - forkSub q = do - atomically . setSub $ \s -> s {subThread = SubPending} - t <- forkIO $ subscriber q - atomically . setSub $ \case + forkSub :: m () + forkSub = do + atomically . modifyTVar sub $ \s -> s {subThread = SubPending} + t <- forkIO subscriber + atomically . modifyTVar sub $ \case s@Sub {subThread = SubPending} -> s {subThread = SubThread t} s -> s - subscriber :: MsgQueue -> m () - subscriber q = atomically $ do + subscriber :: m () + subscriber = atomically $ do msg <- peekMsg q writeTBQueue sndQ (CorrId "", rId, msgCmd msg) - setSub (\s -> s {subThread = NoSub}) - void setDelivered + s <- readTVar sub + void $ setDelivered s msg + writeTVar sub s {subThread = NoSub} - setSub :: (Sub -> Sub) -> STM () - setSub f = TM.adjust f rId subscriptions - - setDelivered :: STM (Maybe Bool) - setDelivered = withSub rId $ \s -> tryPutTMVar (delivered s) () + setDelivered :: Sub -> Message -> STM Bool + setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId getStoreMsgQueue :: RecipientId -> m MsgQueue getStoreMsgQueue rId = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 3bec11ee4..35f0dc051 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -94,7 +94,7 @@ data Server = Server } data Client = Client - { subscriptions :: TMap RecipientId Sub, + { subscriptions :: TMap RecipientId (TVar Sub), ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (Transmission Cmd), sndQ :: TBQueue (Transmission BrokerMsg), @@ -117,7 +117,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread ThreadId | ProhibitSub data Sub = Sub { subThread :: SubscriptionThread, - delivered :: TMVar () + delivered :: TMVar MsgId } newServer :: Natural -> STM Server @@ -149,14 +149,8 @@ newServerStats ts = do fromTime <- newTVar ts pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues, fromTime} -newSubscription :: STM Sub -newSubscription = newSubscription_ NoSub - -prohibitedSubscription :: STM Sub -prohibitedSubscription = newSubscription_ ProhibitSub - -newSubscription_ :: SubscriptionThread -> STM Sub -newSubscription_ subThread = do +newSubscription :: SubscriptionThread -> STM Sub +newSubscription subThread = do delivered <- newEmptyTMVar return Sub {subThread, delivered} diff --git a/src/Simplex/Messaging/Server/MsgStore.hs b/src/Simplex/Messaging/Server/MsgStore.hs index c4f36acb5..56b86cc49 100644 --- a/src/Simplex/Messaging/Server/MsgStore.hs +++ b/src/Simplex/Messaging/Server/MsgStore.hs @@ -23,6 +23,6 @@ class MonadMsgQueue q m where writeMsg :: q -> Message -> m () -- non blocking tryPeekMsg :: q -> m (Maybe Message) -- non blocking peekMsg :: q -> m Message -- blocking - tryDelMsg :: q -> m () -- non blocking - tryDelPeekMsg :: q -> m (Maybe Message) -- atomic delete (== read) last and peek next message, if available + tryDelMsg :: q -> MsgId -> m Bool -- non blocking + tryDelPeekMsg :: q -> MsgId -> m (Bool, Maybe Message) -- atomic delete (== read) last and peek next message, if available deleteExpiredMsgs :: q -> Int64 -> m () diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 06048b8e1..ae0ae60fd 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -2,16 +2,19 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TupleSections #-} module Simplex.Messaging.Server.MsgStore.STM where -import Control.Monad (void, when) +import Control.Monad (when) +import Data.Functor (($>)) import Data.Int (Int64) import Data.Time.Clock.System (SystemTime (systemSeconds)) import Numeric.Natural -import Simplex.Messaging.Protocol (RecipientId) +import Simplex.Messaging.Protocol (MsgId, RecipientId) import Simplex.Messaging.Server.MsgStore import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -49,12 +52,22 @@ instance MonadMsgQueue MsgQueue STM where peekMsg :: MsgQueue -> STM Message peekMsg = peekTBQueue . msgQueue - tryDelMsg :: MsgQueue -> STM () - tryDelMsg = void . tryReadTBQueue . msgQueue + tryDelMsg :: MsgQueue -> MsgId -> STM Bool + tryDelMsg (MsgQueue q) msgId' = + tryPeekTBQueue q >>= \case + Just Message {msgId} + | msgId == msgId' -> tryReadTBQueue q $> True + | otherwise -> pure False + _ -> pure False -- atomic delete (== read) last and peek next message if available - tryDelPeekMsg :: MsgQueue -> STM (Maybe Message) - tryDelPeekMsg (MsgQueue q) = tryReadTBQueue q >> tryPeekTBQueue q + tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Bool, Maybe Message) + tryDelPeekMsg (MsgQueue q) msgId' = + tryPeekTBQueue q >>= \case + msg_@(Just Message {msgId}) + | msgId == msgId' -> (True,) <$> (tryReadTBQueue q >> tryPeekTBQueue q) + | otherwise -> pure (False, msg_) + _ -> pure (False, Nothing) deleteExpiredMsgs :: MsgQueue -> Int64 -> STM () deleteExpiredMsgs (MsgQueue q) old = loop diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 725688ba8..92b737768 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -41,6 +41,8 @@ serverTests t@(ATransport t') = do describe "SMP messages" $ do describe "duplex communication over 2 SMP connections" $ testDuplex t describe "switch subscription to another TCP connection" $ testSwitchSub t + describe "GET command" $ testGetCommand t' + describe "GET & SUB commands" $ testGetSubCommands t' describe "Store log" $ testWithStoreLog t describe "Timing of AUTH error" $ testTiming t describe "Message notifications" $ testMessageNotifications t @@ -91,10 +93,10 @@ testCreateSecure (ATransport t) = Resp "" _ (MSG mId1 _ _ msg1) <- tGet h (dec mId1 msg1, Right "hello") #== "delivers message" - Resp "cdab" _ ok4 <- signSendRecv h rKey ("cdab", rId, ACK) + Resp "cdab" _ ok4 <- signSendRecv h rKey ("cdab", rId, ACK mId1) (ok4, OK) #== "replies OK when message acknowledged if no more messages" - Resp "dabc" _ err6 <- signSendRecv h rKey ("dabc", rId, ACK) + Resp "dabc" _ err6 <- signSendRecv h rKey ("dabc", rId, ACK mId1) (err6, ERR NO_MSG) #== "replies ERR when message acknowledged without messages" (sPub, sKey) <- C.generateSignatureKeyPair C.SEd448 @@ -121,7 +123,7 @@ testCreateSecure (ATransport t) = Resp "" _ (MSG mId2 _ _ msg2) <- tGet h (dec mId2 msg2, Right "hello again") #== "delivers message 2" - Resp "cdab" _ ok5 <- signSendRecv h rKey ("cdab", rId, ACK) + Resp "cdab" _ ok5 <- signSendRecv h rKey ("cdab", rId, ACK mId2) (ok5, OK) #== "replies OK when message acknowledged 2" Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, _SEND "hello") @@ -188,7 +190,7 @@ testCreateDelete (ATransport t) = Resp "dabc" _ err8 <- sendRecv sh ("", "dabc", sId, _SEND "hello") (err8, ERR AUTH) #== "rejects unsigned SEND too when deleted" - Resp "abcd" _ err11 <- signSendRecv rh rKey ("abcd", rId, ACK) + Resp "abcd" _ err11 <- signSendRecv rh rKey ("abcd", rId, ACK "") (err11, ERR AUTH) #== "rejects ACK when conn deleted - the second message is deleted" Resp "bcda" _ err9 <- signSendRecv rh rKey ("bcda", rId, OFF) @@ -239,7 +241,7 @@ testDuplex (ATransport t) = -- "key ..." is ad-hoc, not a part of SMP protocol Resp "" _ (MSG mId1 _ _ msg1) <- tGet alice - Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK) + Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK mId1) Right ["key", bobKey] <- pure $ B.words <$> aDec mId1 msg1 (bobKey, strEncode bsPub) #== "key received from Bob" Resp "dabc" _ OK <- signSendRecv alice arKey ("dabc", aRcv, KEY bsPub) @@ -252,7 +254,7 @@ testDuplex (ATransport t) = -- "reply_id ..." is ad-hoc, not a part of SMP protocol Resp "" _ (MSG mId2 _ _ msg2) <- tGet alice - Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK) + Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK mId2) Right ["reply_id", bId] <- pure $ B.words <$> aDec mId2 msg2 (bId, encode bSnd) #== "reply queue ID received from Bob" @@ -261,7 +263,7 @@ testDuplex (ATransport t) = -- "key ..." is ad-hoc, not a part of SMP protocol Resp "" _ (MSG mId3 _ _ msg3) <- tGet bob - Resp "abcd" _ OK <- signSendRecv bob brKey ("abcd", bRcv, ACK) + Resp "abcd" _ OK <- signSendRecv bob brKey ("abcd", bRcv, ACK mId3) Right ["key", aliceKey] <- pure $ B.words <$> bDec mId3 msg3 (aliceKey, strEncode asPub) #== "key received from Alice" Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, KEY asPub) @@ -269,13 +271,13 @@ testDuplex (ATransport t) = Resp "cdab" _ OK <- signSendRecv bob bsKey ("cdab", aSnd, _SEND "hi alice") Resp "" _ (MSG mId4 _ _ msg4) <- tGet alice - Resp "dabc" _ OK <- signSendRecv alice arKey ("dabc", aRcv, ACK) + Resp "dabc" _ OK <- signSendRecv alice arKey ("dabc", aRcv, ACK mId4) (aDec mId4 msg4, Right "hi alice") #== "message received from Bob" Resp "abcd" _ OK <- signSendRecv alice asKey ("abcd", bSnd, _SEND "how are you bob") Resp "" _ (MSG mId5 _ _ msg5) <- tGet bob - Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, ACK) + Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, ACK mId5) (bDec mId5 msg5, Right "how are you bob") #== "message received from alice" testSwitchSub :: ATransport -> Spec @@ -293,12 +295,12 @@ testSwitchSub (ATransport t) = Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh1 (dec mId1 msg1, Right "test1") #== "test message 1 delivered to the 1st TCP connection" - Resp "abcd" _ (MSG mId2 _ _ msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK) + Resp "abcd" _ (MSG mId2 _ _ msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1) (dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK" Resp "bcda" _ (MSG mId2' _ _ msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB) (dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)" - Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK) + Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK mId2') Resp "" _ end <- tGet rh1 (end, END) #== "unsubscribed the 1st TCP connection" @@ -308,16 +310,82 @@ testSwitchSub (ATransport t) = Resp "" _ (MSG mId3 _ _ msg3) <- tGet rh2 (dec mId3 msg3, Right "test3") #== "delivered to the 2nd TCP connection" - Resp "abcd" _ err <- signSendRecv rh1 rKey ("abcd", rId, ACK) + Resp "abcd" _ err <- signSendRecv rh1 rKey ("abcd", rId, ACK mId3) (err, ERR NO_MSG) #== "rejects ACK from the 1st TCP connection" - Resp "bcda" _ ok3 <- signSendRecv rh2 rKey ("bcda", rId, ACK) + Resp "bcda" _ ok3 <- signSendRecv rh2 rKey ("bcda", rId, ACK mId3) (ok3, OK) #== "accepts ACK from the 2nd TCP connection" 1000 `timeout` tGet @BrokerMsg rh1 >>= \case Nothing -> return () Just _ -> error "nothing else is delivered to the 1st TCP connection" +testGetCommand :: forall c. Transport c => TProxy c -> Spec +testGetCommand t = + it "should retrieve messages from the queue using GET command" $ do + (sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519 + smpTest t $ \sh -> do + queue <- newEmptyTMVarIO + testSMPClient @c $ \rh -> + atomically . putTMVar queue =<< createAndSecureQueue rh sPub + testSMPClient @c $ \rh -> do + (sId, rId, rKey, dhShared) <- atomically $ takeTMVar queue + let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce) + Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello") + Resp "2" _ (MSG mId1 _ _ msg1) <- signSendRecv rh rKey ("2", rId, GET) + (dec mId1 msg1, Right "hello") #== "retrieved from queue" + Resp "3" _ OK <- signSendRecv rh rKey ("3", rId, ACK mId1) + pure () + +testGetSubCommands :: forall c. Transport c => TProxy c -> Spec +testGetSubCommands t = + it "should retrieve messages with GET and receive with SUB, only one ACK would work" $ do + (sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519 + smpTest3 t $ \rh1 rh2 sh -> do + (sId, rId, rKey, dhShared) <- createAndSecureQueue rh1 sPub + let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce) + Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello 1") + Resp "1a" _ OK <- signSendRecv sh sKey ("1a", sId, _SEND "hello 2") + Resp "1b" _ OK <- signSendRecv sh sKey ("1b", sId, _SEND "hello 3") + Resp "1c" _ OK <- signSendRecv sh sKey ("1c", sId, _SEND "hello 4") + -- both get the same if not ACK'd + Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh1 + Resp "2" _ (MSG mId1' _ _ msg1') <- signSendRecv rh2 rKey ("2", rId, GET) + (dec mId1 msg1, Right "hello 1") #== "received from queue via SUB" + (dec mId1' msg1', Right "hello 1") #== "retrieved from queue with GET" + mId1 `shouldBe` mId1' + msg1 `shouldBe` msg1' + -- subscriber cannot GET, getter cannot SUB + Resp "3" _ (ERR (CMD PROHIBITED)) <- signSendRecv rh1 rKey ("3", rId, GET) + Resp "3a" _ (ERR (CMD PROHIBITED)) <- signSendRecv rh2 rKey ("3a", rId, SUB) + -- ACK for SUB delivers the next message + Resp "4" _ (MSG mId2 _ _ msg2) <- signSendRecv rh1 rKey ("4", rId, ACK mId1) + (dec mId2 msg2, Right "hello 2") #== "received from queue via SUB" + -- bad msgId returns error + Resp "5" _ (ERR NO_MSG) <- signSendRecv rh2 rKey ("5", rId, ACK "1234") + -- already ACK'd by subscriber, but still returns OK when msgId matches + Resp "5a" _ OK <- signSendRecv rh2 rKey ("5a", rId, ACK mId1) + -- msg2 is not lost - even if subscriber does not ACK it, it is delivered to getter + Resp "6" _ (MSG mId2' _ _ msg2') <- signSendRecv rh2 rKey ("6", rId, GET) + (dec mId2' msg2', Right "hello 2") #== "retrieved from queue with GET" + mId2 `shouldBe` mId2' + msg2 `shouldBe` msg2' + -- getter ACK returns OK, even though there is the next message + Resp "7" _ OK <- signSendRecv rh2 rKey ("7", rId, ACK mId2') + Resp "8" _ (MSG mId3 _ _ msg3) <- signSendRecv rh2 rKey ("8", rId, GET) + (dec mId3 msg3, Right "hello 3") #== "retrieved from queue with GET" + -- subscriber ACK does not lose message + Resp "9" _ (MSG mId3' _ _ msg3') <- signSendRecv rh1 rKey ("9", rId, ACK mId2') + (dec mId3' msg3', Right "hello 3") #== "retrieved from queue with GET" + mId3 `shouldBe` mId3' + msg3 `shouldBe` msg3' + Resp "10" _ (MSG mId4 _ _ msg4) <- signSendRecv rh1 rKey ("10", rId, ACK mId3) + (dec mId4 msg4, Right "hello 4") #== "retrieved from queue with GET" + Resp "11" _ OK <- signSendRecv rh1 rKey ("11", rId, ACK mId4) + -- no more messages for getter too + Resp "12" _ (ERR NO_MSG) <- signSendRecv rh2 rKey ("12", rId, GET) + pure () + testWithStoreLog :: ATransport -> Spec testWithStoreLog at@(ATransport t) = it "should store simplex queues to log and restore them after server restart" $ do @@ -474,7 +542,7 @@ testMessageNotifications (ATransport t) = Resp "3" _ OK <- signSendRecv sh sKey ("3", sId, _SEND "hello") Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh (dec mId1 msg1, Right "hello") #== "delivered from queue" - Resp "3a" _ OK <- signSendRecv rh rKey ("3a", rId, ACK) + Resp "3a" _ OK <- signSendRecv rh rKey ("3a", rId, ACK mId1) Resp "" _ NMSG <- tGet nh1 Resp "4" _ OK <- signSendRecv nh2 nKey ("4", nId, NSUB) Resp "" _ END <- tGet nh1 @@ -563,13 +631,17 @@ syntaxTests (ATransport t) = do it "no signature" $ ("", "abcd", "12345678", (KEY_, ' ', samplePubKey)) >#> ("", "abcd", "12345678", ERR $ CMD NO_AUTH) it "no queue ID" $ (sampleSig, "bcda", "", (KEY_, ' ', samplePubKey)) >#> ("", "bcda", "", ERR $ CMD NO_AUTH) noParamsSyntaxTest "SUB" SUB_ - noParamsSyntaxTest "ACK" ACK_ noParamsSyntaxTest "OFF" OFF_ noParamsSyntaxTest "DEL" DEL_ describe "SEND" $ do it "valid syntax" $ (sampleSig, "cdab", "12345678", (SEND_, ' ', noMsgFlags, ' ', "hello" :: ByteString)) >#> ("", "cdab", "12345678", ERR AUTH) it "no parameters" $ (sampleSig, "abcd", "12345678", SEND_) >#> ("", "abcd", "12345678", ERR $ CMD SYNTAX) it "no queue ID" $ (sampleSig, "bcda", "", (SEND_, ' ', noMsgFlags, ' ', "hello" :: ByteString)) >#> ("", "bcda", "", ERR $ CMD NO_ENTITY) + describe "ACK" $ do + it "valid syntax" $ (sampleSig, "cdab", "12345678", (ACK_, ' ', "1234" :: ByteString)) >#> ("", "cdab", "12345678", ERR AUTH) + it "no parameters" $ (sampleSig, "abcd", "12345678", ACK_) >#> ("", "abcd", "12345678", ERR $ CMD SYNTAX) + it "no queue ID" $ (sampleSig, "bcda", "", (ACK_, ' ', "1234" :: ByteString)) >#> ("", "bcda", "", ERR $ CMD NO_AUTH) + it "no signature" $ ("", "cdab", "12345678", (ACK_, ' ', "1234" :: ByteString)) >#> ("", "cdab", "12345678", ERR $ CMD NO_AUTH) describe "PING" $ do it "valid syntax" $ ("", "abcd", "", PING_) >#> ("", "abcd", "", PONG) describe "broker response not allowed" $ do