mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 14:14:54 +00:00
rqNewMsg
This commit is contained in:
@@ -419,6 +419,7 @@ subscribeConnection' c connId =
|
||||
SomeConn _ (DuplexConnection cData rq sq) -> do
|
||||
resumeMsgDelivery c cData sq
|
||||
subscribe rq
|
||||
doRcvQueueAction c cData rq sq
|
||||
SomeConn _ (SndConnection cData sq) -> do
|
||||
resumeMsgDelivery c cData sq
|
||||
case status (sq :: SndQueue) of
|
||||
@@ -434,25 +435,31 @@ subscribeConnection' c connId =
|
||||
subscribeQueue c rq connId
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
|
||||
doRcvQueueAction c rq
|
||||
|
||||
-- TODO expire actions
|
||||
doRcvQueueAction :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
doRcvQueueAction c rq@RcvQueue {rcvQueueAction} = forM_ rcvQueueAction $ \(a, _ts) -> case a of
|
||||
RQACreateNextQueue -> createNextRcvQueue c rq
|
||||
RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue
|
||||
RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue
|
||||
RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue
|
||||
doRcvQueueAction :: AgentMonad m => AgentClient -> ConnData -> RcvQueue -> SndQueue -> m ()
|
||||
doRcvQueueAction c cData rq@RcvQueue {rcvQueueAction} sq =
|
||||
forM_ rcvQueueAction $ \(a, _ts) -> case a of
|
||||
RQACreateNextQueue -> createNextRcvQueue c cData rq sq
|
||||
RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue
|
||||
RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue
|
||||
RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue
|
||||
|
||||
createNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
|
||||
createNextRcvQueue c rq = do
|
||||
_nextRq_ <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq)
|
||||
-- unless new rcv queue exists
|
||||
-- then newRcvQueue
|
||||
-- store to the database
|
||||
-- enqueue QNEW message
|
||||
-- rcv_queue_action = null
|
||||
pure ()
|
||||
createNextRcvQueue :: AgentMonad m => AgentClient -> ConnData -> RcvQueue -> SndQueue -> m ()
|
||||
createNextRcvQueue c cData rq@RcvQueue {server, sndId} sq = do
|
||||
clientVRange <- asks $ smpClientVRange . config
|
||||
nextQueueUri <-
|
||||
withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case
|
||||
Just RcvQueue {server = smpServer, sndId = senderId, e2ePrivKey} -> do
|
||||
let queueAddress = SMPQueueAddress {smpServer, senderId, dhPublicKey = C.publicKey e2ePrivKey}
|
||||
pure SMPQueueUri {clientVRange, queueAddress}
|
||||
_ -> do
|
||||
srv <- getSMPServer c
|
||||
(rq', qUri) <- newRcvQueue c srv clientVRange
|
||||
withStore' c $ \db -> dbCreateNextRcvQueue db rq rq'
|
||||
pure qUri
|
||||
void $ enqueueMessage c cData sq SMP.noMsgFlags QNEW {currentAddress = (server, sndId), nextQueueUri}
|
||||
withStore' c $ \db -> setRcvQueueAction db rq Nothing
|
||||
|
||||
secureNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
secureNextRcvQueue _c _mainRq _nextRq = do
|
||||
@@ -507,7 +514,9 @@ subscribeConnections' c connIds = do
|
||||
when (instantNotifications tkn) . void . forkIO $ sendNtfCreate ns rcvRs
|
||||
let rs = M.unions $ errs' : sndRs : rcvRs
|
||||
notifyResultError rs
|
||||
void . forkIO . forM_ rcvQs $ doRcvQueueAction c . fst
|
||||
void . forkIO . forM_ cs $ \case
|
||||
SomeConn _ (DuplexConnection cData rq sq) -> doRcvQueueAction c cData rq sq
|
||||
_ -> pure ()
|
||||
pure rs
|
||||
where
|
||||
rcvOrSndQueue :: SomeConn -> Either (SndQueue, ConnData) (RcvQueue, ConnData)
|
||||
@@ -788,10 +797,10 @@ ackMessage' c connId msgId = do
|
||||
switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
switchConnection' c connId =
|
||||
withStore c (`getConn` connId) >>= \case
|
||||
SomeConn _ (DuplexConnection _ rq _) -> do
|
||||
-- rcv_queue_action = RQACreateNewQueue
|
||||
createNextRcvQueue c rq
|
||||
pure ()
|
||||
SomeConn _ (DuplexConnection cData rq sq) -> do
|
||||
-- TODO check that rotation is possible (whether the current server supports it)
|
||||
withStore' c $ \db -> setRcvQueueAction db rq $ Just RQACreateNextQueue
|
||||
createNextRcvQueue c cData rq sq
|
||||
SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
@@ -1110,8 +1119,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
A_MSG body -> do
|
||||
logServer "<--" c srv rId "MSG <MSG>"
|
||||
notify $ MSG msgMeta msgFlags body
|
||||
QNEW addr qInfo -> rqNewMsg addr qInfo >> ackDelete msgId
|
||||
QKEYS sKey qInfo -> rqKeys sKey qInfo $ ackDelete msgId
|
||||
QNEW currAddr nextQUri -> rqNewMsg currAddr nextQUri >> ackDelete msgId
|
||||
QKEYS sKey nextQInfo -> rqKeys sKey nextQInfo $ ackDelete msgId
|
||||
QREADY addr -> rqReady addr >> ackDelete msgId
|
||||
QHELLO -> rqHello $ ackDelete msgId
|
||||
QSWITCH addr -> rqSwitch addr >> ackDelete msgId
|
||||
@@ -1280,16 +1289,31 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
_ -> prohibited
|
||||
|
||||
-- processed by queue sender
|
||||
rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueInfo -> m ()
|
||||
rqNewMsg _addr _qInfo = do
|
||||
-- generate sender and DH keys
|
||||
-- store new send queue, update current send queue with queue ID of the new queue
|
||||
-- Enqueue QKEYS message to the main rcv queue
|
||||
pure ()
|
||||
rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueUri -> m ()
|
||||
rqNewMsg _currAddr nextQUri = case conn of
|
||||
DuplexConnection _ _ sq -> do
|
||||
clientVRange <- asks $ smpClientVRange . config
|
||||
case (nextQUri `compatibleVersion` clientVRange) of
|
||||
Just qInfo@(Compatible nextQueueInfo) -> do
|
||||
sq'@SndQueue {sndPublicKey} <- newSndQueue qInfo
|
||||
withStore' c $ \db -> dbCreateNextSndQueue db sq sq'
|
||||
case sndPublicKey of
|
||||
Just nextSenderKey ->
|
||||
void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo}
|
||||
-- TODO possibly, notify user that the queue is rotating
|
||||
_ -> do
|
||||
-- TODO notify user: internal error
|
||||
pure ()
|
||||
_ -> do
|
||||
-- TODO notify user: incompatible version
|
||||
pure ()
|
||||
_ -> do
|
||||
-- TODO notify user: message can only be sent to duplex connection
|
||||
pure ()
|
||||
|
||||
-- processed by queue recipient
|
||||
rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m ()
|
||||
rqKeys _sKey _qInfo ackDelete = do
|
||||
rqKeys _sKey _nextQInfo ackDelete = do
|
||||
-- store sender keys
|
||||
-- new rcv queue status = Confirmed
|
||||
-- old rcv_queue_action = RQASecureNewQueue
|
||||
|
||||
@@ -500,7 +500,7 @@ newRcvQueue_ a c srv vRange = do
|
||||
rcvDhSecret = C.dh' rcvPublicDhKey privDhKey,
|
||||
e2ePrivKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId = Just sndId,
|
||||
sndId,
|
||||
status = New,
|
||||
rcvQueueAction = Nothing,
|
||||
dbNextRcvQueueId = Nothing,
|
||||
|
||||
@@ -548,15 +548,15 @@ data AMessage
|
||||
| -- | agent envelope for the client message
|
||||
A_MSG MsgBody
|
||||
| -- instruct sender to switch the queue to another
|
||||
QNEW (SMPServer, SMP.SenderId) SMPQueueInfo
|
||||
QNEW {currentAddress :: (SMPServer, SMP.SenderId), nextQueueUri :: SMPQueueUri}
|
||||
| -- send server key and queue e2e DH key to the recipient
|
||||
QKEYS SndPublicVerifyKey SMPQueueInfo
|
||||
QKEYS {nextSenderKey :: SndPublicVerifyKey, nextQueueInfo :: SMPQueueInfo}
|
||||
| -- inform the sender that the queue is ready to use - sender sends QHELLO to it
|
||||
QREADY (SMPServer, SMP.SenderId)
|
||||
QREADY {nextAddress :: (SMPServer, SMP.SenderId)}
|
||||
| -- the first message sent by the sender to the new queue
|
||||
QHELLO
|
||||
| -- instruct the sender to start sending messages to the new queue - after recipient receives HELLO
|
||||
QSWITCH (SMPServer, SMP.SenderId)
|
||||
QSWITCH {nextAddress :: (SMPServer, SMP.SenderId)}
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding AMessage where
|
||||
@@ -564,8 +564,8 @@ instance Encoding AMessage where
|
||||
HELLO -> smpEncode HELLO_
|
||||
REPLY smpQueues -> smpEncode (REPLY_, smpQueues)
|
||||
A_MSG body -> smpEncode (A_MSG_, Tail body)
|
||||
QNEW addr qInfo -> smpEncode (QNEW_, addr, qInfo)
|
||||
QKEYS qInfo sKey -> smpEncode (QKEYS_, qInfo, sKey)
|
||||
QNEW currAddr nextQUri -> smpEncode (QNEW_, currAddr, strEncode nextQUri)
|
||||
QKEYS sKey nextQInfo -> smpEncode (QKEYS_, sKey, nextQInfo)
|
||||
QREADY addr -> smpEncode (QREADY_, addr)
|
||||
QHELLO -> smpEncode QHELLO_
|
||||
QSWITCH addr -> smpEncode (QSWITCH_, addr)
|
||||
@@ -575,7 +575,10 @@ instance Encoding AMessage where
|
||||
HELLO_ -> pure HELLO
|
||||
REPLY_ -> REPLY <$> smpP
|
||||
A_MSG_ -> A_MSG . unTail <$> smpP
|
||||
QNEW_ -> QNEW <$> smpP <*> smpP
|
||||
QNEW_ -> do
|
||||
currentAddress <- smpP
|
||||
nextQueueUri <- strDecode <$?> smpP
|
||||
pure QNEW {currentAddress, nextQueueUri}
|
||||
QKEYS_ -> QKEYS <$> smpP <*> smpP
|
||||
QREADY_ -> QREADY <$> smpP
|
||||
QHELLO_ -> pure QHELLO
|
||||
|
||||
@@ -52,7 +52,7 @@ data RcvQueue = RcvQueue
|
||||
-- | public sender's DH key and agreed shared DH secret for simple per-queue e2e
|
||||
e2eDhSecret :: Maybe C.DhSecretX25519,
|
||||
-- | sender queue ID
|
||||
sndId :: Maybe SMP.SenderId,
|
||||
sndId :: SMP.SenderId,
|
||||
-- | queue status
|
||||
status :: QueueStatus,
|
||||
-- | action to perform, to be done on connection subscription, if it fails and not reset
|
||||
|
||||
@@ -38,6 +38,9 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
getRcvQueue,
|
||||
setRcvQueueNtfCreds,
|
||||
getNextRcvQueue,
|
||||
dbCreateNextRcvQueue,
|
||||
dbCreateNextSndQueue,
|
||||
setRcvQueueAction,
|
||||
-- Confirmations
|
||||
createConfirmation,
|
||||
acceptConfirmation,
|
||||
@@ -405,6 +408,15 @@ getNextRcvQueue db = \case
|
||||
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt}
|
||||
_ -> pure Nothing
|
||||
|
||||
dbCreateNextRcvQueue :: DB.Connection -> RcvQueue -> RcvQueue -> IO ()
|
||||
dbCreateNextRcvQueue _db _rq _nextRq = pure ()
|
||||
|
||||
dbCreateNextSndQueue :: DB.Connection -> SndQueue -> SndQueue -> IO ()
|
||||
dbCreateNextSndQueue _db _sq _nextSq = pure ()
|
||||
|
||||
setRcvQueueAction :: DB.Connection -> RcvQueue -> Maybe RcvQueueAction -> IO ()
|
||||
setRcvQueueAction _db _rq _rqAction_ = pure ()
|
||||
|
||||
type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version)
|
||||
|
||||
smpConfirmation :: SMPConfirmationRow -> SMPConfirmation
|
||||
@@ -1129,7 +1141,7 @@ insertRcvQueue_ db connId RcvQueue {..} = do
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO rcv_queues
|
||||
(rcv_queue_id, host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);
|
||||
(rcv_queue_id, host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|
||||
|]
|
||||
((qId, host server, port server, rcvId, connId) :. (rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (smpClientVersion, createdAt, updatedAt))
|
||||
|
||||
@@ -1142,7 +1154,7 @@ insertSndQueue_ db connId SndQueue {..} = do
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO snd_queues
|
||||
(snd_queue_id, host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?);
|
||||
(snd_queue_id, host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);
|
||||
|]
|
||||
((qId, host server, port server, sndId, connId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (smpClientVersion, createdAt, updatedAt))
|
||||
|
||||
|
||||
@@ -163,7 +163,7 @@ rcvQueue1 =
|
||||
rcvDhSecret = testDhSecret,
|
||||
e2ePrivKey = testPrivDhKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId = Just "2345",
|
||||
sndId = "2345",
|
||||
status = New,
|
||||
dbNextRcvQueueId = Nothing,
|
||||
rcvQueueAction = Nothing,
|
||||
@@ -345,7 +345,7 @@ testUpgradeSndConnToDuplex =
|
||||
rcvDhSecret = testDhSecret,
|
||||
e2ePrivKey = testPrivDhKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId = Just "4567",
|
||||
sndId = "4567",
|
||||
status = New,
|
||||
dbNextRcvQueueId = Nothing,
|
||||
rcvQueueAction = Nothing,
|
||||
|
||||
Reference in New Issue
Block a user