mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 17:27:23 +00:00
agent function stubs for queue rotation
This commit is contained in:
@@ -22,7 +22,9 @@ Additional agent messages required for the protocol:
|
||||
|
||||
`QREADY`: instruct the sender that the new is ready to use with sender's queue address as parameter, encoded as `QR` - sender will send HELLO to the new queue.
|
||||
|
||||
`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving HELLO.
|
||||
`QHELLO`: sender sends to the new queue to confirm it's working, encoded as `QH`
|
||||
|
||||
`QSWITCH`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QR` - sent after receiving `QHELLO`.
|
||||
|
||||
### Protocol
|
||||
|
||||
@@ -39,7 +41,7 @@ B ->> R ->> A: QKEYS (R'): sender's key for the new queue (to avoid the race of
|
||||
B ->> R ->> A: continue sending new messages to the old queue
|
||||
A ->> R': secure queue
|
||||
A ->> S ->> B: QREADY (R'): instruction to use new queue
|
||||
B ->> R' ->> A: HELLO: to validate that the sender can send messages to the new queue before switching to it
|
||||
B ->> R' ->> A: QHELLO: to validate that the sender can send messages to the new queue before switching to it
|
||||
A ->> S ->> B: QSWITCH (R'): instruction to start using the new queue
|
||||
B ->> R' ->> A: the first message received to the new queue before the old one is drained and deleted should not be processed, it should be stored in the agent memory (and not acknowledged) and only processed once the old queue is drained.
|
||||
A ->> R: suspend queue, receive all messages
|
||||
|
||||
@@ -109,7 +109,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
|
||||
import Simplex.Messaging.Notifications.Types
|
||||
import Simplex.Messaging.Parsers (parse)
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, SMPMsgMeta)
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, SMPMsgMeta, SndPublicVerifyKey)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util
|
||||
@@ -428,11 +428,68 @@ subscribeConnection' c connId =
|
||||
SomeConn _ (RcvConnection _ rq) -> subscribe rq
|
||||
SomeConn _ (ContactConnection _ rq) -> subscribe rq
|
||||
where
|
||||
-- TODO sndQueueAction?
|
||||
subscribe :: RcvQueue -> m ()
|
||||
subscribe rq = do
|
||||
subscribeQueue c rq connId
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
|
||||
doRcvQueueAction c rq
|
||||
|
||||
-- TODO expire actions
|
||||
doRcvQueueAction :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
doRcvQueueAction c rq@RcvQueue {rcvQueueAction} = forM_ rcvQueueAction $ \(a, _ts) -> case a of
|
||||
RQACreateNextQueue -> createNextRcvQueue c rq
|
||||
RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue
|
||||
RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue
|
||||
RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue
|
||||
|
||||
createNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
createNextRcvQueue c rq = do
|
||||
_nextRq_ <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq)
|
||||
-- unless new rcv queue exists
|
||||
-- then newRcvQueue
|
||||
-- store to the database
|
||||
-- enqueue QNEW message
|
||||
-- rcv_queue_action = null
|
||||
pure ()
|
||||
|
||||
secureNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
secureNextRcvQueue _c _mainRq _nextRq = do
|
||||
-- if not yet secured, secure new queue (it can be repeated with the same key)
|
||||
-- set queue status to Secured
|
||||
-- Enqueue QREADY message
|
||||
-- rcv_queue_action = null
|
||||
pure ()
|
||||
|
||||
suspendCurrRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
suspendCurrRcvQueue c currRq nextRq = do
|
||||
-- Suspend curr queue
|
||||
-- if 0 messages left:
|
||||
--
|
||||
currRcvQueueDrained c currRq nextRq
|
||||
|
||||
currRcvQueueDrained :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
currRcvQueueDrained c currRq nextRq = do
|
||||
-- old queue status = Disabled
|
||||
-- rcv_queue_action = RQADeleteQueue
|
||||
--
|
||||
deleteCurrRcvQueue c currRq nextRq
|
||||
|
||||
deleteCurrRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
deleteCurrRcvQueue _c _currRq _nextRq = do
|
||||
-- delete old queue
|
||||
-- make a new queue a main one
|
||||
-- get message from a new queue storage and process it (possibly send to the processing queue)
|
||||
pure ()
|
||||
|
||||
withNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> (AgentClient -> RcvQueue -> RcvQueue -> m ()) -> m ()
|
||||
withNextRcvQueue c rq action = do
|
||||
withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case
|
||||
Just nextRq -> action c rq nextRq
|
||||
_ -> do
|
||||
-- notify agent internal error
|
||||
pure ()
|
||||
|
||||
subscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
|
||||
subscribeConnections' _ [] = pure M.empty
|
||||
@@ -450,6 +507,7 @@ subscribeConnections' c connIds = do
|
||||
when (instantNotifications tkn) . void . forkIO $ sendNtfCreate ns rcvRs
|
||||
let rs = M.unions $ errs' : sndRs : rcvRs
|
||||
notifyResultError rs
|
||||
void . forkIO . forM_ rcvQs $ doRcvQueueAction c . fst
|
||||
pure rs
|
||||
where
|
||||
rcvOrSndQueue :: SomeConn -> Either (SndQueue, ConnData) (RcvQueue, ConnData)
|
||||
@@ -643,8 +701,13 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
AM_REPLY_ -> notifyDel msgId $ ERR e
|
||||
AM_A_MSG_ -> notifyDel msgId $ MERR mId e
|
||||
AM_QNEW_ -> pure ()
|
||||
AM_QKEYS_ -> pure ()
|
||||
AM_QKEYS_ -> do
|
||||
-- TODO new send queue status = Confirmed
|
||||
pure ()
|
||||
AM_QREADY_ -> pure ()
|
||||
AM_QHELLO_ -> do
|
||||
-- cancel switching, delete new send queue
|
||||
pure ()
|
||||
AM_QSWITCH_ -> pure ()
|
||||
_
|
||||
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
|
||||
@@ -671,10 +734,6 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
case rq_ of
|
||||
-- party initiating connection (in v1)
|
||||
Just RcvQueue {status} ->
|
||||
-- it is unclear why subscribeQueue was needed here,
|
||||
-- message delivery can only be enabled for queues that were created in the current session or subscribed
|
||||
-- subscribeQueue c rq connId
|
||||
--
|
||||
-- If initiating party were to send CON to the user without waiting for reply HELLO (to reduce handshake time),
|
||||
-- it would lead to the non-deterministic internal ID of the first sent message, at to some other race conditions,
|
||||
-- because it can be sent before HELLO is received
|
||||
@@ -688,6 +747,10 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
qInfo <- createReplyQueue c cData sq
|
||||
void . enqueueMessage c cData sq SMP.noMsgFlags $ REPLY [qInfo]
|
||||
AM_A_MSG_ -> notify $ SENT mId
|
||||
AM_QHELLO_ -> do
|
||||
-- withStore' c $ \db -> setSndQueueStatus db sq Active
|
||||
-- what else should happen here?
|
||||
pure ()
|
||||
_ -> pure ()
|
||||
delMsg msgId
|
||||
where
|
||||
@@ -725,22 +788,13 @@ ackMessage' c connId msgId = do
|
||||
switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
switchConnection' c connId =
|
||||
withStore c (`getConn` connId) >>= \case
|
||||
SomeConn _ (DuplexConnection _ _rq _) -> do
|
||||
SomeConn _ (DuplexConnection _ rq _) -> do
|
||||
-- rcv_queue_action = RQACreateNewQueue
|
||||
-- createNewRcvQueue rq
|
||||
createNextRcvQueue c rq
|
||||
pure ()
|
||||
SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
createNewRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
createNewRcvQueue _c _rq = do
|
||||
-- unless new rcv queue exists
|
||||
-- then newRcvQueue
|
||||
-- store to the database
|
||||
-- enqueueSwitchMessage
|
||||
-- rcv_queue_action = null
|
||||
pure ()
|
||||
|
||||
-- | Suspend SMP agent connection (OFF command) in Reader monad
|
||||
suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m Word16
|
||||
suspendConnection' c connId =
|
||||
@@ -1050,16 +1104,17 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
(SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) ->
|
||||
tryError agentClientMsg >>= \case
|
||||
Right (Just (msgId, msgMeta, aMessage)) -> case aMessage of
|
||||
HELLO -> helloMsg >> ack >> withStore' c (\db -> deleteMsg db connId msgId)
|
||||
REPLY cReq -> replyMsg cReq >> ack >> withStore' c (\db -> deleteMsg db connId msgId)
|
||||
HELLO -> helloMsg >> ackDelete msgId
|
||||
REPLY cReq -> replyMsg cReq >> ackDelete msgId
|
||||
-- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command
|
||||
A_MSG body -> do
|
||||
logServer "<--" c srv rId "MSG <MSG>"
|
||||
notify $ MSG msgMeta msgFlags body
|
||||
QNEW _ _ -> pure ()
|
||||
QKEYS _ _ -> pure ()
|
||||
QREADY _ -> pure ()
|
||||
QSWITCH _ -> pure ()
|
||||
QNEW addr qInfo -> rqNewMsg addr qInfo >> ackDelete msgId
|
||||
QKEYS sKey qInfo -> rqKeys sKey qInfo $ ackDelete msgId
|
||||
QREADY addr -> rqReady addr >> ackDelete msgId
|
||||
QHELLO -> rqHello $ ackDelete msgId
|
||||
QSWITCH addr -> rqSwitch addr >> ackDelete msgId
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
@@ -1101,6 +1156,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
sendAck c rq srvMsgId `catchError` \case
|
||||
SMP SMP.NO_MSG -> pure ()
|
||||
e -> throwError e
|
||||
ackDelete :: InternalId -> m ()
|
||||
ackDelete msgId = ack >> withStore' c (\db -> deleteMsg db connId msgId)
|
||||
handleNotifyAck :: m () -> m ()
|
||||
handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack
|
||||
SMP.END ->
|
||||
@@ -1116,6 +1173,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
| otherwise -> ignored
|
||||
_ -> ignored
|
||||
ignored = pure "END from disconnected client - ignored"
|
||||
LEN 0 -> do
|
||||
-- load nextRq
|
||||
-- currRcvQueueDrained c rq nextRq
|
||||
pure ()
|
||||
_ -> do
|
||||
logServer "<--" c srv rId $ "unexpected: " <> bshow cmd
|
||||
notify . ERR $ BROKER UNEXPECTED
|
||||
@@ -1218,6 +1279,54 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
connectReplyQueues c cData ownConnInfo smpQueues `catchError` (notify . ERR)
|
||||
_ -> prohibited
|
||||
|
||||
-- processed by queue sender
|
||||
rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueInfo -> m ()
|
||||
rqNewMsg _addr _qInfo = do
|
||||
-- generate sender and DH keys
|
||||
-- store new send queue, update current send queue with queue ID of the new queue
|
||||
-- Enqueue QKEYS message to the main rcv queue
|
||||
pure ()
|
||||
|
||||
-- processed by queue recipient
|
||||
rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m ()
|
||||
rqKeys _sKey _qInfo ackDelete = do
|
||||
-- store sender keys
|
||||
-- new rcv queue status = Confirmed
|
||||
-- old rcv_queue_action = RQASecureNewQueue
|
||||
-- load nextRq - this and all above can be one store function, at least one transaction
|
||||
--
|
||||
ackDelete
|
||||
-- secureNextRcvQueue c rq nextRq
|
||||
pure ()
|
||||
|
||||
-- processed by queue sender
|
||||
rqReady :: (SMPServer, SMP.SenderId) -> m ()
|
||||
rqReady _addr = do
|
||||
-- Enqueue QHELLO message to the new queue
|
||||
pure ()
|
||||
|
||||
-- processed by queue recipient, received from the new queue
|
||||
rqHello :: m () -> m ()
|
||||
rqHello ackDelete = do
|
||||
-- validate it's the next queue, or send error to the client
|
||||
-- Enqueue QSWITCH message to the sender
|
||||
-- snd_switch_action = RQASuspendCurrQueue
|
||||
-- new queue status = Active
|
||||
-- currRq <- load current queue
|
||||
--
|
||||
ackDelete
|
||||
--
|
||||
-- suspendCurrRcvQueue currRq rq
|
||||
pure ()
|
||||
|
||||
-- processed by queue sender
|
||||
rqSwitch :: (SMPServer, SMP.SenderId) -> m ()
|
||||
rqSwitch _addr = do
|
||||
-- make new queue active
|
||||
-- delete old queue from the database
|
||||
-- update unsent messages? or just restart message deliveries?
|
||||
pure ()
|
||||
|
||||
smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m ()
|
||||
smpInvitation connReq@(CRInvitationUri crData _) cInfo = do
|
||||
logServer "<--" c srv rId "MSG <KEY>"
|
||||
|
||||
@@ -436,6 +436,7 @@ data AgentMessageType
|
||||
| AM_QNEW_
|
||||
| AM_QKEYS_
|
||||
| AM_QREADY_
|
||||
| AM_QHELLO_
|
||||
| AM_QSWITCH_
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -449,6 +450,7 @@ instance Encoding AgentMessageType where
|
||||
AM_QNEW_ -> "QN"
|
||||
AM_QKEYS_ -> "QK"
|
||||
AM_QREADY_ -> "QR"
|
||||
AM_QHELLO_ -> "QH"
|
||||
AM_QSWITCH_ -> "QS"
|
||||
smpP =
|
||||
A.anyChar >>= \case
|
||||
@@ -462,6 +464,7 @@ instance Encoding AgentMessageType where
|
||||
'N' -> pure AM_QNEW_
|
||||
'K' -> pure AM_QKEYS_
|
||||
'R' -> pure AM_QREADY_
|
||||
'H' -> pure AM_QHELLO_
|
||||
'S' -> pure AM_QSWITCH_
|
||||
_ -> fail "bad AgentMessageType"
|
||||
_ -> fail "bad AgentMessageType"
|
||||
@@ -482,6 +485,7 @@ agentMessageType = \case
|
||||
QNEW {} -> AM_QNEW_
|
||||
QKEYS {} -> AM_QKEYS_
|
||||
QREADY {} -> AM_QREADY_
|
||||
QHELLO -> AM_QHELLO_
|
||||
QSWITCH {} -> AM_QSWITCH_
|
||||
|
||||
data APrivHeader = APrivHeader
|
||||
@@ -504,6 +508,7 @@ data AMsgType
|
||||
| QNEW_
|
||||
| QKEYS_
|
||||
| QREADY_
|
||||
| QHELLO_
|
||||
| QSWITCH_
|
||||
deriving (Eq)
|
||||
|
||||
@@ -515,17 +520,19 @@ instance Encoding AMsgType where
|
||||
QNEW_ -> "QN"
|
||||
QKEYS_ -> "QK"
|
||||
QREADY_ -> "QR"
|
||||
QHELLO_ -> "QH"
|
||||
QSWITCH_ -> "QS"
|
||||
smpP =
|
||||
smpP >>= \case
|
||||
A.anyChar >>= \case
|
||||
'H' -> pure HELLO_
|
||||
'R' -> pure REPLY_
|
||||
'M' -> pure A_MSG_
|
||||
'Q' ->
|
||||
smpP >>= \case
|
||||
A.anyChar >>= \case
|
||||
'N' -> pure QNEW_
|
||||
'K' -> pure QKEYS_
|
||||
'R' -> pure QREADY_
|
||||
'H' -> pure QHELLO_
|
||||
'S' -> pure QSWITCH_
|
||||
_ -> fail "bad AMsgType"
|
||||
_ -> fail "bad AMsgType"
|
||||
@@ -542,11 +549,13 @@ data AMessage
|
||||
A_MSG MsgBody
|
||||
| -- instruct sender to switch the queue to another
|
||||
QNEW (SMPServer, SMP.SenderId) SMPQueueInfo
|
||||
| -- send to the recipient server key to secure the new queue and DH key to agree e2e encryption
|
||||
| -- send server key and queue e2e DH key to the recipient
|
||||
QKEYS SndPublicVerifyKey SMPQueueInfo
|
||||
| -- inform the sender that the queue is ready to use
|
||||
| -- inform the sender that the queue is ready to use - sender sends QHELLO to it
|
||||
QREADY (SMPServer, SMP.SenderId)
|
||||
| -- inform the sender that the queue is ready to use
|
||||
| -- the first message sent by the sender to the new queue
|
||||
QHELLO
|
||||
| -- instruct the sender to start sending messages to the new queue - after recipient receives HELLO
|
||||
QSWITCH (SMPServer, SMP.SenderId)
|
||||
deriving (Show)
|
||||
|
||||
@@ -558,6 +567,7 @@ instance Encoding AMessage where
|
||||
QNEW addr qInfo -> smpEncode (QNEW_, addr, qInfo)
|
||||
QKEYS qInfo sKey -> smpEncode (QKEYS_, qInfo, sKey)
|
||||
QREADY addr -> smpEncode (QREADY_, addr)
|
||||
QHELLO -> smpEncode QHELLO_
|
||||
QSWITCH addr -> smpEncode (QSWITCH_, addr)
|
||||
smpP =
|
||||
smpP
|
||||
@@ -568,6 +578,7 @@ instance Encoding AMessage where
|
||||
QNEW_ -> QNEW <$> smpP <*> smpP
|
||||
QKEYS_ -> QKEYS <$> smpP <*> smpP
|
||||
QREADY_ -> QREADY <$> smpP
|
||||
QHELLO_ -> pure QHELLO
|
||||
QSWITCH_ -> QSWITCH <$> smpP
|
||||
|
||||
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
|
||||
|
||||
@@ -171,34 +171,34 @@ data ConnData = ConnData
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
data RcvQueueAction = RQACreateNewQueue | RQASecureNewQueue | RQASuspendQueue | RQADeleteQueue
|
||||
data RcvQueueAction
|
||||
= RQACreateNextQueue
|
||||
| RQASecureNextQueue
|
||||
| RQASuspendCurrQueue
|
||||
| RQADeleteCurrQueue
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance TextEncoding RcvQueueAction where
|
||||
textEncode = \case
|
||||
RQACreateNewQueue -> "create"
|
||||
RQASecureNewQueue -> "secure"
|
||||
RQASuspendQueue -> "suspend"
|
||||
RQADeleteQueue -> "delete"
|
||||
RQACreateNextQueue -> "create"
|
||||
RQASecureNextQueue -> "secure"
|
||||
RQASuspendCurrQueue -> "suspend"
|
||||
RQADeleteCurrQueue -> "delete"
|
||||
textDecode = \case
|
||||
"create" -> Just RQACreateNewQueue
|
||||
"secure" -> Just RQASecureNewQueue
|
||||
"suspend" -> Just RQASuspendQueue
|
||||
"delete" -> Just RQADeleteQueue
|
||||
"create" -> Just RQACreateNextQueue
|
||||
"secure" -> Just RQASecureNextQueue
|
||||
"suspend" -> Just RQASuspendCurrQueue
|
||||
"delete" -> Just RQADeleteCurrQueue
|
||||
_ -> Nothing
|
||||
|
||||
data SndQueueAction = SQASendKeys | SQASendHello | SQASwitchQueue
|
||||
data SndQueueAction = SQASwitchQueue
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance TextEncoding SndQueueAction where
|
||||
textEncode = \case
|
||||
SQASendKeys -> "send_keys"
|
||||
SQASendHello -> "send_hello"
|
||||
SQASwitchQueue -> "switch_queue"
|
||||
SQASwitchQueue -> "switch"
|
||||
textDecode = \case
|
||||
"send_keys" -> Just SQASendKeys
|
||||
"send_hello" -> Just SQASendHello
|
||||
"switch_queue" -> Just SQASwitchQueue
|
||||
"switch" -> Just SQASwitchQueue
|
||||
_ -> Nothing
|
||||
|
||||
-- * Confirmation types
|
||||
|
||||
@@ -37,6 +37,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
setSndQueueStatus,
|
||||
getRcvQueue,
|
||||
setRcvQueueNtfCreds,
|
||||
getNextRcvQueue,
|
||||
-- Confirmations
|
||||
createConfirmation,
|
||||
acceptConfirmation,
|
||||
@@ -105,6 +106,7 @@ import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString.Base64.URL as U
|
||||
import Data.Char (toLower)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (find, foldl')
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import qualified Data.Map.Strict as M
|
||||
@@ -375,6 +377,11 @@ setRcvQueueNtfCreds db connId clientNtfCreds =
|
||||
Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} -> (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret)
|
||||
Nothing -> (Nothing, Nothing, Nothing, Nothing)
|
||||
|
||||
getNextRcvQueue :: DB.Connection -> Maybe Int64 -> IO (Maybe RcvQueue)
|
||||
getNextRcvQueue _db = \case
|
||||
Just _rqId -> pure Nothing
|
||||
_ -> pure Nothing
|
||||
|
||||
type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version)
|
||||
|
||||
smpConfirmation :: SMPConfirmationRow -> SMPConfirmation
|
||||
|
||||
@@ -28,6 +28,9 @@ UPDATE rcv_queues SET created_at = '1970-01-01 00:00:00';
|
||||
ALTER TABLE rcv_queues ADD COLUMN updated_at TEXT CHECK (updated_at NOT NULL);
|
||||
UPDATE rcv_queues SET updated_at = '1970-01-01 00:00:00';
|
||||
|
||||
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(rcv_queue_id);
|
||||
CREATE UNIQUE INDEX idx_next_rcv_queue_id ON rcv_queues(next_rcv_queue_id);
|
||||
|
||||
-- * snd_queues
|
||||
|
||||
ALTER TABLE snd_queues ADD COLUMN snd_queue_id INTEGER NULL;
|
||||
@@ -47,4 +50,7 @@ UPDATE snd_queues SET created_at = '1970-01-01 00:00:00';
|
||||
|
||||
ALTER TABLE snd_queues ADD COLUMN updated_at TEXT CHECK (updated_at NOT NULL);
|
||||
UPDATE snd_queues SET updated_at = '1970-01-01 00:00:00';
|
||||
|
||||
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(snd_queue_id);
|
||||
CREATE UNIQUE INDEX idx_next_snd_queue_id ON snd_queues(next_snd_queue_id);
|
||||
|]
|
||||
|
||||
@@ -208,3 +208,7 @@ CREATE TABLE ntf_subscriptions(
|
||||
FOREIGN KEY(ntf_host, ntf_port) REFERENCES ntf_servers
|
||||
ON DELETE RESTRICT ON UPDATE CASCADE
|
||||
) WITHOUT ROWID;
|
||||
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(rcv_queue_id);
|
||||
CREATE UNIQUE INDEX idx_next_rcv_queue_id ON rcv_queues(next_rcv_queue_id);
|
||||
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(snd_queue_id);
|
||||
CREATE UNIQUE INDEX idx_next_snd_queue_id ON snd_queues(next_snd_queue_id);
|
||||
|
||||
@@ -611,13 +611,13 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
|
||||
encryptMsg :: QueueRec -> Message -> RcvMessage
|
||||
encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody}
|
||||
| thVersion == 1 || thVersion == 2 = encrypt msgBody
|
||||
| otherwise = encrypt $ encodeRcvMsgBody RcvMsgBody {msgTs, msgFlags, msgBody}
|
||||
| thVersion == 1 || thVersion == 2 = encrypt msgTs msgFlags msgBody
|
||||
| otherwise = encrypt (MkSystemTime 0 0) noMsgFlags $ encodeRcvMsgBody RcvMsgBody {msgTs, msgFlags, msgBody}
|
||||
where
|
||||
encrypt :: KnownNat i => C.MaxLenBS i -> RcvMessage
|
||||
encrypt body =
|
||||
encrypt :: KnownNat i => SystemTime -> MsgFlags -> C.MaxLenBS i -> RcvMessage
|
||||
encrypt msgTs' msgFlags' body =
|
||||
let encBody = EncRcvMsgBody $ C.cbEncryptMaxLenBS (rcvDhSecret qr) (C.cbNonce msgId) body
|
||||
in RcvMessage msgId msgTs msgFlags encBody
|
||||
in RcvMessage msgId msgTs' msgFlags' encBody
|
||||
|
||||
setDelivered :: Sub -> Message -> STM Bool
|
||||
setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId
|
||||
|
||||
@@ -58,8 +58,10 @@ instance MonadQueueStore QueueStore STM where
|
||||
secureQueue QueueStore {queues} rId sKey =
|
||||
withQueue rId queues $ \qVar ->
|
||||
readTVar qVar >>= \q -> case senderKey q of
|
||||
Just _ -> pure Nothing
|
||||
_ -> writeTVar qVar q {senderKey = Just sKey} $> Just q
|
||||
Just k -> pure $ if sKey == k then Just q else Nothing
|
||||
_ ->
|
||||
let q' = q {senderKey = Just sKey}
|
||||
in writeTVar qVar q' $> Just q'
|
||||
|
||||
addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> STM (Either ErrorType QueueRec)
|
||||
addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do
|
||||
|
||||
@@ -146,8 +146,10 @@ testCreateSecureV2 _ =
|
||||
(ok2, OK) #== "secures queue"
|
||||
(rId2, rId) #== "same queue ID in response 3"
|
||||
|
||||
Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub)
|
||||
(err4, ERR AUTH) #== "rejects KEY if already secured"
|
||||
Resp "abcd" _ OK <- signSendRecv h rKey ("abcd", rId, KEY sPub)
|
||||
(sPub', _) <- C.generateSignatureKeyPair C.SEd448
|
||||
Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub')
|
||||
(err4, ERR AUTH) #== "rejects if secured with different key"
|
||||
|
||||
Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again")
|
||||
(ok3, OK) #== "accepts signed SEND"
|
||||
@@ -208,8 +210,10 @@ testCreateSecure (ATransport t) =
|
||||
(ok2, OK) #== "secures queue"
|
||||
(rId2, rId) #== "same queue ID in response 3"
|
||||
|
||||
Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub)
|
||||
(err4, ERR AUTH) #== "rejects KEY if already secured"
|
||||
Resp "abcd" _ OK <- signSendRecv h rKey ("abcd", rId, KEY sPub)
|
||||
(sPub', _) <- C.generateSignatureKeyPair C.SEd448
|
||||
Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub')
|
||||
(err4, ERR AUTH) #== "rejects if secured with different key"
|
||||
|
||||
Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again")
|
||||
(ok3, OK) #== "accepts signed SEND"
|
||||
|
||||
Reference in New Issue
Block a user