diff --git a/rfcs/2022-08-14-queue-rotation.md b/rfcs/2022-08-14-queue-rotation.md index 9b3e53c1b..d85837c55 100644 --- a/rfcs/2022-08-14-queue-rotation.md +++ b/rfcs/2022-08-14-queue-rotation.md @@ -22,7 +22,9 @@ Additional agent messages required for the protocol: `QREADY`: instruct the sender that the new is ready to use with sender's queue address as parameter, encoded as `QR` - sender will send HELLO to the new queue. -`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving HELLO. +`QHELLO`: sender sends to the new queue to confirm it's working, encoded as `QH` + +`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving `QHELLO`. ### Protocol @@ -39,7 +41,7 @@ B ->> R ->> A: QKEYS (R'): sender's key for the new queue (to avoid the race of B ->> R ->> A: continue sending new messages to the old queue A ->> R': secure queue A ->> S ->> B: QREADY (R'): instruction to use new queue -B ->> R' ->> A: HELLO: to validate that the sender can send messages to the new queue before switching to it +B ->> R' ->> A: QHELLO: to validate that the sender can send messages to the new queue before switching to it A ->> S ->> B: QSWITCH (R'): instruction to start using the new queue B ->> R' ->> A: the first message received to the new queue before the old one is drained and deleted should not be processed, it should be stored in the agent memory (and not acknowledged) and only processed once the old queue is drained. A ->> R: suspend queue, receive all messages diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 3957be6c8..1bc082c2a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -109,7 +109,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, SMPMsgMeta) +import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, SMPMsgMeta, SndPublicVerifyKey) import qualified Simplex.Messaging.Protocol as SMP import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util @@ -428,11 +428,68 @@ subscribeConnection' c connId = SomeConn _ (RcvConnection _ rq) -> subscribe rq SomeConn _ (ContactConnection _ rq) -> subscribe rq where + -- TODO sndQueueAction? subscribe :: RcvQueue -> m () subscribe rq = do subscribeQueue c rq connId ns <- asks ntfSupervisor atomically $ sendNtfSubCommand ns (connId, NSCCreate) + doRcvQueueAction c rq + +-- TODO expire actions +doRcvQueueAction :: AgentMonad m => AgentClient -> RcvQueue -> m () +doRcvQueueAction c rq@RcvQueue {rcvQueueAction} = forM_ rcvQueueAction $ \(a, _ts) -> case a of + RQACreateNextQueue -> createNextRcvQueue c rq + RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue + RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue + RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue + +createNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () +createNextRcvQueue c rq = do + _nextRq_ <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) + -- unless new rcv queue exists + -- then newRcvQueue + -- store to the database + -- enqueue QNEW message + -- rcv_queue_action = null + pure () + +secureNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m () +secureNextRcvQueue _c _mainRq _nextRq = do + -- if not yet secured, secure new queue (it can be repeated with the same key) + -- set queue status to Secured + -- Enqueue QREADY message + -- rcv_queue_action = null + pure () + +suspendCurrRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m () +suspendCurrRcvQueue c currRq nextRq = do + -- Suspend curr queue + -- if 0 messages left: + -- + currRcvQueueDrained c currRq nextRq + +currRcvQueueDrained :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m () +currRcvQueueDrained c currRq nextRq = do + -- old queue status = Disabled + -- rcv_queue_action = RQADeleteQueue + -- + deleteCurrRcvQueue c currRq nextRq + +deleteCurrRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m () +deleteCurrRcvQueue _c _currRq _nextRq = do + -- delete old queue + -- make a new queue a main one + -- get message from a new queue storage and process it (possibly send to the processing queue) + pure () + +withNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> (AgentClient -> RcvQueue -> RcvQueue -> m ()) -> m () +withNextRcvQueue c rq action = do + withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case + Just nextRq -> action c rq nextRq + _ -> do + -- notify agent internal error + pure () subscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ())) subscribeConnections' _ [] = pure M.empty @@ -450,6 +507,7 @@ subscribeConnections' c connIds = do when (instantNotifications tkn) . void . forkIO $ sendNtfCreate ns rcvRs let rs = M.unions $ errs' : sndRs : rcvRs notifyResultError rs + void . forkIO . forM_ rcvQs $ doRcvQueueAction c . fst pure rs where rcvOrSndQueue :: SomeConn -> Either (SndQueue, ConnData) (RcvQueue, ConnData) @@ -643,8 +701,13 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh AM_REPLY_ -> notifyDel msgId $ ERR e AM_A_MSG_ -> notifyDel msgId $ MERR mId e AM_QNEW_ -> pure () - AM_QKEYS_ -> pure () + AM_QKEYS_ -> do + -- TODO new send queue status = Confirmed + pure () AM_QREADY_ -> pure () + AM_QHELLO_ -> do + -- cancel switching, delete new send queue + pure () AM_QSWITCH_ -> pure () _ -- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server), @@ -671,10 +734,6 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh case rq_ of -- party initiating connection (in v1) Just RcvQueue {status} -> - -- it is unclear why subscribeQueue was needed here, - -- message delivery can only be enabled for queues that were created in the current session or subscribed - -- subscribeQueue c rq connId - -- -- If initiating party were to send CON to the user without waiting for reply HELLO (to reduce handshake time), -- it would lead to the non-deterministic internal ID of the first sent message, at to some other race conditions, -- because it can be sent before HELLO is received @@ -688,6 +747,10 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh qInfo <- createReplyQueue c cData sq void . enqueueMessage c cData sq SMP.noMsgFlags $ REPLY [qInfo] AM_A_MSG_ -> notify $ SENT mId + AM_QHELLO_ -> do + -- withStore' c $ \db -> setSndQueueStatus db sq Active + -- what else should happen here? + pure () _ -> pure () delMsg msgId where @@ -725,22 +788,13 @@ ackMessage' c connId msgId = do switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m () switchConnection' c connId = withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection _ _rq _) -> do + SomeConn _ (DuplexConnection _ rq _) -> do -- rcv_queue_action = RQACreateNewQueue - -- createNewRcvQueue rq + createNextRcvQueue c rq pure () SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX _ -> throwError $ CMD PROHIBITED -createNewRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () -createNewRcvQueue _c _rq = do - -- unless new rcv queue exists - -- then newRcvQueue - -- store to the database - -- enqueueSwitchMessage - -- rcv_queue_action = null - pure () - -- | Suspend SMP agent connection (OFF command) in Reader monad suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m Word16 suspendConnection' c connId = @@ -1050,16 +1104,17 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm (SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) -> tryError agentClientMsg >>= \case Right (Just (msgId, msgMeta, aMessage)) -> case aMessage of - HELLO -> helloMsg >> ack >> withStore' c (\db -> deleteMsg db connId msgId) - REPLY cReq -> replyMsg cReq >> ack >> withStore' c (\db -> deleteMsg db connId msgId) + HELLO -> helloMsg >> ackDelete msgId + REPLY cReq -> replyMsg cReq >> ackDelete msgId -- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command A_MSG body -> do logServer "<--" c srv rId "MSG " notify $ MSG msgMeta msgFlags body - QNEW _ _ -> pure () - QKEYS _ _ -> pure () - QREADY _ -> pure () - QSWITCH _ -> pure () + QNEW addr qInfo -> rqNewMsg addr qInfo >> ackDelete msgId + QKEYS sKey qInfo -> rqKeys sKey qInfo $ ackDelete msgId + QREADY addr -> rqReady addr >> ackDelete msgId + QHELLO -> rqHello $ ackDelete msgId + QSWITCH addr -> rqSwitch addr >> ackDelete msgId Right _ -> prohibited >> ack Left e@(AGENT A_DUPLICATE) -> do withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case @@ -1101,6 +1156,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm sendAck c rq srvMsgId `catchError` \case SMP SMP.NO_MSG -> pure () e -> throwError e + ackDelete :: InternalId -> m () + ackDelete msgId = ack >> withStore' c (\db -> deleteMsg db connId msgId) handleNotifyAck :: m () -> m () handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack SMP.END -> @@ -1116,6 +1173,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm | otherwise -> ignored _ -> ignored ignored = pure "END from disconnected client - ignored" + LEN 0 -> do + -- load nextRq + -- currRcvQueueDrained c rq nextRq + pure () _ -> do logServer "<--" c srv rId $ "unexpected: " <> bshow cmd notify . ERR $ BROKER UNEXPECTED @@ -1218,6 +1279,54 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm connectReplyQueues c cData ownConnInfo smpQueues `catchError` (notify . ERR) _ -> prohibited + -- processed by queue sender + rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueInfo -> m () + rqNewMsg _addr _qInfo = do + -- generate sender and DH keys + -- store new send queue, update current send queue with queue ID of the new queue + -- Enqueue QKEYS message to the main rcv queue + pure () + + -- processed by queue recipient + rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m () + rqKeys _sKey _qInfo ackDelete = do + -- store sender keys + -- new rcv queue status = Confirmed + -- old rcv_queue_action = RQASecureNewQueue + -- load nextRq - this and all above can be one store function, at least one transaction + -- + ackDelete + -- secureNextRcvQueue c rq nextRq + pure () + + -- processed by queue sender + rqReady :: (SMPServer, SMP.SenderId) -> m () + rqReady _addr = do + -- Enqueue QHELLO message to the new queue + pure () + + -- processed by queue recipient, received from the new queue + rqHello :: m () -> m () + rqHello ackDelete = do + -- validate it's the next queue, or send error to the client + -- Enqueue QSWITCH message to the sender + -- snd_switch_action = RQASuspendCurrQueue + -- new queue status = Active + -- currRq <- load current queue + -- + ackDelete + -- + -- suspendCurrRcvQueue currRq rq + pure () + + -- processed by queue sender + rqSwitch :: (SMPServer, SMP.SenderId) -> m () + rqSwitch _addr = do + -- make new queue active + -- delete old queue from the database + -- update unsent messages? or just restart message deliveries? + pure () + smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () smpInvitation connReq@(CRInvitationUri crData _) cInfo = do logServer "<--" c srv rId "MSG " diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 7e2da9dd7..5caa9005b 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -436,6 +436,7 @@ data AgentMessageType | AM_QNEW_ | AM_QKEYS_ | AM_QREADY_ + | AM_QHELLO_ | AM_QSWITCH_ deriving (Eq, Show) @@ -449,6 +450,7 @@ instance Encoding AgentMessageType where AM_QNEW_ -> "QN" AM_QKEYS_ -> "QK" AM_QREADY_ -> "QR" + AM_QHELLO_ -> "QH" AM_QSWITCH_ -> "QS" smpP = A.anyChar >>= \case @@ -462,6 +464,7 @@ instance Encoding AgentMessageType where 'N' -> pure AM_QNEW_ 'K' -> pure AM_QKEYS_ 'R' -> pure AM_QREADY_ + 'H' -> pure AM_QHELLO_ 'S' -> pure AM_QSWITCH_ _ -> fail "bad AgentMessageType" _ -> fail "bad AgentMessageType" @@ -482,6 +485,7 @@ agentMessageType = \case QNEW {} -> AM_QNEW_ QKEYS {} -> AM_QKEYS_ QREADY {} -> AM_QREADY_ + QHELLO -> AM_QHELLO_ QSWITCH {} -> AM_QSWITCH_ data APrivHeader = APrivHeader @@ -504,6 +508,7 @@ data AMsgType | QNEW_ | QKEYS_ | QREADY_ + | QHELLO_ | QSWITCH_ deriving (Eq) @@ -515,17 +520,19 @@ instance Encoding AMsgType where QNEW_ -> "QN" QKEYS_ -> "QK" QREADY_ -> "QR" + QHELLO_ -> "QH" QSWITCH_ -> "QS" smpP = - smpP >>= \case + A.anyChar >>= \case 'H' -> pure HELLO_ 'R' -> pure REPLY_ 'M' -> pure A_MSG_ 'Q' -> - smpP >>= \case + A.anyChar >>= \case 'N' -> pure QNEW_ 'K' -> pure QKEYS_ 'R' -> pure QREADY_ + 'H' -> pure QHELLO_ 'S' -> pure QSWITCH_ _ -> fail "bad AMsgType" _ -> fail "bad AMsgType" @@ -542,11 +549,13 @@ data AMessage A_MSG MsgBody | -- instruct sender to switch the queue to another QNEW (SMPServer, SMP.SenderId) SMPQueueInfo - | -- send to the recipient server key to secure the new queue and DH key to agree e2e encryption + | -- send server key and queue e2e DH key to the recipient QKEYS SndPublicVerifyKey SMPQueueInfo - | -- inform the sender that the queue is ready to use + | -- inform the sender that the queue is ready to use - sender sends QHELLO to it QREADY (SMPServer, SMP.SenderId) - | -- inform the sender that the queue is ready to use + | -- the first message sent by the sender to the new queue + QHELLO + | -- instruct the sender to start sending messages to the new queue - after recipient receives HELLO QSWITCH (SMPServer, SMP.SenderId) deriving (Show) @@ -558,6 +567,7 @@ instance Encoding AMessage where QNEW addr qInfo -> smpEncode (QNEW_, addr, qInfo) QKEYS qInfo sKey -> smpEncode (QKEYS_, qInfo, sKey) QREADY addr -> smpEncode (QREADY_, addr) + QHELLO -> smpEncode QHELLO_ QSWITCH addr -> smpEncode (QSWITCH_, addr) smpP = smpP @@ -568,6 +578,7 @@ instance Encoding AMessage where QNEW_ -> QNEW <$> smpP <*> smpP QKEYS_ -> QKEYS <$> smpP <*> smpP QREADY_ -> QREADY <$> smpP + QHELLO_ -> pure QHELLO QSWITCH_ -> QSWITCH <$> smpP instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 1d4ccf527..abfc9ea6a 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -171,34 +171,34 @@ data ConnData = ConnData } deriving (Eq, Show) -data RcvQueueAction = RQACreateNewQueue | RQASecureNewQueue | RQASuspendQueue | RQADeleteQueue +data RcvQueueAction + = RQACreateNextQueue + | RQASecureNextQueue + | RQASuspendCurrQueue + | RQADeleteCurrQueue deriving (Eq, Show) instance TextEncoding RcvQueueAction where textEncode = \case - RQACreateNewQueue -> "create" - RQASecureNewQueue -> "secure" - RQASuspendQueue -> "suspend" - RQADeleteQueue -> "delete" + RQACreateNextQueue -> "create" + RQASecureNextQueue -> "secure" + RQASuspendCurrQueue -> "suspend" + RQADeleteCurrQueue -> "delete" textDecode = \case - "create" -> Just RQACreateNewQueue - "secure" -> Just RQASecureNewQueue - "suspend" -> Just RQASuspendQueue - "delete" -> Just RQADeleteQueue + "create" -> Just RQACreateNextQueue + "secure" -> Just RQASecureNextQueue + "suspend" -> Just RQASuspendCurrQueue + "delete" -> Just RQADeleteCurrQueue _ -> Nothing -data SndQueueAction = SQASendKeys | SQASendHello | SQASwitchQueue +data SndQueueAction = SQASwitchQueue deriving (Eq, Show) instance TextEncoding SndQueueAction where textEncode = \case - SQASendKeys -> "send_keys" - SQASendHello -> "send_hello" - SQASwitchQueue -> "switch_queue" + SQASwitchQueue -> "switch" textDecode = \case - "send_keys" -> Just SQASendKeys - "send_hello" -> Just SQASendHello - "switch_queue" -> Just SQASwitchQueue + "switch" -> Just SQASwitchQueue _ -> Nothing -- * Confirmation types diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index df07629bd..9fe8da42e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -37,6 +37,7 @@ module Simplex.Messaging.Agent.Store.SQLite setSndQueueStatus, getRcvQueue, setRcvQueueNtfCreds, + getNextRcvQueue, -- Confirmations createConfirmation, acceptConfirmation, @@ -105,6 +106,7 @@ import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U import Data.Char (toLower) import Data.Functor (($>)) +import Data.Int (Int64) import Data.List (find, foldl') import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.Map.Strict as M @@ -375,6 +377,11 @@ setRcvQueueNtfCreds db connId clientNtfCreds = Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} -> (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) Nothing -> (Nothing, Nothing, Nothing, Nothing) +getNextRcvQueue :: DB.Connection -> Maybe Int64 -> IO (Maybe RcvQueue) +getNextRcvQueue _db = \case + Just _rqId -> pure Nothing + _ -> pure Nothing + type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version) smpConfirmation :: SMPConfirmationRow -> SMPConfirmation diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220822_queue_rotation.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220822_queue_rotation.hs index 89e22c5b9..2853d928c 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220822_queue_rotation.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220822_queue_rotation.hs @@ -28,6 +28,9 @@ UPDATE rcv_queues SET created_at = '1970-01-01 00:00:00'; ALTER TABLE rcv_queues ADD COLUMN updated_at TEXT CHECK (updated_at NOT NULL); UPDATE rcv_queues SET updated_at = '1970-01-01 00:00:00'; +CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(rcv_queue_id); +CREATE UNIQUE INDEX idx_next_rcv_queue_id ON rcv_queues(next_rcv_queue_id); + -- * snd_queues ALTER TABLE snd_queues ADD COLUMN snd_queue_id INTEGER NULL; @@ -47,4 +50,7 @@ UPDATE snd_queues SET created_at = '1970-01-01 00:00:00'; ALTER TABLE snd_queues ADD COLUMN updated_at TEXT CHECK (updated_at NOT NULL); UPDATE snd_queues SET updated_at = '1970-01-01 00:00:00'; + +CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(snd_queue_id); +CREATE UNIQUE INDEX idx_next_snd_queue_id ON snd_queues(next_snd_queue_id); |] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 4e39dc129..7bcb3985e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -208,3 +208,7 @@ CREATE TABLE ntf_subscriptions( FOREIGN KEY(ntf_host, ntf_port) REFERENCES ntf_servers ON DELETE RESTRICT ON UPDATE CASCADE ) WITHOUT ROWID; +CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(rcv_queue_id); +CREATE UNIQUE INDEX idx_next_rcv_queue_id ON rcv_queues(next_rcv_queue_id); +CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(snd_queue_id); +CREATE UNIQUE INDEX idx_next_snd_queue_id ON snd_queues(next_snd_queue_id); diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 1b1f08c40..1da8e1c07 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -611,13 +611,13 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv encryptMsg :: QueueRec -> Message -> RcvMessage encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody} - | thVersion == 1 || thVersion == 2 = encrypt msgBody - | otherwise = encrypt $ encodeRcvMsgBody RcvMsgBody {msgTs, msgFlags, msgBody} + | thVersion == 1 || thVersion == 2 = encrypt msgTs msgFlags msgBody + | otherwise = encrypt (MkSystemTime 0 0) noMsgFlags $ encodeRcvMsgBody RcvMsgBody {msgTs, msgFlags, msgBody} where - encrypt :: KnownNat i => C.MaxLenBS i -> RcvMessage - encrypt body = + encrypt :: KnownNat i => SystemTime -> MsgFlags -> C.MaxLenBS i -> RcvMessage + encrypt msgTs' msgFlags' body = let encBody = EncRcvMsgBody $ C.cbEncryptMaxLenBS (rcvDhSecret qr) (C.cbNonce msgId) body - in RcvMessage msgId msgTs msgFlags encBody + in RcvMessage msgId msgTs' msgFlags' encBody setDelivered :: Sub -> Message -> STM Bool setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index aaf68a526..3bfc9e05b 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -58,8 +58,10 @@ instance MonadQueueStore QueueStore STM where secureQueue QueueStore {queues} rId sKey = withQueue rId queues $ \qVar -> readTVar qVar >>= \q -> case senderKey q of - Just _ -> pure Nothing - _ -> writeTVar qVar q {senderKey = Just sKey} $> Just q + Just k -> pure $ if sKey == k then Just q else Nothing + _ -> + let q' = q {senderKey = Just sKey} + in writeTVar qVar q' $> Just q' addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> STM (Either ErrorType QueueRec) addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index e60f08af7..27af23a0f 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -146,8 +146,10 @@ testCreateSecureV2 _ = (ok2, OK) #== "secures queue" (rId2, rId) #== "same queue ID in response 3" - Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub) - (err4, ERR AUTH) #== "rejects KEY if already secured" + Resp "abcd" _ OK <- signSendRecv h rKey ("abcd", rId, KEY sPub) + (sPub', _) <- C.generateSignatureKeyPair C.SEd448 + Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub') + (err4, ERR AUTH) #== "rejects if secured with different key" Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again") (ok3, OK) #== "accepts signed SEND" @@ -208,8 +210,10 @@ testCreateSecure (ATransport t) = (ok2, OK) #== "secures queue" (rId2, rId) #== "same queue ID in response 3" - Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub) - (err4, ERR AUTH) #== "rejects KEY if already secured" + Resp "abcd" _ OK <- signSendRecv h rKey ("abcd", rId, KEY sPub) + (sPub', _) <- C.generateSignatureKeyPair C.SEd448 + Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub') + (err4, ERR AUTH) #== "rejects if secured with different key" Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again") (ok3, OK) #== "accepts signed SEND"