mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 14:14:54 +00:00
process rotation messages
This commit is contained in:
@@ -410,7 +410,7 @@ rejectContact' c contactConnId invId =
|
||||
processConfirmation :: AgentMonad m => AgentClient -> RcvQueue -> SMPConfirmation -> m ()
|
||||
processConfirmation c rq@RcvQueue {e2ePrivKey, smpClientVersion = v} SMPConfirmation {senderKey, e2ePubKey, smpClientVersion = v'} = do
|
||||
let dhSecret = C.dh' e2ePubKey e2ePrivKey
|
||||
withStore' c $ \db -> setRcvQueueConfirmedE2E db rq dhSecret $ min v v'
|
||||
withStore' c $ \db -> setRcvQueueConfirmedE2E db rq senderKey dhSecret $ min v v'
|
||||
-- TODO if this call to secureQueue fails the connection will not complete
|
||||
-- add secure rcv queue on subscription
|
||||
secureQueue c rq senderKey
|
||||
@@ -423,7 +423,7 @@ subscribeConnection' c connId =
|
||||
SomeConn _ (DuplexConnection cData rq sq) -> do
|
||||
resumeMsgDelivery c cData sq
|
||||
subscribe rq
|
||||
doRcvQueueAction c cData rq sq
|
||||
void . forkIO $ doRcvQueueAction c cData rq sq
|
||||
SomeConn _ (SndConnection cData sq) -> do
|
||||
resumeMsgDelivery c cData sq
|
||||
case status (sq :: SndQueue) of
|
||||
@@ -445,7 +445,7 @@ doRcvQueueAction :: AgentMonad m => AgentClient -> ConnData -> RcvQueue -> SndQu
|
||||
doRcvQueueAction c cData rq@RcvQueue {rcvQueueAction} sq =
|
||||
forM_ rcvQueueAction $ \(a, _ts) -> case a of
|
||||
RQACreateNextQueue -> createNextRcvQueue c cData rq sq
|
||||
RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue
|
||||
RQASecureNextQueue -> withNextRcvQueue c rq $ secureNextRcvQueue cData sq
|
||||
RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue
|
||||
RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue
|
||||
|
||||
@@ -465,13 +465,17 @@ createNextRcvQueue c cData rq@RcvQueue {server, sndId} sq = do
|
||||
void $ enqueueMessage c cData sq SMP.noMsgFlags QNEW {currentAddress = (server, sndId), nextQueueUri}
|
||||
withStore' c $ \db -> setRcvQueueAction db rq Nothing
|
||||
|
||||
secureNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
secureNextRcvQueue _c _mainRq _nextRq = do
|
||||
-- if not yet secured, secure new queue (it can be repeated with the same key)
|
||||
-- set queue status to Secured
|
||||
-- Enqueue QREADY message
|
||||
-- rcv_queue_action = null
|
||||
pure ()
|
||||
secureNextRcvQueue :: AgentMonad m => ConnData -> SndQueue -> AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
secureNextRcvQueue cData sq c rq nextRq@RcvQueue {server, sndId, status, sndPublicKey} = do
|
||||
when (status == Confirmed) $ case sndPublicKey of
|
||||
Just sKey -> do
|
||||
secureQueue c rq sKey
|
||||
withStore' c $ \db -> setRcvQueueStatus db nextRq Secured
|
||||
_ -> do
|
||||
-- notify user: no sender key
|
||||
pure ()
|
||||
void . enqueueMessage c cData sq SMP.noMsgFlags $ QREADY (server, sndId)
|
||||
withStore' c $ \db -> setRcvQueueAction db rq Nothing
|
||||
|
||||
suspendCurrRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m ()
|
||||
suspendCurrRcvQueue c currRq nextRq = do
|
||||
@@ -1094,6 +1098,7 @@ subscriber c@AgentClient {msgQ} = forever $ do
|
||||
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
|
||||
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) =
|
||||
withStore c (\db -> getRcvConn db srv rId) >>= \case
|
||||
-- TODO somehow it should get next queue if the message is to it
|
||||
SomeConn _ conn@(DuplexConnection cData rq _) -> processSMP conn cData rq
|
||||
SomeConn _ conn@(RcvConnection cData rq) -> processSMP conn cData rq
|
||||
SomeConn _ conn@(ContactConnection cData rq) -> processSMP conn cData rq
|
||||
@@ -1300,51 +1305,58 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
-- processed by queue sender
|
||||
rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueUri -> m ()
|
||||
rqNewMsg _currAddr nextQUri = case conn of
|
||||
DuplexConnection _ rq sq -> do
|
||||
DuplexConnection _ _ sq -> do
|
||||
-- TODO check that current address matches
|
||||
clientVRange <- asks $ smpClientVRange . config
|
||||
case (nextQUri `compatibleVersion` clientVRange) of
|
||||
Just qInfo@(Compatible nextQInfo) -> do
|
||||
sq'@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue qInfo
|
||||
withStore' c $ \db -> dbCreateNextSndQueue db sq sq'
|
||||
rq' <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq)
|
||||
case (sndPublicKey, e2ePubKey) of
|
||||
(Just nextSenderKey, Just dhPublicKey) -> do
|
||||
let qAddr = (queueAddress (nextQInfo :: SMPQueueInfo)) {dhPublicKey}
|
||||
nextQueueInfo = (nextQInfo :: SMPQueueInfo) {queueAddress = qAddr}
|
||||
void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo}
|
||||
rq' <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq)
|
||||
notify . SWITCH SPStarted $ connectionStats conn rq' (Just sq')
|
||||
_ -> do
|
||||
-- TODO notify user: internal error
|
||||
pure ()
|
||||
_ -> do
|
||||
-- TODO notify user: incompatible version
|
||||
pure ()
|
||||
_ -> do
|
||||
-- TODO notify user: message can only be sent to duplex connection
|
||||
pure ()
|
||||
_ -> throwError $ INTERNAL "absent sender keys"
|
||||
_ -> throwError $ AGENT A_VERSION
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
-- processed by queue recipient
|
||||
rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m ()
|
||||
rqKeys _sKey (SMPQueueInfo v SMPQueueAddress {smpServer, senderId, dhPublicKey}) ackDelete = do
|
||||
-- withStore c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case
|
||||
-- Just rq'
|
||||
-- | -> do
|
||||
-- | otherwise
|
||||
|
||||
-- store sender keys
|
||||
-- new rcv queue status = Confirmed
|
||||
-- old rcv_queue_action = RQASecureNewQueue
|
||||
-- load nextRq - this and all above can be one store function, at least one transaction
|
||||
--
|
||||
ackDelete
|
||||
-- secureNextRcvQueue c rq nextRq
|
||||
pure ()
|
||||
rqKeys senderKey qInfo ackDelete =
|
||||
case conn of
|
||||
DuplexConnection _ _ sq -> do
|
||||
clientVRange <- asks $ smpClientVRange . config
|
||||
case qInfo `proveCompatible` clientVRange of
|
||||
Just (Compatible (SMPQueueInfo clntVer' SMPQueueAddress {smpServer, senderId, dhPublicKey})) -> do
|
||||
withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case
|
||||
Just rq'@RcvQueue {server, sndId, e2ePrivKey = dhPrivKey, smpClientVersion = clntVer}
|
||||
| server == smpServer && sndId == senderId -> do
|
||||
let dhSecret = C.dh' dhPublicKey dhPrivKey
|
||||
withStore' c $ \db -> do
|
||||
setRcvQueueConfirmedE2E db rq' senderKey dhSecret $ min clntVer clntVer'
|
||||
setRcvQueueAction db rq $ Just RQASecureNextQueue
|
||||
ackDelete
|
||||
secureNextRcvQueue cData sq c rq rq'
|
||||
| otherwise -> throwError $ INTERNAL "incorrect queue address"
|
||||
_ -> throwError $ INTERNAL "message can only be sent during rotation"
|
||||
_ -> throwError $ AGENT A_VERSION
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
-- processed by queue sender
|
||||
rqReady :: (SMPServer, SMP.SenderId) -> m ()
|
||||
rqReady _addr = do
|
||||
-- Enqueue QHELLO message to the new queue
|
||||
pure ()
|
||||
rqReady (smpServer, senderId) =
|
||||
case conn of
|
||||
DuplexConnection _ _ sq ->
|
||||
withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case
|
||||
Just sq'@SndQueue {server, sndId}
|
||||
| server == smpServer && sndId == senderId ->
|
||||
void . enqueueMessage c cData sq' SMP.noMsgFlags $ QHELLO
|
||||
| otherwise -> throwError $ INTERNAL "incorrect queue address"
|
||||
_ -> throwError $ INTERNAL "message can only be sent during rotation"
|
||||
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
|
||||
|
||||
-- processed by queue recipient, received from the new queue
|
||||
rqHello :: m () -> m ()
|
||||
|
||||
@@ -501,6 +501,7 @@ newRcvQueue_ a c srv vRange = do
|
||||
e2ePrivKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId,
|
||||
sndPublicKey = Nothing,
|
||||
status = New,
|
||||
rcvQueueAction = Nothing,
|
||||
dbNextRcvQueueId = Nothing,
|
||||
|
||||
@@ -53,6 +53,10 @@ data RcvQueue = RcvQueue
|
||||
e2eDhSecret :: Maybe C.DhSecretX25519,
|
||||
-- | sender queue ID
|
||||
sndId :: SMP.SenderId,
|
||||
-- | public key used by the server to verify sender's transmissions
|
||||
-- it is Maybe as previously it was not saved - old queues may have NULL in it.
|
||||
-- For all new queues it is never cleared.
|
||||
sndPublicKey :: Maybe C.APublicVerifyKey,
|
||||
-- | queue status
|
||||
status :: QueueStatus,
|
||||
-- | action to perform, to be done on connection subscription, if it fails and not reset
|
||||
@@ -85,7 +89,7 @@ data SndQueue = SndQueue
|
||||
-- | sender queue ID
|
||||
sndId :: SMP.SenderId,
|
||||
-- | key pair used by the sender to sign transmissions
|
||||
-- This is Maybe as previously it was not saved - old queues may have NULL in it.
|
||||
-- sndPublicKey is Maybe as previously it was not saved - old queues may have NULL in it.
|
||||
-- For all new queues it is never cleared.
|
||||
sndPublicKey :: Maybe C.APublicVerifyKey,
|
||||
sndPrivateKey :: SndPrivateSignKey,
|
||||
@@ -131,6 +135,13 @@ deriving instance Eq (Connection d)
|
||||
|
||||
deriving instance Show (Connection d)
|
||||
|
||||
connData :: Connection d -> ConnData
|
||||
connData = \case
|
||||
RcvConnection cData _ -> cData
|
||||
SndConnection cData _ -> cData
|
||||
DuplexConnection cData _ _ -> cData
|
||||
ContactConnection cData _ -> cData
|
||||
|
||||
data SConnType :: ConnType -> Type where
|
||||
SCRcv :: SConnType CRcv
|
||||
SCSnd :: SConnType CSnd
|
||||
|
||||
@@ -38,6 +38,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
getRcvQueue,
|
||||
setRcvQueueNtfCreds,
|
||||
getNextRcvQueue,
|
||||
getNextSndQueue,
|
||||
dbCreateNextRcvQueue,
|
||||
dbCreateNextSndQueue,
|
||||
setRcvQueueAction,
|
||||
@@ -331,19 +332,21 @@ setRcvQueueStatus db RcvQueue {rcvId, server = ProtocolServer {host, port}} stat
|
||||
|]
|
||||
[":status" := status, ":host" := host, ":port" := port, ":rcv_id" := rcvId]
|
||||
|
||||
setRcvQueueConfirmedE2E :: DB.Connection -> RcvQueue -> C.DhSecretX25519 -> Version -> IO ()
|
||||
setRcvQueueConfirmedE2E db RcvQueue {rcvId, server = ProtocolServer {host, port}} e2eDhSecret smpClientVersion =
|
||||
setRcvQueueConfirmedE2E :: DB.Connection -> RcvQueue -> C.APublicVerifyKey -> C.DhSecretX25519 -> Version -> IO ()
|
||||
setRcvQueueConfirmedE2E db RcvQueue {rcvId, server = ProtocolServer {host, port}} sndPublicKey e2eDhSecret smpClientVersion =
|
||||
DB.executeNamed
|
||||
db
|
||||
[sql|
|
||||
UPDATE rcv_queues
|
||||
SET e2e_dh_secret = :e2e_dh_secret,
|
||||
snd_key = :snd_key,
|
||||
status = :status,
|
||||
smp_client_version = :smp_client_version
|
||||
WHERE host = :host AND port = :port AND rcv_id = :rcv_id
|
||||
|]
|
||||
[ ":status" := Confirmed,
|
||||
":e2e_dh_secret" := e2eDhSecret,
|
||||
":snd_key" := sndPublicKey,
|
||||
":smp_client_version" := smpClientVersion,
|
||||
":host" := host,
|
||||
":port" := port,
|
||||
@@ -389,7 +392,7 @@ getNextRcvQueue db = \case
|
||||
db
|
||||
[sql|
|
||||
SELECT q.host, q.port, s.key_hash,
|
||||
q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status,
|
||||
q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.snd_key, q.status,
|
||||
q.rcv_queue_action, q.rcv_queue_action_ts, q.next_rcv_queue_id,
|
||||
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret,
|
||||
q.smp_client_version, q.created_at, q.updated_at
|
||||
@@ -399,21 +402,27 @@ getNextRcvQueue db = \case
|
||||
|]
|
||||
(rqId, True)
|
||||
where
|
||||
rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rqAction_, rqActionTs_, dbNextRcvQueueId) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (smpClientVersion_, createdAt, updatedAt)) =
|
||||
rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, sndPublicKey, status) :. (rqAction_, rqActionTs_, dbNextRcvQueueId) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (smpClientVersion_, createdAt, updatedAt)) =
|
||||
let server = toSMPServer srvRow
|
||||
smpClientVersion = fromMaybe 1 smpClientVersion_
|
||||
rcvQueueAction = (,) <$> rqAction_ <*> rqActionTs_
|
||||
clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of
|
||||
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
|
||||
_ -> Nothing
|
||||
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt}
|
||||
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, sndPublicKey, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt}
|
||||
_ -> pure Nothing
|
||||
|
||||
getNextSndQueue :: DB.Connection -> Maybe Int64 -> IO (Maybe SndQueue)
|
||||
getNextSndQueue _db _sqId_ = pure Nothing
|
||||
|
||||
dbCreateNextRcvQueue :: DB.Connection -> RcvQueue -> RcvQueue -> IO ()
|
||||
dbCreateNextRcvQueue _db _rq _nextRq = pure ()
|
||||
|
||||
dbCreateNextSndQueue :: DB.Connection -> SndQueue -> SndQueue -> IO ()
|
||||
dbCreateNextSndQueue _db _sq _nextSq = pure ()
|
||||
dbCreateNextSndQueue _db _sq _nextSq = do
|
||||
-- create next queue record
|
||||
-- update current queue with the next queue ID
|
||||
pure ()
|
||||
|
||||
setRcvQueueAction :: DB.Connection -> RcvQueue -> Maybe RcvQueueAction -> IO ()
|
||||
setRcvQueueAction _db _rq _rqAction_ = pure ()
|
||||
@@ -1203,7 +1212,7 @@ getRcvQueueByConnId_ dbConn connId =
|
||||
dbConn
|
||||
[sql|
|
||||
SELECT q.host, q.port, s.key_hash,
|
||||
q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status,
|
||||
q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.snd_key, q.status,
|
||||
q.rcv_queue_action, q.rcv_queue_action_ts, q.next_rcv_queue_id,
|
||||
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret,
|
||||
q.smp_client_version, q.created_at, q.updated_at
|
||||
@@ -1213,14 +1222,14 @@ getRcvQueueByConnId_ dbConn connId =
|
||||
|]
|
||||
(connId, False)
|
||||
where
|
||||
rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rqAction_, rqActionTs_, dbNextRcvQueueId) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (smpClientVersion_, createdAt, updatedAt)) =
|
||||
rcvQueue (srvRow :. (rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, sndPublicKey, status) :. (rqAction_, rqActionTs_, dbNextRcvQueueId) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (smpClientVersion_, createdAt, updatedAt)) =
|
||||
let server = toSMPServer srvRow
|
||||
smpClientVersion = fromMaybe 1 smpClientVersion_
|
||||
rcvQueueAction = (,) <$> rqAction_ <*> rqActionTs_
|
||||
clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of
|
||||
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
|
||||
_ -> Nothing
|
||||
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt}
|
||||
in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, sndPublicKey, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt}
|
||||
|
||||
getSndQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe SndQueue)
|
||||
getSndQueueByConnId_ dbConn connId =
|
||||
|
||||
@@ -164,6 +164,7 @@ rcvQueue1 =
|
||||
e2ePrivKey = testPrivDhKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId = "2345",
|
||||
sndPublicKey = Nothing,
|
||||
status = New,
|
||||
dbNextRcvQueueId = Nothing,
|
||||
rcvQueueAction = Nothing,
|
||||
@@ -346,6 +347,7 @@ testUpgradeSndConnToDuplex =
|
||||
e2ePrivKey = testPrivDhKey,
|
||||
e2eDhSecret = Nothing,
|
||||
sndId = "4567",
|
||||
sndPublicKey = Nothing,
|
||||
status = New,
|
||||
dbNextRcvQueueId = Nothing,
|
||||
rcvQueueAction = Nothing,
|
||||
|
||||
Reference in New Issue
Block a user