diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7baafeb32..9d462ad3e 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -419,6 +419,7 @@ subscribeConnection' c connId = SomeConn _ (DuplexConnection cData rq sq) -> do resumeMsgDelivery c cData sq subscribe rq + doRcvQueueAction c cData rq sq SomeConn _ (SndConnection cData sq) -> do resumeMsgDelivery c cData sq case status (sq :: SndQueue) of @@ -434,25 +435,31 @@ subscribeConnection' c connId = subscribeQueue c rq connId ns <- asks ntfSupervisor atomically $ sendNtfSubCommand ns (connId, NSCCreate) - doRcvQueueAction c rq -- TODO expire actions -doRcvQueueAction :: AgentMonad m => AgentClient -> RcvQueue -> m () -doRcvQueueAction c rq@RcvQueue {rcvQueueAction} = forM_ rcvQueueAction $ \(a, _ts) -> case a of - RQACreateNextQueue -> createNextRcvQueue c rq - RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue - RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue - RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue +doRcvQueueAction :: AgentMonad m => AgentClient -> ConnData -> RcvQueue -> SndQueue -> m () +doRcvQueueAction c cData rq@RcvQueue {rcvQueueAction} sq = + forM_ rcvQueueAction $ \(a, _ts) -> case a of + RQACreateNextQueue -> createNextRcvQueue c cData rq sq + RQASecureNextQueue -> withNextRcvQueue c rq secureNextRcvQueue + RQASuspendCurrQueue -> withNextRcvQueue c rq suspendCurrRcvQueue + RQADeleteCurrQueue -> withNextRcvQueue c rq deleteCurrRcvQueue -createNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () -createNextRcvQueue c rq = do - _nextRq_ <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) - -- unless new rcv queue exists - -- then newRcvQueue - -- store to the database - -- enqueue QNEW message - -- rcv_queue_action = null - pure () +createNextRcvQueue :: AgentMonad m => AgentClient -> ConnData -> RcvQueue -> SndQueue -> m () +createNextRcvQueue c cData rq@RcvQueue {server, sndId} sq = do + clientVRange <- asks $ smpClientVRange . config + nextQueueUri <- + withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case + Just RcvQueue {server = smpServer, sndId = senderId, e2ePrivKey} -> do + let queueAddress = SMPQueueAddress {smpServer, senderId, dhPublicKey = C.publicKey e2ePrivKey} + pure SMPQueueUri {clientVRange, queueAddress} + _ -> do + srv <- getSMPServer c + (rq', qUri) <- newRcvQueue c srv clientVRange + withStore' c $ \db -> dbCreateNextRcvQueue db rq rq' + pure qUri + void $ enqueueMessage c cData sq SMP.noMsgFlags QNEW {currentAddress = (server, sndId), nextQueueUri} + withStore' c $ \db -> setRcvQueueAction db rq Nothing secureNextRcvQueue :: AgentMonad m => AgentClient -> RcvQueue -> RcvQueue -> m () secureNextRcvQueue _c _mainRq _nextRq = do @@ -507,7 +514,9 @@ subscribeConnections' c connIds = do when (instantNotifications tkn) . void . forkIO $ sendNtfCreate ns rcvRs let rs = M.unions $ errs' : sndRs : rcvRs notifyResultError rs - void . forkIO . forM_ rcvQs $ doRcvQueueAction c . fst + void . forkIO . forM_ cs $ \case + SomeConn _ (DuplexConnection cData rq sq) -> doRcvQueueAction c cData rq sq + _ -> pure () pure rs where rcvOrSndQueue :: SomeConn -> Either (SndQueue, ConnData) (RcvQueue, ConnData) @@ -788,10 +797,10 @@ ackMessage' c connId msgId = do switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m () switchConnection' c connId = withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection _ rq _) -> do - -- rcv_queue_action = RQACreateNewQueue - createNextRcvQueue c rq - pure () + SomeConn _ (DuplexConnection cData rq sq) -> do + -- TODO check that rotation is possible (whether the current server supports it) + withStore' c $ \db -> setRcvQueueAction db rq $ Just RQACreateNextQueue + createNextRcvQueue c cData rq sq SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX _ -> throwError $ CMD PROHIBITED @@ -1110,8 +1119,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm A_MSG body -> do logServer "<--" c srv rId "MSG " notify $ MSG msgMeta msgFlags body - QNEW addr qInfo -> rqNewMsg addr qInfo >> ackDelete msgId - QKEYS sKey qInfo -> rqKeys sKey qInfo $ ackDelete msgId + QNEW currAddr nextQUri -> rqNewMsg currAddr nextQUri >> ackDelete msgId + QKEYS sKey nextQInfo -> rqKeys sKey nextQInfo $ ackDelete msgId QREADY addr -> rqReady addr >> ackDelete msgId QHELLO -> rqHello $ ackDelete msgId QSWITCH addr -> rqSwitch addr >> ackDelete msgId @@ -1280,16 +1289,31 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm _ -> prohibited -- processed by queue sender - rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueInfo -> m () - rqNewMsg _addr _qInfo = do - -- generate sender and DH keys - -- store new send queue, update current send queue with queue ID of the new queue - -- Enqueue QKEYS message to the main rcv queue - pure () + rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueUri -> m () + rqNewMsg _currAddr nextQUri = case conn of + DuplexConnection _ _ sq -> do + clientVRange <- asks $ smpClientVRange . config + case (nextQUri `compatibleVersion` clientVRange) of + Just qInfo@(Compatible nextQueueInfo) -> do + sq'@SndQueue {sndPublicKey} <- newSndQueue qInfo + withStore' c $ \db -> dbCreateNextSndQueue db sq sq' + case sndPublicKey of + Just nextSenderKey -> + void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo} + -- TODO possibly, notify user that the queue is rotating + _ -> do + -- TODO notify user: internal error + pure () + _ -> do + -- TODO notify user: incompatible version + pure () + _ -> do + -- TODO notify user: message can only be sent to duplex connection + pure () -- processed by queue recipient rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m () - rqKeys _sKey _qInfo ackDelete = do + rqKeys _sKey _nextQInfo ackDelete = do -- store sender keys -- new rcv queue status = Confirmed -- old rcv_queue_action = RQASecureNewQueue diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1da9691fa..fb9aa6fb5 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -500,7 +500,7 @@ newRcvQueue_ a c srv vRange = do rcvDhSecret = C.dh' rcvPublicDhKey privDhKey, e2ePrivKey, e2eDhSecret = Nothing, - sndId = Just sndId, + sndId, status = New, rcvQueueAction = Nothing, dbNextRcvQueueId = Nothing, diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 5caa9005b..60f17098b 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -548,15 +548,15 @@ data AMessage | -- | agent envelope for the client message A_MSG MsgBody | -- instruct sender to switch the queue to another - QNEW (SMPServer, SMP.SenderId) SMPQueueInfo + QNEW {currentAddress :: (SMPServer, SMP.SenderId), nextQueueUri :: SMPQueueUri} | -- send server key and queue e2e DH key to the recipient - QKEYS SndPublicVerifyKey SMPQueueInfo + QKEYS {nextSenderKey :: SndPublicVerifyKey, nextQueueInfo :: SMPQueueInfo} | -- inform the sender that the queue is ready to use - sender sends QHELLO to it - QREADY (SMPServer, SMP.SenderId) + QREADY {nextAddress :: (SMPServer, SMP.SenderId)} | -- the first message sent by the sender to the new queue QHELLO | -- instruct the sender to start sending messages to the new queue - after recipient receives HELLO - QSWITCH (SMPServer, SMP.SenderId) + QSWITCH {nextAddress :: (SMPServer, SMP.SenderId)} deriving (Show) instance Encoding AMessage where @@ -564,8 +564,8 @@ instance Encoding AMessage where HELLO -> smpEncode HELLO_ REPLY smpQueues -> smpEncode (REPLY_, smpQueues) A_MSG body -> smpEncode (A_MSG_, Tail body) - QNEW addr qInfo -> smpEncode (QNEW_, addr, qInfo) - QKEYS qInfo sKey -> smpEncode (QKEYS_, qInfo, sKey) + QNEW currAddr nextQUri -> smpEncode (QNEW_, currAddr, strEncode nextQUri) + QKEYS sKey nextQInfo -> smpEncode (QKEYS_, sKey, nextQInfo) QREADY addr -> smpEncode (QREADY_, addr) QHELLO -> smpEncode QHELLO_ QSWITCH addr -> smpEncode (QSWITCH_, addr) @@ -575,7 +575,10 @@ instance Encoding AMessage where HELLO_ -> pure HELLO REPLY_ -> REPLY <$> smpP A_MSG_ -> A_MSG . unTail <$> smpP - QNEW_ -> QNEW <$> smpP <*> smpP + QNEW_ -> do + currentAddress <- smpP + nextQueueUri <- strDecode <$?> smpP + pure QNEW {currentAddress, nextQueueUri} QKEYS_ -> QKEYS <$> smpP <*> smpP QREADY_ -> QREADY <$> smpP QHELLO_ -> pure QHELLO diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index abfc9ea6a..bb4d79d1d 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -52,7 +52,7 @@ data RcvQueue = RcvQueue -- | public sender's DH key and agreed shared DH secret for simple per-queue e2e e2eDhSecret :: Maybe C.DhSecretX25519, -- | sender queue ID - sndId :: Maybe SMP.SenderId, + sndId :: SMP.SenderId, -- | queue status status :: QueueStatus, -- | action to perform, to be done on connection subscription, if it fails and not reset diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index e595a5e93..b7e6efae1 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -38,6 +38,9 @@ module Simplex.Messaging.Agent.Store.SQLite getRcvQueue, setRcvQueueNtfCreds, getNextRcvQueue, + dbCreateNextRcvQueue, + dbCreateNextSndQueue, + setRcvQueueAction, -- Confirmations createConfirmation, acceptConfirmation, @@ -405,6 +408,15 @@ getNextRcvQueue db = \case in RcvQueue {server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, rcvQueueAction, dbNextRcvQueueId, smpClientVersion, clientNtfCreds, createdAt, updatedAt} _ -> pure Nothing +dbCreateNextRcvQueue :: DB.Connection -> RcvQueue -> RcvQueue -> IO () +dbCreateNextRcvQueue _db _rq _nextRq = pure () + +dbCreateNextSndQueue :: DB.Connection -> SndQueue -> SndQueue -> IO () +dbCreateNextSndQueue _db _sq _nextSq = pure () + +setRcvQueueAction :: DB.Connection -> RcvQueue -> Maybe RcvQueueAction -> IO () +setRcvQueueAction _db _rq _rqAction_ = pure () + type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version) smpConfirmation :: SMPConfirmationRow -> SMPConfirmation @@ -1129,7 +1141,7 @@ insertRcvQueue_ db connId RcvQueue {..} = do db [sql| INSERT INTO rcv_queues - (rcv_queue_id, host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?); + (rcv_queue_id, host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?); |] ((qId, host server, port server, rcvId, connId) :. (rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (smpClientVersion, createdAt, updatedAt)) @@ -1142,7 +1154,7 @@ insertSndQueue_ db connId SndQueue {..} = do db [sql| INSERT INTO snd_queues - (snd_queue_id, host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?); + (snd_queue_id, host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, smp_client_version, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?); |] ((qId, host server, port server, sndId, connId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (smpClientVersion, createdAt, updatedAt)) diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 32715379a..f14b91855 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -163,7 +163,7 @@ rcvQueue1 = rcvDhSecret = testDhSecret, e2ePrivKey = testPrivDhKey, e2eDhSecret = Nothing, - sndId = Just "2345", + sndId = "2345", status = New, dbNextRcvQueueId = Nothing, rcvQueueAction = Nothing, @@ -345,7 +345,7 @@ testUpgradeSndConnToDuplex = rcvDhSecret = testDhSecret, e2ePrivKey = testPrivDhKey, e2eDhSecret = Nothing, - sndId = Just "4567", + sndId = "4567", status = New, dbNextRcvQueueId = Nothing, rcvQueueAction = Nothing,